Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
59800438b3 | ||
|
|
9161052033 | ||
|
|
8f67431623 | ||
|
|
ec4618db87 |
@@ -31,8 +31,16 @@ public class NatsClientOptions {
|
|||||||
private var clientCertificate: URL? = nil
|
private var clientCertificate: URL? = nil
|
||||||
private var clientKey: URL? = nil
|
private var clientKey: URL? = nil
|
||||||
private var inboxPrefix: String = "_INBOX."
|
private var inboxPrefix: String = "_INBOX."
|
||||||
|
private var clientName: String = ""
|
||||||
|
|
||||||
public init() {}
|
public init() {}
|
||||||
|
|
||||||
|
/// Sets the client name sent to the server during CONNECT.
|
||||||
|
/// This name appears in system events and can be used to identify the client.
|
||||||
|
public func name(_ name: String) -> NatsClientOptions {
|
||||||
|
self.clientName = name
|
||||||
|
return self
|
||||||
|
}
|
||||||
|
|
||||||
/// Sets the prefix for inbox subjects used for request/reply.
|
/// Sets the prefix for inbox subjects used for request/reply.
|
||||||
/// Defaults to "_INBOX."
|
/// Defaults to "_INBOX."
|
||||||
@@ -195,7 +203,8 @@ public class NatsClientOptions {
|
|||||||
clientCertificate: clientCertificate,
|
clientCertificate: clientCertificate,
|
||||||
clientKey: clientKey,
|
clientKey: clientKey,
|
||||||
rootCertificate: rootCertificate,
|
rootCertificate: rootCertificate,
|
||||||
retryOnFailedConnect: initialReconnect
|
retryOnFailedConnect: initialReconnect,
|
||||||
|
name: clientName
|
||||||
)
|
)
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
private var rootCertificate: URL?
|
private var rootCertificate: URL?
|
||||||
private var clientCertificate: URL?
|
private var clientCertificate: URL?
|
||||||
private var clientKey: URL?
|
private var clientKey: URL?
|
||||||
|
private let clientName: String
|
||||||
|
|
||||||
typealias InboundIn = ByteBuffer
|
typealias InboundIn = ByteBuffer
|
||||||
private let state = NIOLockedValueBox(NatsState.pending)
|
private let state = NIOLockedValueBox(NatsState.pending)
|
||||||
@@ -86,7 +87,7 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
retainServersOrder: Bool,
|
retainServersOrder: Bool,
|
||||||
pingInterval: TimeInterval, auth: Auth?, requireTls: Bool, tlsFirst: Bool,
|
pingInterval: TimeInterval, auth: Auth?, requireTls: Bool, tlsFirst: Bool,
|
||||||
clientCertificate: URL?, clientKey: URL?,
|
clientCertificate: URL?, clientKey: URL?,
|
||||||
rootCertificate: URL?, retryOnFailedConnect: Bool
|
rootCertificate: URL?, retryOnFailedConnect: Bool, name: String
|
||||||
) {
|
) {
|
||||||
self.urls = urls
|
self.urls = urls
|
||||||
self.group = .singleton
|
self.group = .singleton
|
||||||
@@ -102,6 +103,7 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
self.clientKey = clientKey
|
self.clientKey = clientKey
|
||||||
self.rootCertificate = rootCertificate
|
self.rootCertificate = rootCertificate
|
||||||
self.retryOnFailedConnect = retryOnFailedConnect
|
self.retryOnFailedConnect = retryOnFailedConnect
|
||||||
|
self.clientName = name
|
||||||
}
|
}
|
||||||
|
|
||||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||||
@@ -356,6 +358,10 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
guard let channel = self.channel else {
|
guard let channel = self.channel else {
|
||||||
throw NatsError.ClientError.internalError("empty channel")
|
throw NatsError.ClientError.internalError("empty channel")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 重连成功后重置 ping 计数器,避免累积的失败计数导致立即断开
|
||||||
|
self.outstandingPings.store(0, ordering: .relaxed)
|
||||||
|
|
||||||
// Schedule the task to send a PING periodically
|
// Schedule the task to send a PING periodically
|
||||||
let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000))
|
let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000))
|
||||||
self.pingTask = channel.eventLoop.scheduleRepeatedTask(
|
self.pingTask = channel.eventLoop.scheduleRepeatedTask(
|
||||||
@@ -481,7 +487,7 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
|
|
||||||
private func sendClientConnectInit() async throws {
|
private func sendClientConnectInit() async throws {
|
||||||
var initialConnect = ConnectInfo(
|
var initialConnect = ConnectInfo(
|
||||||
verbose: false, pedantic: false, userJwt: nil, nkey: "", name: "", echo: true,
|
verbose: false, pedantic: false, userJwt: nil, nkey: "", name: self.clientName, echo: true,
|
||||||
lang: self.lang, version: self.version, natsProtocol: .dynamic, tlsRequired: false,
|
lang: self.lang, version: self.version, natsProtocol: .dynamic, tlsRequired: false,
|
||||||
user: self.auth?.user ?? "", pass: self.auth?.password ?? "",
|
user: self.auth?.user ?? "", pass: self.auth?.password ?? "",
|
||||||
authToken: self.auth?.token ?? "", headers: true, noResponders: true)
|
authToken: self.auth?.token ?? "", headers: true, noResponders: true)
|
||||||
@@ -985,11 +991,13 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
promise.futureResult.whenComplete { result in
|
promise.futureResult.whenComplete { result in
|
||||||
do {
|
do {
|
||||||
try result.get()
|
try result.get()
|
||||||
self.fire(.disconnected)
|
|
||||||
} catch {
|
} catch {
|
||||||
// 降级为 debug:网络断开时 TLS 无法完成正常关闭握手是预期行为
|
// 降级为 debug:网络断开时 TLS 无法完成正常关闭握手是预期行为
|
||||||
logger.debug("Connection closed with error (will reconnect): \(error)")
|
logger.debug("Connection closed with error (will reconnect): \(error)")
|
||||||
}
|
}
|
||||||
|
// 无论成功还是失败,都要触发 .disconnected 事件
|
||||||
|
// 这样上层才能感知到连接已断开
|
||||||
|
self.fire(.disconnected)
|
||||||
// Only start reconnect after disconnect is complete
|
// Only start reconnect after disconnect is complete
|
||||||
self.handleReconnect()
|
self.handleReconnect()
|
||||||
}
|
}
|
||||||
@@ -1005,21 +1013,28 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
oldTask.cancel()
|
oldTask.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info("🔄 Starting reconnection process...")
|
||||||
|
|
||||||
reconnectTask = Task {
|
reconnectTask = Task {
|
||||||
var connected = false
|
var connected = false
|
||||||
|
var attempt = 0
|
||||||
while !Task.isCancelled
|
while !Task.isCancelled
|
||||||
&& (maxReconnects == nil || self.reconnectAttempts < maxReconnects!)
|
&& (maxReconnects == nil || self.reconnectAttempts < maxReconnects!)
|
||||||
{
|
{
|
||||||
|
attempt += 1
|
||||||
|
logger.info("🔄 Reconnect attempt \(attempt), total reconnectAttempts: \(self.reconnectAttempts)")
|
||||||
do {
|
do {
|
||||||
try await self.connect()
|
try await self.connect()
|
||||||
connected = true
|
connected = true
|
||||||
|
logger.info("✅ Reconnection successful after \(attempt) attempts")
|
||||||
break // Successfully connected
|
break // Successfully connected
|
||||||
} catch is CancellationError {
|
} catch is CancellationError {
|
||||||
logger.debug("Reconnect task cancelled")
|
logger.info("⚠️ Reconnect task cancelled")
|
||||||
return
|
return
|
||||||
} catch {
|
} catch {
|
||||||
logger.debug("Could not reconnect: \(error)")
|
logger.warning("⚠️ Reconnect attempt \(attempt) failed: \(error)")
|
||||||
if !Task.isCancelled {
|
if !Task.isCancelled {
|
||||||
|
logger.info("⏳ Waiting \(Double(self.reconnectWait) / 1_000_000_000)s before next attempt...")
|
||||||
try? await Task.sleep(nanoseconds: self.reconnectWait)
|
try? await Task.sleep(nanoseconds: self.reconnectWait)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1027,13 +1042,13 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
|
|
||||||
// Early return if cancelled
|
// Early return if cancelled
|
||||||
if Task.isCancelled {
|
if Task.isCancelled {
|
||||||
logger.debug("Reconnect task cancelled after connection attempts")
|
logger.info("⚠️ Reconnect task cancelled after \(attempt) connection attempts")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we got here without connecting and weren't cancelled, we hit max reconnects
|
// If we got here without connecting and weren't cancelled, we hit max reconnects
|
||||||
if !connected {
|
if !connected {
|
||||||
logger.error("Could not reconnect; maxReconnects exceeded")
|
logger.error("❌ Could not reconnect; maxReconnects exceeded (\(self.maxReconnects ?? -1))")
|
||||||
try? await self.close()
|
try? await self.close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user