4 Commits
1.0.6 ... 1.0.9

Author SHA1 Message Date
wenzuhuai
59800438b3 feat: add client name option for user identification in system events 2026-02-03 11:12:47 +08:00
wenzuhuai
9161052033 feat: 增加重连过程的详细日志输出
- handleReconnect 中的日志从 debug 改为 info/warning
- 添加重连尝试次数和 reconnectAttempts 计数器的显示
- 添加等待时间日志
- 便于诊断重连失败的原因
2026-01-28 13:56:23 +08:00
wenzuhuai
8f67431623 fix: 修复 handleDisconnect 在 disconnect 失败时不触发 .disconnected 事件的问题 2026-01-28 13:38:51 +08:00
wenzuhuai
ec4618db87 fix: 重连成功后重置 outstandingPings 计数器,修复频繁重连问题 2026-01-23 09:58:40 +08:00
2 changed files with 32 additions and 8 deletions

View File

@@ -31,8 +31,16 @@ public class NatsClientOptions {
private var clientCertificate: URL? = nil private var clientCertificate: URL? = nil
private var clientKey: URL? = nil private var clientKey: URL? = nil
private var inboxPrefix: String = "_INBOX." private var inboxPrefix: String = "_INBOX."
private var clientName: String = ""
public init() {} public init() {}
/// Sets the client name sent to the server during CONNECT.
/// This name appears in system events and can be used to identify the client.
public func name(_ name: String) -> NatsClientOptions {
self.clientName = name
return self
}
/// Sets the prefix for inbox subjects used for request/reply. /// Sets the prefix for inbox subjects used for request/reply.
/// Defaults to "_INBOX." /// Defaults to "_INBOX."
@@ -195,7 +203,8 @@ public class NatsClientOptions {
clientCertificate: clientCertificate, clientCertificate: clientCertificate,
clientKey: clientKey, clientKey: clientKey,
rootCertificate: rootCertificate, rootCertificate: rootCertificate,
retryOnFailedConnect: initialReconnect retryOnFailedConnect: initialReconnect,
name: clientName
) )
return client return client
} }

View File

@@ -46,6 +46,7 @@ class ConnectionHandler: ChannelInboundHandler {
private var rootCertificate: URL? private var rootCertificate: URL?
private var clientCertificate: URL? private var clientCertificate: URL?
private var clientKey: URL? private var clientKey: URL?
private let clientName: String
typealias InboundIn = ByteBuffer typealias InboundIn = ByteBuffer
private let state = NIOLockedValueBox(NatsState.pending) private let state = NIOLockedValueBox(NatsState.pending)
@@ -86,7 +87,7 @@ class ConnectionHandler: ChannelInboundHandler {
retainServersOrder: Bool, retainServersOrder: Bool,
pingInterval: TimeInterval, auth: Auth?, requireTls: Bool, tlsFirst: Bool, pingInterval: TimeInterval, auth: Auth?, requireTls: Bool, tlsFirst: Bool,
clientCertificate: URL?, clientKey: URL?, clientCertificate: URL?, clientKey: URL?,
rootCertificate: URL?, retryOnFailedConnect: Bool rootCertificate: URL?, retryOnFailedConnect: Bool, name: String
) { ) {
self.urls = urls self.urls = urls
self.group = .singleton self.group = .singleton
@@ -102,6 +103,7 @@ class ConnectionHandler: ChannelInboundHandler {
self.clientKey = clientKey self.clientKey = clientKey
self.rootCertificate = rootCertificate self.rootCertificate = rootCertificate
self.retryOnFailedConnect = retryOnFailedConnect self.retryOnFailedConnect = retryOnFailedConnect
self.clientName = name
} }
func channelRead(context: ChannelHandlerContext, data: NIOAny) { func channelRead(context: ChannelHandlerContext, data: NIOAny) {
@@ -356,6 +358,10 @@ class ConnectionHandler: ChannelInboundHandler {
guard let channel = self.channel else { guard let channel = self.channel else {
throw NatsError.ClientError.internalError("empty channel") throw NatsError.ClientError.internalError("empty channel")
} }
// ping
self.outstandingPings.store(0, ordering: .relaxed)
// Schedule the task to send a PING periodically // Schedule the task to send a PING periodically
let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000)) let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000))
self.pingTask = channel.eventLoop.scheduleRepeatedTask( self.pingTask = channel.eventLoop.scheduleRepeatedTask(
@@ -481,7 +487,7 @@ class ConnectionHandler: ChannelInboundHandler {
private func sendClientConnectInit() async throws { private func sendClientConnectInit() async throws {
var initialConnect = ConnectInfo( var initialConnect = ConnectInfo(
verbose: false, pedantic: false, userJwt: nil, nkey: "", name: "", echo: true, verbose: false, pedantic: false, userJwt: nil, nkey: "", name: self.clientName, echo: true,
lang: self.lang, version: self.version, natsProtocol: .dynamic, tlsRequired: false, lang: self.lang, version: self.version, natsProtocol: .dynamic, tlsRequired: false,
user: self.auth?.user ?? "", pass: self.auth?.password ?? "", user: self.auth?.user ?? "", pass: self.auth?.password ?? "",
authToken: self.auth?.token ?? "", headers: true, noResponders: true) authToken: self.auth?.token ?? "", headers: true, noResponders: true)
@@ -985,11 +991,13 @@ class ConnectionHandler: ChannelInboundHandler {
promise.futureResult.whenComplete { result in promise.futureResult.whenComplete { result in
do { do {
try result.get() try result.get()
self.fire(.disconnected)
} catch { } catch {
// debug TLS // debug TLS
logger.debug("Connection closed with error (will reconnect): \(error)") logger.debug("Connection closed with error (will reconnect): \(error)")
} }
// .disconnected
//
self.fire(.disconnected)
// Only start reconnect after disconnect is complete // Only start reconnect after disconnect is complete
self.handleReconnect() self.handleReconnect()
} }
@@ -1005,21 +1013,28 @@ class ConnectionHandler: ChannelInboundHandler {
oldTask.cancel() oldTask.cancel()
} }
logger.info("🔄 Starting reconnection process...")
reconnectTask = Task { reconnectTask = Task {
var connected = false var connected = false
var attempt = 0
while !Task.isCancelled while !Task.isCancelled
&& (maxReconnects == nil || self.reconnectAttempts < maxReconnects!) && (maxReconnects == nil || self.reconnectAttempts < maxReconnects!)
{ {
attempt += 1
logger.info("🔄 Reconnect attempt \(attempt), total reconnectAttempts: \(self.reconnectAttempts)")
do { do {
try await self.connect() try await self.connect()
connected = true connected = true
logger.info("✅ Reconnection successful after \(attempt) attempts")
break // Successfully connected break // Successfully connected
} catch is CancellationError { } catch is CancellationError {
logger.debug("Reconnect task cancelled") logger.info("⚠️ Reconnect task cancelled")
return return
} catch { } catch {
logger.debug("Could not reconnect: \(error)") logger.warning("⚠️ Reconnect attempt \(attempt) failed: \(error)")
if !Task.isCancelled { if !Task.isCancelled {
logger.info("⏳ Waiting \(Double(self.reconnectWait) / 1_000_000_000)s before next attempt...")
try? await Task.sleep(nanoseconds: self.reconnectWait) try? await Task.sleep(nanoseconds: self.reconnectWait)
} }
} }
@@ -1027,13 +1042,13 @@ class ConnectionHandler: ChannelInboundHandler {
// Early return if cancelled // Early return if cancelled
if Task.isCancelled { if Task.isCancelled {
logger.debug("Reconnect task cancelled after connection attempts") logger.info("⚠️ Reconnect task cancelled after \(attempt) connection attempts")
return return
} }
// If we got here without connecting and weren't cancelled, we hit max reconnects // If we got here without connecting and weren't cancelled, we hit max reconnects
if !connected { if !connected {
logger.error("Could not reconnect; maxReconnects exceeded") logger.error("Could not reconnect; maxReconnects exceeded (\(self.maxReconnects ?? -1))")
try? await self.close() try? await self.close()
return return
} }