6 Commits
main ... 1.0.7

Author SHA1 Message Date
wenzuhuai
ec4618db87 fix: 重连成功后重置 outstandingPings 计数器,修复频繁重连问题 2026-01-23 09:58:40 +08:00
wenzuhuai
30fb5d8b27 fix: 将断网场景的错误日志降级为 debug 级别
- Error closing connection 降级为 debug(TLS 关闭错误是断网预期行为)
- Write failed: batchBuffer is nil 降级为 debug
- Write operation failed 降级为 debug
- 避免长时间断网恢复时日志泛滥
2026-01-23 09:34:33 +08:00
wenzuhuai
cd680abe55 fix: 修复 errorCaught 导致的频繁重连问题
- 移除 errorCaught 中重复的 handleDisconnect 调用
- 将 uncleanShutdown 日志降级为 debug 级别
- 避免双重触发导致的断开重连循环
2026-01-23 09:06:50 +08:00
wenzuhuai
b014494819 fix: write操作在batchBuffer为nil时也触发重连
- 解决"假连接"边界情况:状态为connected但batchBuffer为nil
- 确保发送消息失败后能自动恢复连接
2026-01-22 09:52:52 +08:00
wenzuhuai
387f6cf273 fix: 修复闭包参数混用导致的编译错误 2026-01-22 09:18:44 +08:00
wenzuhuai
e452971586 fix: 修复channelInactive状态检查不完整导致的重连问题
- 将 channelInactive 的状态检查从仅处理 connected 状态改为处理所有非 closed/disconnected 状态
- 与 errorCaught 的处理逻辑保持一致
- 确保在 connecting/pending 状态下网络断开时也能正确触发重连
2026-01-22 09:08:01 +08:00

View File

@@ -356,6 +356,10 @@ class ConnectionHandler: ChannelInboundHandler {
guard let channel = self.channel else { guard let channel = self.channel else {
throw NatsError.ClientError.internalError("empty channel") throw NatsError.ClientError.internalError("empty channel")
} }
// ping
self.outstandingPings.store(0, ordering: .relaxed)
// Schedule the task to send a PING periodically // Schedule the task to send a PING periodically
let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000)) let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000))
self.pingTask = channel.eventLoop.scheduleRepeatedTask( self.pingTask = channel.eventLoop.scheduleRepeatedTask(
@@ -901,7 +905,7 @@ class ConnectionHandler: ChannelInboundHandler {
continuation.resume(throwing: errorToUse) continuation.resume(throwing: errorToUse)
} }
let shouldHandleDisconnect = state.withLockedValue { $0 == .connected } let shouldHandleDisconnect = state.withLockedValue { $0 != .closed && $0 != .disconnected }
if shouldHandleDisconnect { if shouldHandleDisconnect {
handleDisconnect() handleDisconnect()
} }
@@ -921,19 +925,49 @@ class ConnectionHandler: ChannelInboundHandler {
if let natsErr = error as? NatsErrorProtocol { if let natsErr = error as? NatsErrorProtocol {
self.fire(.error(natsErr)) self.fire(.error(natsErr))
} else { } else {
logger.error("unexpected error: \(error)") // debug
} // uncleanShutdown TLS
let currentState = state.withLockedValue { $0 } logger.debug("Channel error (will reconnect if needed): \(error)")
if currentState == .pending || currentState == .connecting {
handleDisconnect()
} else if currentState == .disconnected {
handleReconnect()
} }
// handleDisconnect
// context.close() channelInactive
//
} }
func handleDisconnect() { func handleDisconnect() {
// Prevent duplicate disconnect handling
let shouldProceed = state.withLockedValue { currentState -> Bool in
if currentState == .disconnected || currentState == .closed {
return false // Already in disconnected/closed state
}
return true
}
guard shouldProceed else {
return
}
// Set state to disconnected after check
state.withLockedValue { $0 = .disconnected } state.withLockedValue { $0 = .disconnected }
// Clean up pending continuations to prevent leaks
if let continuation = serverInfoContinuation.withLockedValue({ cont in
let toResume = cont
cont = nil
return toResume
}) {
continuation.resume(throwing: NatsError.ClientError.connectionClosed)
}
if let continuation = connectionEstablishedContinuation.withLockedValue({ cont in
let toResume = cont
cont = nil
return toResume
}) {
continuation.resume(throwing: NatsError.ClientError.connectionClosed)
}
// Safely clear batchBuffer first to avoid race conditions // Safely clear batchBuffer first to avoid race conditions
let bufferToRelease = self.batchBuffer let bufferToRelease = self.batchBuffer
self.batchBuffer = nil self.batchBuffer = nil
@@ -957,18 +991,24 @@ class ConnectionHandler: ChannelInboundHandler {
try result.get() try result.get()
self.fire(.disconnected) self.fire(.disconnected)
} catch { } catch {
logger.error("Error closing connection: \(error)") // debug TLS
logger.debug("Connection closed with error (will reconnect): \(error)")
} }
// Only start reconnect after disconnect is complete // Only start reconnect after disconnect is complete
self.handleReconnect() self.handleReconnect()
} }
} else { } else {
// No channel, start reconnect immediately self.fire(.disconnected)
handleReconnect() handleReconnect()
} }
} }
func handleReconnect() { func handleReconnect() {
// Cancel any existing reconnect task to prevent multiple concurrent reconnections
if let oldTask = reconnectTask {
oldTask.cancel()
}
reconnectTask = Task { reconnectTask = Task {
var connected = false var connected = false
while !Task.isCancelled while !Task.isCancelled
@@ -984,7 +1024,7 @@ class ConnectionHandler: ChannelInboundHandler {
} catch { } catch {
logger.debug("Could not reconnect: \(error)") logger.debug("Could not reconnect: \(error)")
if !Task.isCancelled { if !Task.isCancelled {
try await Task.sleep(nanoseconds: self.reconnectWait) try? await Task.sleep(nanoseconds: self.reconnectWait)
} }
} }
} }
@@ -998,7 +1038,7 @@ class ConnectionHandler: ChannelInboundHandler {
// If we got here without connecting and weren't cancelled, we hit max reconnects // If we got here without connecting and weren't cancelled, we hit max reconnects
if !connected { if !connected {
logger.error("Could not reconnect; maxReconnects exceeded") logger.error("Could not reconnect; maxReconnects exceeded")
try await self.close() try? await self.close()
return return
} }
@@ -1021,6 +1061,14 @@ class ConnectionHandler: ChannelInboundHandler {
func write(operation: ClientOp) async throws { func write(operation: ClientOp) async throws {
guard let buffer = self.batchBuffer else { guard let buffer = self.batchBuffer else {
// If state is connected but batchBuffer is nil, this is a "fake connection" state
// Trigger reconnect to recover
let currentState = state.withLockedValue { $0 }
if currentState == .connected {
// debug
logger.debug("Write failed: batchBuffer is nil, triggering reconnect")
handleDisconnect()
}
throw NatsError.ClientError.invalidConnection("not connected") throw NatsError.ClientError.invalidConnection("not connected")
} }
do { do {
@@ -1029,7 +1077,8 @@ class ConnectionHandler: ChannelInboundHandler {
// Trigger reconnect on write failure - connection may be broken // Trigger reconnect on write failure - connection may be broken
let currentState = state.withLockedValue { $0 } let currentState = state.withLockedValue { $0 }
if currentState == .connected { if currentState == .connected {
logger.error("Write operation failed, triggering reconnect: \(error)") // debug
logger.debug("Write operation failed, triggering reconnect: \(error)")
handleDisconnect() handleDisconnect()
} }
throw NatsError.ClientError.io(error) throw NatsError.ClientError.io(error)