From e452971586254b3427e062eaa86647e45a93001e Mon Sep 17 00:00:00 2001 From: wenzuhuai Date: Thu, 22 Jan 2026 09:08:01 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8DchannelInactive?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=A3=80=E6=9F=A5=E4=B8=8D=E5=AE=8C=E6=95=B4?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E7=9A=84=E9=87=8D=E8=BF=9E=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将 channelInactive 的状态检查从仅处理 connected 状态改为处理所有非 closed/disconnected 状态 - 与 errorCaught 的处理逻辑保持一致 - 确保在 connecting/pending 状态下网络断开时也能正确触发重连 --- Sources/Nats/NatsConnection.swift | 49 ++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/Sources/Nats/NatsConnection.swift b/Sources/Nats/NatsConnection.swift index fac0d0f..bbb6457 100644 --- a/Sources/Nats/NatsConnection.swift +++ b/Sources/Nats/NatsConnection.swift @@ -901,7 +901,7 @@ class ConnectionHandler: ChannelInboundHandler { continuation.resume(throwing: errorToUse) } - let shouldHandleDisconnect = state.withLockedValue { $0 == .connected } + let shouldHandleDisconnect = state.withLockedValue { $0 != .closed && $0 != .disconnected } if shouldHandleDisconnect { handleDisconnect() } @@ -923,16 +923,44 @@ class ConnectionHandler: ChannelInboundHandler { } else { logger.error("unexpected error: \(error)") } + + // Unified handling: use handleDisconnect for all non-closed/non-disconnected states let currentState = state.withLockedValue { $0 } - if currentState == .pending || currentState == .connecting { + if currentState != .closed && currentState != .disconnected { handleDisconnect() - } else if currentState == .disconnected { - handleReconnect() } } func handleDisconnect() { - state.withLockedValue { $0 = .disconnected } + // Prevent duplicate disconnect handling + let shouldProceed = state.withLockedValue { currentState -> Bool in + if currentState == .disconnected || currentState == .closed { + return false // Already in disconnected/closed state + } + $0 = .disconnected + return true + } + + guard shouldProceed else { + return + } + + // Clean up pending continuations to prevent leaks + if let continuation = serverInfoContinuation.withLockedValue({ cont in + let toResume = cont + cont = nil + return toResume + }) { + continuation.resume(throwing: NatsError.ClientError.connectionClosed) + } + + if let continuation = connectionEstablishedContinuation.withLockedValue({ cont in + let toResume = cont + cont = nil + return toResume + }) { + continuation.resume(throwing: NatsError.ClientError.connectionClosed) + } // Safely clear batchBuffer first to avoid race conditions let bufferToRelease = self.batchBuffer @@ -963,12 +991,17 @@ class ConnectionHandler: ChannelInboundHandler { self.handleReconnect() } } else { - // No channel, start reconnect immediately + self.fire(.disconnected) handleReconnect() } } func handleReconnect() { + // Cancel any existing reconnect task to prevent multiple concurrent reconnections + if let oldTask = reconnectTask { + oldTask.cancel() + } + reconnectTask = Task { var connected = false while !Task.isCancelled @@ -984,7 +1017,7 @@ class ConnectionHandler: ChannelInboundHandler { } catch { logger.debug("Could not reconnect: \(error)") if !Task.isCancelled { - try await Task.sleep(nanoseconds: self.reconnectWait) + try? await Task.sleep(nanoseconds: self.reconnectWait) } } } @@ -998,7 +1031,7 @@ class ConnectionHandler: ChannelInboundHandler { // If we got here without connecting and weren't cancelled, we hit max reconnects if !connected { logger.error("Could not reconnect; maxReconnects exceeded") - try await self.close() + try? await self.close() return }