@@ -46,6 +46,7 @@ class ConnectionHandler: ChannelInboundHandler {
private var rootCertificate : URL ?
private var clientCertificate : URL ?
private var clientKey : URL ?
private let clientName : String
typealias InboundIn = ByteBuffer
private let state = NIOLockedValueBox ( NatsState . pending )
@@ -86,7 +87,7 @@ class ConnectionHandler: ChannelInboundHandler {
retainServersOrder : Bool ,
pingInterval : TimeInterval , auth : Auth ? , requireTls : Bool , tlsFirst : Bool ,
clientCertificate : URL ? , clientKey : URL ? ,
rootCertificate : URL ? , retryOnFailedConnect : Bool
rootCertificate : URL ? , retryOnFailedConnect : Bool , name : String
) {
self . urls = urls
self . group = . singleton
@@ -102,6 +103,7 @@ class ConnectionHandler: ChannelInboundHandler {
self . clientKey = clientKey
self . rootCertificate = rootCertificate
self . retryOnFailedConnect = retryOnFailedConnect
self . clientName = name
}
func channelRead ( context : ChannelHandlerContext , data : NIOAny ) {
@@ -356,6 +358,10 @@ class ConnectionHandler: ChannelInboundHandler {
guard let channel = self . channel else {
throw NatsError . ClientError . internalError ( " empty channel " )
}
// 重 连 成 功 后 重 置 p i n g 计 数 器 , 避 免 累 积 的 失 败 计 数 导 致 立 即 断 开
self . outstandingPings . store ( 0 , ordering : . relaxed )
// S c h e d u l e t h e t a s k t o s e n d a P I N G p e r i o d i c a l l y
let pingInterval = TimeAmount . nanoseconds ( Int64 ( self . pingInterval * 1_000_000_000 ) )
self . pingTask = channel . eventLoop . scheduleRepeatedTask (
@@ -481,7 +487,7 @@ class ConnectionHandler: ChannelInboundHandler {
private func sendClientConnectInit ( ) async throws {
var initialConnect = ConnectInfo (
verbose : false , pedantic : false , userJwt : nil , nkey : " " , name : " " , echo : true ,
verbose : false , pedantic : false , userJwt : nil , nkey : " " , name : self . clientName , echo : true ,
lang : self . lang , version : self . version , natsProtocol : . dynamic , tlsRequired : false ,
user : self . auth ? . user ? ? " " , pass : self . auth ? . password ? ? " " ,
authToken : self . auth ? . token ? ? " " , headers : true , noResponders : true )
@@ -985,11 +991,13 @@ class ConnectionHandler: ChannelInboundHandler {
promise . futureResult . whenComplete { result in
do {
try result . get ( )
self . fire ( . disconnected )
} catch {
// 降 级 为 d e b u g : 网 络 断 开 时 T L S 无 法 完 成 正 常 关 闭 握 手 是 预 期 行 为
logger . debug ( " Connection closed with error (will reconnect): \( error ) " )
}
// 无 论 成 功 还 是 失 败 , 都 要 触 发 . d i s c o n n e c t e d 事 件
// 这 样 上 层 才 能 感 知 到 连 接 已 断 开
self . fire ( . disconnected )
// O n l y s t a r t r e c o n n e c t a f t e r d i s c o n n e c t i s c o m p l e t e
self . handleReconnect ( )
}
@@ -1005,21 +1013,28 @@ class ConnectionHandler: ChannelInboundHandler {
oldTask . cancel ( )
}
logger . info ( " 🔄 Starting reconnection process... " )
reconnectTask = Task {
var connected = false
var attempt = 0
while ! Task . isCancelled
&& ( maxReconnects = = nil || self . reconnectAttempts < maxReconnects ! )
{
attempt += 1
logger . info ( " 🔄 Reconnect attempt \( attempt ) , total reconnectAttempts: \( self . reconnectAttempts ) " )
do {
try await self . connect ( )
connected = true
logger . info ( " ✅ Reconnection successful after \( attempt ) attempts " )
break // S u c c e s s f u l l y c o n n e c t e d
} catch is CancellationError {
logger . debug ( " Reconnect task cancelled " )
logger . info ( " ⚠️ Reconnect task cancelled" )
return
} catch {
logger . debu g( " Could not reconnect : \( error ) " )
logger . warnin g( " ⚠️ Reconnect attempt \( attempt ) failed : \( error ) " )
if ! Task . isCancelled {
logger . info ( " ⏳ Waiting \( Double ( self . reconnectWait ) / 1_000_000_000 ) s before next attempt... " )
try ? await Task . sleep ( nanoseconds : self . reconnectWait )
}
}
@@ -1027,13 +1042,13 @@ class ConnectionHandler: ChannelInboundHandler {
// E a r l y r e t u r n i f c a n c e l l e d
if Task . isCancelled {
logger . debug ( " Reconnect task cancelled after connection attempts " )
logger . info ( " ⚠️ Reconnect task cancelled after \( attempt ) connection attempts" )
return
}
// I f w e g o t h e r e w i t h o u t c o n n e c t i n g a n d w e r e n ' t c a n c e l l e d , w e h i t m a x r e c o n n e c t s
if ! connected {
logger . error ( " Could not reconnect; maxReconnects exceeded " )
logger . error ( " ❌ Could not reconnect; maxReconnects exceeded ( \( self . maxReconnects ? ? - 1 ) ) " )
try ? await self . close ( )
return
}