Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec4618db87 | ||
|
|
30fb5d8b27 | ||
|
|
cd680abe55 | ||
|
|
b014494819 | ||
|
|
387f6cf273 | ||
|
|
e452971586 |
@@ -356,6 +356,10 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
guard let channel = self.channel else {
|
guard let channel = self.channel else {
|
||||||
throw NatsError.ClientError.internalError("empty channel")
|
throw NatsError.ClientError.internalError("empty channel")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 重连成功后重置 ping 计数器,避免累积的失败计数导致立即断开
|
||||||
|
self.outstandingPings.store(0, ordering: .relaxed)
|
||||||
|
|
||||||
// Schedule the task to send a PING periodically
|
// Schedule the task to send a PING periodically
|
||||||
let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000))
|
let pingInterval = TimeAmount.nanoseconds(Int64(self.pingInterval * 1_000_000_000))
|
||||||
self.pingTask = channel.eventLoop.scheduleRepeatedTask(
|
self.pingTask = channel.eventLoop.scheduleRepeatedTask(
|
||||||
@@ -901,7 +905,7 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
continuation.resume(throwing: errorToUse)
|
continuation.resume(throwing: errorToUse)
|
||||||
}
|
}
|
||||||
|
|
||||||
let shouldHandleDisconnect = state.withLockedValue { $0 == .connected }
|
let shouldHandleDisconnect = state.withLockedValue { $0 != .closed && $0 != .disconnected }
|
||||||
if shouldHandleDisconnect {
|
if shouldHandleDisconnect {
|
||||||
handleDisconnect()
|
handleDisconnect()
|
||||||
}
|
}
|
||||||
@@ -921,19 +925,49 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
if let natsErr = error as? NatsErrorProtocol {
|
if let natsErr = error as? NatsErrorProtocol {
|
||||||
self.fire(.error(natsErr))
|
self.fire(.error(natsErr))
|
||||||
} else {
|
} else {
|
||||||
logger.error("unexpected error: \(error)")
|
// 降级为 debug 级别,避免频繁输出错误日志
|
||||||
}
|
// uncleanShutdown 是常见的 TLS 关闭情况,不需要作为错误处理
|
||||||
let currentState = state.withLockedValue { $0 }
|
logger.debug("Channel error (will reconnect if needed): \(error)")
|
||||||
if currentState == .pending || currentState == .connecting {
|
|
||||||
handleDisconnect()
|
|
||||||
} else if currentState == .disconnected {
|
|
||||||
handleReconnect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 注意:不在这里调用 handleDisconnect
|
||||||
|
// context.close() 会触发 channelInactive,由它负责处理断开逻辑
|
||||||
|
// 这样可以避免重复处理和过度重连
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleDisconnect() {
|
func handleDisconnect() {
|
||||||
|
// Prevent duplicate disconnect handling
|
||||||
|
let shouldProceed = state.withLockedValue { currentState -> Bool in
|
||||||
|
if currentState == .disconnected || currentState == .closed {
|
||||||
|
return false // Already in disconnected/closed state
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
guard shouldProceed else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set state to disconnected after check
|
||||||
state.withLockedValue { $0 = .disconnected }
|
state.withLockedValue { $0 = .disconnected }
|
||||||
|
|
||||||
|
// Clean up pending continuations to prevent leaks
|
||||||
|
if let continuation = serverInfoContinuation.withLockedValue({ cont in
|
||||||
|
let toResume = cont
|
||||||
|
cont = nil
|
||||||
|
return toResume
|
||||||
|
}) {
|
||||||
|
continuation.resume(throwing: NatsError.ClientError.connectionClosed)
|
||||||
|
}
|
||||||
|
|
||||||
|
if let continuation = connectionEstablishedContinuation.withLockedValue({ cont in
|
||||||
|
let toResume = cont
|
||||||
|
cont = nil
|
||||||
|
return toResume
|
||||||
|
}) {
|
||||||
|
continuation.resume(throwing: NatsError.ClientError.connectionClosed)
|
||||||
|
}
|
||||||
|
|
||||||
// Safely clear batchBuffer first to avoid race conditions
|
// Safely clear batchBuffer first to avoid race conditions
|
||||||
let bufferToRelease = self.batchBuffer
|
let bufferToRelease = self.batchBuffer
|
||||||
self.batchBuffer = nil
|
self.batchBuffer = nil
|
||||||
@@ -957,18 +991,24 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
try result.get()
|
try result.get()
|
||||||
self.fire(.disconnected)
|
self.fire(.disconnected)
|
||||||
} catch {
|
} catch {
|
||||||
logger.error("Error closing connection: \(error)")
|
// 降级为 debug:网络断开时 TLS 无法完成正常关闭握手是预期行为
|
||||||
|
logger.debug("Connection closed with error (will reconnect): \(error)")
|
||||||
}
|
}
|
||||||
// Only start reconnect after disconnect is complete
|
// Only start reconnect after disconnect is complete
|
||||||
self.handleReconnect()
|
self.handleReconnect()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// No channel, start reconnect immediately
|
self.fire(.disconnected)
|
||||||
handleReconnect()
|
handleReconnect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleReconnect() {
|
func handleReconnect() {
|
||||||
|
// Cancel any existing reconnect task to prevent multiple concurrent reconnections
|
||||||
|
if let oldTask = reconnectTask {
|
||||||
|
oldTask.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
reconnectTask = Task {
|
reconnectTask = Task {
|
||||||
var connected = false
|
var connected = false
|
||||||
while !Task.isCancelled
|
while !Task.isCancelled
|
||||||
@@ -984,7 +1024,7 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
} catch {
|
} catch {
|
||||||
logger.debug("Could not reconnect: \(error)")
|
logger.debug("Could not reconnect: \(error)")
|
||||||
if !Task.isCancelled {
|
if !Task.isCancelled {
|
||||||
try await Task.sleep(nanoseconds: self.reconnectWait)
|
try? await Task.sleep(nanoseconds: self.reconnectWait)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -998,7 +1038,7 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
// If we got here without connecting and weren't cancelled, we hit max reconnects
|
// If we got here without connecting and weren't cancelled, we hit max reconnects
|
||||||
if !connected {
|
if !connected {
|
||||||
logger.error("Could not reconnect; maxReconnects exceeded")
|
logger.error("Could not reconnect; maxReconnects exceeded")
|
||||||
try await self.close()
|
try? await self.close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1021,6 +1061,14 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
|
|
||||||
func write(operation: ClientOp) async throws {
|
func write(operation: ClientOp) async throws {
|
||||||
guard let buffer = self.batchBuffer else {
|
guard let buffer = self.batchBuffer else {
|
||||||
|
// If state is connected but batchBuffer is nil, this is a "fake connection" state
|
||||||
|
// Trigger reconnect to recover
|
||||||
|
let currentState = state.withLockedValue { $0 }
|
||||||
|
if currentState == .connected {
|
||||||
|
// 降级为 debug:这是断网恢复场景的正常状态
|
||||||
|
logger.debug("Write failed: batchBuffer is nil, triggering reconnect")
|
||||||
|
handleDisconnect()
|
||||||
|
}
|
||||||
throw NatsError.ClientError.invalidConnection("not connected")
|
throw NatsError.ClientError.invalidConnection("not connected")
|
||||||
}
|
}
|
||||||
do {
|
do {
|
||||||
@@ -1029,7 +1077,8 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
// Trigger reconnect on write failure - connection may be broken
|
// Trigger reconnect on write failure - connection may be broken
|
||||||
let currentState = state.withLockedValue { $0 }
|
let currentState = state.withLockedValue { $0 }
|
||||||
if currentState == .connected {
|
if currentState == .connected {
|
||||||
logger.error("Write operation failed, triggering reconnect: \(error)")
|
// 降级为 debug:网络断开时写入失败是预期行为
|
||||||
|
logger.debug("Write operation failed, triggering reconnect: \(error)")
|
||||||
handleDisconnect()
|
handleDisconnect()
|
||||||
}
|
}
|
||||||
throw NatsError.ClientError.io(error)
|
throw NatsError.ClientError.io(error)
|
||||||
|
|||||||
Reference in New Issue
Block a user