fix: 修复channelInactive状态检查不完整导致的重连问题

- 将 channelInactive 的状态检查从仅处理 connected 状态改为处理所有非 closed/disconnected 状态
- 与 errorCaught 的处理逻辑保持一致
- 确保在 connecting/pending 状态下网络断开时也能正确触发重连
This commit is contained in:
wenzuhuai
2026-01-22 09:08:01 +08:00
parent 5ad1174292
commit e452971586

View File

@@ -901,7 +901,7 @@ class ConnectionHandler: ChannelInboundHandler {
continuation.resume(throwing: errorToUse)
}
let shouldHandleDisconnect = state.withLockedValue { $0 == .connected }
let shouldHandleDisconnect = state.withLockedValue { $0 != .closed && $0 != .disconnected }
if shouldHandleDisconnect {
handleDisconnect()
}
@@ -923,16 +923,44 @@ class ConnectionHandler: ChannelInboundHandler {
} else {
logger.error("unexpected error: \(error)")
}
// Unified handling: use handleDisconnect for all non-closed/non-disconnected states
let currentState = state.withLockedValue { $0 }
if currentState == .pending || currentState == .connecting {
if currentState != .closed && currentState != .disconnected {
handleDisconnect()
} else if currentState == .disconnected {
handleReconnect()
}
}
func handleDisconnect() {
state.withLockedValue { $0 = .disconnected }
// Prevent duplicate disconnect handling
let shouldProceed = state.withLockedValue { currentState -> Bool in
if currentState == .disconnected || currentState == .closed {
return false // Already in disconnected/closed state
}
$0 = .disconnected
return true
}
guard shouldProceed else {
return
}
// Clean up pending continuations to prevent leaks
if let continuation = serverInfoContinuation.withLockedValue({ cont in
let toResume = cont
cont = nil
return toResume
}) {
continuation.resume(throwing: NatsError.ClientError.connectionClosed)
}
if let continuation = connectionEstablishedContinuation.withLockedValue({ cont in
let toResume = cont
cont = nil
return toResume
}) {
continuation.resume(throwing: NatsError.ClientError.connectionClosed)
}
// Safely clear batchBuffer first to avoid race conditions
let bufferToRelease = self.batchBuffer
@@ -963,12 +991,17 @@ class ConnectionHandler: ChannelInboundHandler {
self.handleReconnect()
}
} else {
// No channel, start reconnect immediately
self.fire(.disconnected)
handleReconnect()
}
}
func handleReconnect() {
// Cancel any existing reconnect task to prevent multiple concurrent reconnections
if let oldTask = reconnectTask {
oldTask.cancel()
}
reconnectTask = Task {
var connected = false
while !Task.isCancelled
@@ -984,7 +1017,7 @@ class ConnectionHandler: ChannelInboundHandler {
} catch {
logger.debug("Could not reconnect: \(error)")
if !Task.isCancelled {
try await Task.sleep(nanoseconds: self.reconnectWait)
try? await Task.sleep(nanoseconds: self.reconnectWait)
}
}
}
@@ -998,7 +1031,7 @@ class ConnectionHandler: ChannelInboundHandler {
// If we got here without connecting and weren't cancelled, we hit max reconnects
if !connected {
logger.error("Could not reconnect; maxReconnects exceeded")
try await self.close()
try? await self.close()
return
}