Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b014494819 | ||
|
|
387f6cf273 |
@@ -937,7 +937,6 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
if currentState == .disconnected || currentState == .closed {
|
if currentState == .disconnected || currentState == .closed {
|
||||||
return false // Already in disconnected/closed state
|
return false // Already in disconnected/closed state
|
||||||
}
|
}
|
||||||
$0 = .disconnected
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -945,6 +944,9 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set state to disconnected after check
|
||||||
|
state.withLockedValue { $0 = .disconnected }
|
||||||
|
|
||||||
// Clean up pending continuations to prevent leaks
|
// Clean up pending continuations to prevent leaks
|
||||||
if let continuation = serverInfoContinuation.withLockedValue({ cont in
|
if let continuation = serverInfoContinuation.withLockedValue({ cont in
|
||||||
let toResume = cont
|
let toResume = cont
|
||||||
@@ -1054,6 +1056,13 @@ class ConnectionHandler: ChannelInboundHandler {
|
|||||||
|
|
||||||
func write(operation: ClientOp) async throws {
|
func write(operation: ClientOp) async throws {
|
||||||
guard let buffer = self.batchBuffer else {
|
guard let buffer = self.batchBuffer else {
|
||||||
|
// If state is connected but batchBuffer is nil, this is a "fake connection" state
|
||||||
|
// Trigger reconnect to recover
|
||||||
|
let currentState = state.withLockedValue { $0 }
|
||||||
|
if currentState == .connected {
|
||||||
|
logger.error("Write failed: batchBuffer is nil but state is connected, triggering reconnect")
|
||||||
|
handleDisconnect()
|
||||||
|
}
|
||||||
throw NatsError.ClientError.invalidConnection("not connected")
|
throw NatsError.ClientError.invalidConnection("not connected")
|
||||||
}
|
}
|
||||||
do {
|
do {
|
||||||
|
|||||||
Reference in New Issue
Block a user