5 Commits
1.0.5 ... 1.0.9

Author SHA1 Message Date
wenzuhuai
59800438b3 feat: add client name option for user identification in system events 2026-02-03 11:12:47 +08:00
wenzuhuai
9161052033 feat: 增加重连过程的详细日志输出
- handleReconnect 中的日志从 debug 改为 info/warning
- 添加重连尝试次数和 reconnectAttempts 计数器的显示
- 添加等待时间日志
- 便于诊断重连失败的原因
2026-01-28 13:56:23 +08:00
wenzuhuai
8f67431623 fix: 修复 handleDisconnect 在 disconnect 失败时不触发 .disconnected 事件的问题 2026-01-28 13:38:51 +08:00
wenzuhuai
ec4618db87 fix: 重连成功后重置 outstandingPings 计数器,修复频繁重连问题 2026-01-23 09:58:40 +08:00
wenzuhuai
30fb5d8b27 fix: 将断网场景的错误日志降级为 debug 级别
- Error closing connection 降级为 debug(TLS 关闭错误是断网预期行为)
- Write failed: batchBuffer is nil 降级为 debug
- Write operation failed 降级为 debug
- 避免长时间断网恢复时日志泛滥
2026-01-23 09:34:33 +08:00
2 changed files with 38 additions and 11 deletions

View File

@@ -31,8 +31,16 @@ public class NatsClientOptions {
private var clientCertificate: URL? = nil
private var clientKey: URL? = nil
private var inboxPrefix: String = "_INBOX."
private var clientName: String = ""
public init() {}
/// Sets the client name sent to the server during CONNECT.
/// This name appears in system events and can be used to identify the client.
public func name(_ name: String) -> NatsClientOptions {
self.clientName = name
return self
}
/// Sets the prefix for inbox subjects used for request/reply.
/// Defaults to "_INBOX."
@@ -195,7 +203,8 @@ public class NatsClientOptions {
clientCertificate: clientCertificate,
clientKey: clientKey,
rootCertificate: rootCertificate,
retryOnFailedConnect: initialReconnect
retryOnFailedConnect: initialReconnect,
name: clientName
)
return client
}

View File

@@ -46,6 +46,7 @@ class ConnectionHandler: ChannelInboundHandler {
private var rootCertificate: URL?
private var clientCertificate: URL?
private var clientKey: URL?
private let clientName: String
typealias InboundIn = ByteBuffer
private let state = NIOLockedValueBox(NatsState.pending)
@@ -86,7 +87,7 @@ class ConnectionHandler: ChannelInboundHandler {
retainServersOrder: Bool,
pingInterval: TimeInterval, auth: Auth?, requireTls: Bool, tlsFirst: Bool,
clientCertificate: URL?, clientKey: URL?,
rootCertificate: URL?, retryOnFailedConnect: Bool
rootCertificate: URL?, retryOnFailedConnect: Bool, name: String
) {
self.urls = urls
self.group = .singleton
@@ -102,6 +103,7 @@ class ConnectionHandler: ChannelInboundHandler {
self.clientKey = clientKey
self.rootCertificate = rootCertificate
self.retryOnFailedConnect = retryOnFailedConnect
self.clientName = name
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
@@ -356,6 +358,10 @@ class ConnectionHandler: ChannelInboundHandler {
guard let channel = self.channel else {
throw NatsError.ClientError.internalError("empty channel")
}
// ping
self.outstandingPings.store(0, ordering: .relaxed)
// Schedule the task to send a PING periodically
let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000))
self.pingTask = channel.eventLoop.scheduleRepeatedTask(
@@ -481,7 +487,7 @@ class ConnectionHandler: ChannelInboundHandler {
private func sendClientConnectInit() async throws {
var initialConnect = ConnectInfo(
verbose: false, pedantic: false, userJwt: nil, nkey: "", name: "", echo: true,
verbose: false, pedantic: false, userJwt: nil, nkey: "", name: self.clientName, echo: true,
lang: self.lang, version: self.version, natsProtocol: .dynamic, tlsRequired: false,
user: self.auth?.user ?? "", pass: self.auth?.password ?? "",
authToken: self.auth?.token ?? "", headers: true, noResponders: true)
@@ -985,10 +991,13 @@ class ConnectionHandler: ChannelInboundHandler {
promise.futureResult.whenComplete { result in
do {
try result.get()
self.fire(.disconnected)
} catch {
logger.error("Error closing connection: \(error)")
// debug TLS
logger.debug("Connection closed with error (will reconnect): \(error)")
}
// .disconnected
//
self.fire(.disconnected)
// Only start reconnect after disconnect is complete
self.handleReconnect()
}
@@ -1004,21 +1013,28 @@ class ConnectionHandler: ChannelInboundHandler {
oldTask.cancel()
}
logger.info("🔄 Starting reconnection process...")
reconnectTask = Task {
var connected = false
var attempt = 0
while !Task.isCancelled
&& (maxReconnects == nil || self.reconnectAttempts < maxReconnects!)
{
attempt += 1
logger.info("🔄 Reconnect attempt \(attempt), total reconnectAttempts: \(self.reconnectAttempts)")
do {
try await self.connect()
connected = true
logger.info("✅ Reconnection successful after \(attempt) attempts")
break // Successfully connected
} catch is CancellationError {
logger.debug("Reconnect task cancelled")
logger.info("⚠️ Reconnect task cancelled")
return
} catch {
logger.debug("Could not reconnect: \(error)")
logger.warning("⚠️ Reconnect attempt \(attempt) failed: \(error)")
if !Task.isCancelled {
logger.info("⏳ Waiting \(Double(self.reconnectWait) / 1_000_000_000)s before next attempt...")
try? await Task.sleep(nanoseconds: self.reconnectWait)
}
}
@@ -1026,13 +1042,13 @@ class ConnectionHandler: ChannelInboundHandler {
// Early return if cancelled
if Task.isCancelled {
logger.debug("Reconnect task cancelled after connection attempts")
logger.info("⚠️ Reconnect task cancelled after \(attempt) connection attempts")
return
}
// If we got here without connecting and weren't cancelled, we hit max reconnects
if !connected {
logger.error("Could not reconnect; maxReconnects exceeded")
logger.error("Could not reconnect; maxReconnects exceeded (\(self.maxReconnects ?? -1))")
try? await self.close()
return
}
@@ -1060,7 +1076,8 @@ class ConnectionHandler: ChannelInboundHandler {
// Trigger reconnect to recover
let currentState = state.withLockedValue { $0 }
if currentState == .connected {
logger.error("Write failed: batchBuffer is nil but state is connected, triggering reconnect")
// debug
logger.debug("Write failed: batchBuffer is nil, triggering reconnect")
handleDisconnect()
}
throw NatsError.ClientError.invalidConnection("not connected")
@@ -1071,7 +1088,8 @@ class ConnectionHandler: ChannelInboundHandler {
// Trigger reconnect on write failure - connection may be broken
let currentState = state.withLockedValue { $0 }
if currentState == .connected {
logger.error("Write operation failed, triggering reconnect: \(error)")
// debug
logger.debug("Write operation failed, triggering reconnect: \(error)")
handleDisconnect()
}
throw NatsError.ClientError.io(error)