@@ -46,6 +46,7 @@ class ConnectionHandler: ChannelInboundHandler {
private var rootCertificate : URL ?
private var rootCertificate : URL ?
private var clientCertificate : URL ?
private var clientCertificate : URL ?
private var clientKey : URL ?
private var clientKey : URL ?
private let clientName : String
typealias InboundIn = ByteBuffer
typealias InboundIn = ByteBuffer
private let state = NIOLockedValueBox ( NatsState . pending )
private let state = NIOLockedValueBox ( NatsState . pending )
@@ -86,7 +87,7 @@ class ConnectionHandler: ChannelInboundHandler {
retainServersOrder : Bool ,
retainServersOrder : Bool ,
pingInterval : TimeInterval , auth : Auth ? , requireTls : Bool , tlsFirst : Bool ,
pingInterval : TimeInterval , auth : Auth ? , requireTls : Bool , tlsFirst : Bool ,
clientCertificate : URL ? , clientKey : URL ? ,
clientCertificate : URL ? , clientKey : URL ? ,
rootCertificate : URL ? , retryOnFailedConnect : Bool
rootCertificate : URL ? , retryOnFailedConnect : Bool , name : String
) {
) {
self . urls = urls
self . urls = urls
self . group = . singleton
self . group = . singleton
@@ -102,6 +103,7 @@ class ConnectionHandler: ChannelInboundHandler {
self . clientKey = clientKey
self . clientKey = clientKey
self . rootCertificate = rootCertificate
self . rootCertificate = rootCertificate
self . retryOnFailedConnect = retryOnFailedConnect
self . retryOnFailedConnect = retryOnFailedConnect
self . clientName = name
}
}
func channelRead ( context : ChannelHandlerContext , data : NIOAny ) {
func channelRead ( context : ChannelHandlerContext , data : NIOAny ) {
@@ -356,6 +358,10 @@ class ConnectionHandler: ChannelInboundHandler {
guard let channel = self . channel else {
guard let channel = self . channel else {
throw NatsError . ClientError . internalError ( " empty channel " )
throw NatsError . ClientError . internalError ( " empty channel " )
}
}
// 重 连 成 功 后 重 置 p i n g 计 数 器 , 避 免 累 积 的 失 败 计 数 导 致 立 即 断 开
self . outstandingPings . store ( 0 , ordering : . relaxed )
// S c h e d u l e t h e t a s k t o s e n d a P I N G p e r i o d i c a l l y
// S c h e d u l e t h e t a s k t o s e n d a P I N G p e r i o d i c a l l y
let pingInterval = TimeAmount . nanoseconds ( Int64 ( self . pingInterval * 1_000_000_000 ) )
let pingInterval = TimeAmount . nanoseconds ( Int64 ( self . pingInterval * 1_000_000_000 ) )
self . pingTask = channel . eventLoop . scheduleRepeatedTask (
self . pingTask = channel . eventLoop . scheduleRepeatedTask (
@@ -481,7 +487,7 @@ class ConnectionHandler: ChannelInboundHandler {
private func sendClientConnectInit ( ) async throws {
private func sendClientConnectInit ( ) async throws {
var initialConnect = ConnectInfo (
var initialConnect = ConnectInfo (
verbose : false , pedantic : false , userJwt : nil , nkey : " " , name : " " , echo : true ,
verbose : false , pedantic : false , userJwt : nil , nkey : " " , name : self . clientName , echo : true ,
lang : self . lang , version : self . version , natsProtocol : . dynamic , tlsRequired : false ,
lang : self . lang , version : self . version , natsProtocol : . dynamic , tlsRequired : false ,
user : self . auth ? . user ? ? " " , pass : self . auth ? . password ? ? " " ,
user : self . auth ? . user ? ? " " , pass : self . auth ? . password ? ? " " ,
authToken : self . auth ? . token ? ? " " , headers : true , noResponders : true )
authToken : self . auth ? . token ? ? " " , headers : true , noResponders : true )
@@ -921,14 +927,14 @@ class ConnectionHandler: ChannelInboundHandler {
if let natsErr = error as ? NatsErrorProtocol {
if let natsErr = error as ? NatsErrorProtocol {
self . fire ( . error ( natsErr ) )
self . fire ( . error ( natsErr ) )
} else {
} else {
logger . error ( " unexpected error: \( error ) " )
// 降 级 为 d e b u g 级 别 , 避 免 频 繁 输 出 错 误 日 志
// u n c l e a n S h u t d o w n 是 常 见 的 T L S 关 闭 情 况 , 不 需 要 作 为 错 误 处 理
logger . debug ( " Channel error (will reconnect if needed): \( error ) " )
}
}
// U n i f i e d h a n d l i n g : u s e h a n d l e D i s c o n n e c t f o r a l l n o n - c l o s e d / n o n - d i s c o n n e c t e d s t a t e s
// 注 意 : 不 在 这 里 调 用 h a n d l e D i s c o n n e c t
let currentState = state . withLockedValue { $0 }
// c o n t e x t . c l o s e ( ) 会 触 发 c h a n n e l I n a c t i v e , 由 它 负 责 处 理 断 开 逻 辑
if currentState != . closed && currentState != . disconnected {
// 这 样 可 以 避 免 重 复 处 理 和 过 度 重 连
handleDisconnect ( )
}
}
}
func handleDisconnect ( ) {
func handleDisconnect ( ) {
@@ -985,10 +991,13 @@ class ConnectionHandler: ChannelInboundHandler {
promise . futureResult . whenComplete { result in
promise . futureResult . whenComplete { result in
do {
do {
try result . get ( )
try result . get ( )
self . fire ( . disconnected )
} catch {
} catch {
logger . error ( " Error closing connection: \( error ) " )
// 降 级 为 d e b u g : 网 络 断 开 时 T L S 无 法 完 成 正 常 关 闭 握 手 是 预 期 行 为
logger . debug ( " Connection closed with error (will reconnect): \( error ) " )
}
}
// 无 论 成 功 还 是 失 败 , 都 要 触 发 . d i s c o n n e c t e d 事 件
// 这 样 上 层 才 能 感 知 到 连 接 已 断 开
self . fire ( . disconnected )
// O n l y s t a r t r e c o n n e c t a f t e r d i s c o n n e c t i s c o m p l e t e
// O n l y s t a r t r e c o n n e c t a f t e r d i s c o n n e c t i s c o m p l e t e
self . handleReconnect ( )
self . handleReconnect ( )
}
}
@@ -1004,21 +1013,28 @@ class ConnectionHandler: ChannelInboundHandler {
oldTask . cancel ( )
oldTask . cancel ( )
}
}
logger . info ( " 🔄 Starting reconnection process... " )
reconnectTask = Task {
reconnectTask = Task {
var connected = false
var connected = false
var attempt = 0
while ! Task . isCancelled
while ! Task . isCancelled
&& ( maxReconnects = = nil || self . reconnectAttempts < maxReconnects ! )
&& ( maxReconnects = = nil || self . reconnectAttempts < maxReconnects ! )
{
{
attempt += 1
logger . info ( " 🔄 Reconnect attempt \( attempt ) , total reconnectAttempts: \( self . reconnectAttempts ) " )
do {
do {
try await self . connect ( )
try await self . connect ( )
connected = true
connected = true
logger . info ( " ✅ Reconnection successful after \( attempt ) attempts " )
break // S u c c e s s f u l l y c o n n e c t e d
break // S u c c e s s f u l l y c o n n e c t e d
} catch is CancellationError {
} catch is CancellationError {
logger . debug ( " Reconnect task cancelled " )
logger . info ( " ⚠️ Reconnect task cancelled" )
return
return
} catch {
} catch {
logger . debu g( " Could not reconnect : \( error ) " )
logger . warnin g( " ⚠️ Reconnect attempt \( attempt ) failed : \( error ) " )
if ! Task . isCancelled {
if ! Task . isCancelled {
logger . info ( " ⏳ Waiting \( Double ( self . reconnectWait ) / 1_000_000_000 ) s before next attempt... " )
try ? await Task . sleep ( nanoseconds : self . reconnectWait )
try ? await Task . sleep ( nanoseconds : self . reconnectWait )
}
}
}
}
@@ -1026,13 +1042,13 @@ class ConnectionHandler: ChannelInboundHandler {
// E a r l y r e t u r n i f c a n c e l l e d
// E a r l y r e t u r n i f c a n c e l l e d
if Task . isCancelled {
if Task . isCancelled {
logger . debug ( " Reconnect task cancelled after connection attempts " )
logger . info ( " ⚠️ Reconnect task cancelled after \( attempt ) connection attempts" )
return
return
}
}
// I f w e g o t h e r e w i t h o u t c o n n e c t i n g a n d w e r e n ' t c a n c e l l e d , w e h i t m a x r e c o n n e c t s
// I f w e g o t h e r e w i t h o u t c o n n e c t i n g a n d w e r e n ' t c a n c e l l e d , w e h i t m a x r e c o n n e c t s
if ! connected {
if ! connected {
logger . error ( " Could not reconnect; maxReconnects exceeded " )
logger . error ( " ❌ Could not reconnect; maxReconnects exceeded ( \( self . maxReconnects ? ? - 1 ) ) " )
try ? await self . close ( )
try ? await self . close ( )
return
return
}
}
@@ -1060,7 +1076,8 @@ class ConnectionHandler: ChannelInboundHandler {
// T r i g g e r r e c o n n e c t t o r e c o v e r
// T r i g g e r r e c o n n e c t t o r e c o v e r
let currentState = state . withLockedValue { $0 }
let currentState = state . withLockedValue { $0 }
if currentState = = . connected {
if currentState = = . connected {
logger . error ( " Write failed: batchBuffer is nil but state is connected, triggering reconnect " )
// 降 级 为 d e b u g : 这 是 断 网 恢 复 场 景 的 正 常 状 态
logger . debug ( " Write failed: batchBuffer is nil, triggering reconnect " )
handleDisconnect ( )
handleDisconnect ( )
}
}
throw NatsError . ClientError . invalidConnection ( " not connected " )
throw NatsError . ClientError . invalidConnection ( " not connected " )
@@ -1071,7 +1088,8 @@ class ConnectionHandler: ChannelInboundHandler {
// T r i g g e r r e c o n n e c t o n w r i t e f a i l u r e - c o n n e c t i o n m a y b e b r o k e n
// T r i g g e r r e c o n n e c t o n w r i t e f a i l u r e - c o n n e c t i o n m a y b e b r o k e n
let currentState = state . withLockedValue { $0 }
let currentState = state . withLockedValue { $0 }
if currentState = = . connected {
if currentState = = . connected {
logger . error ( " Write operation failed, triggering reconnect: \( error ) " )
// 降 级 为 d e b u g : 网 络 断 开 时 写 入 失 败 是 预 期 行 为
logger . debug ( " Write operation failed, triggering reconnect: \( error ) " )
handleDisconnect ( )
handleDisconnect ( )
}
}
throw NatsError . ClientError . io ( error )
throw NatsError . ClientError . io ( error )