From 153e600bbc203450993d48a363fb148a9a208cd5 Mon Sep 17 00:00:00 2001 From: wenzuhuai Date: Mon, 12 Jan 2026 19:16:56 +0800 Subject: [PATCH] fix: clear pending pings to avoid promise leaks on connection close/suspend - Add cancel() method to RttCommand to fail promise on connection close - Add dequeueAll() method to ConcurrentQueue for batch cleanup - Call clearPendingPings() in close(), suspend(), and disconnect() methods - Prevents 'leaking promise' crash when connection is closed while pings are pending --- Sources/Nats/ConcurrentQueue.swift | 9 +++++++++ Sources/Nats/NatsConnection.swift | 17 +++++++++++++++++ Sources/Nats/RttCommand.swift | 5 +++++ 3 files changed, 31 insertions(+) diff --git a/Sources/Nats/ConcurrentQueue.swift b/Sources/Nats/ConcurrentQueue.swift index 75a312a..235e136 100644 --- a/Sources/Nats/ConcurrentQueue.swift +++ b/Sources/Nats/ConcurrentQueue.swift @@ -29,4 +29,13 @@ internal class ConcurrentQueue { guard !elements.isEmpty else { return nil } return elements.removeFirst() } + + /// Dequeue all elements at once (used for cleanup) + func dequeueAll() -> [T] { + lock.lock() + defer { lock.unlock() } + let all = elements + elements.removeAll() + return all + } } diff --git a/Sources/Nats/NatsConnection.swift b/Sources/Nats/NatsConnection.swift index a12be50..8a3e625 100644 --- a/Sources/Nats/NatsConnection.swift +++ b/Sources/Nats/NatsConnection.swift @@ -712,6 +712,7 @@ class ConnectionHandler: ChannelInboundHandler { guard let eventLoop = self.channel?.eventLoop else { self.state.withLockedValue { $0 = .closed } self.pingTask?.cancel() + clearPendingPings() // Clear pending pings to avoid promise leaks self.fire(.closed) return } @@ -720,6 +721,7 @@ class ConnectionHandler: ChannelInboundHandler { eventLoop.execute { self.state.withLockedValue { $0 = .closed } self.pingTask?.cancel() + self.clearPendingPings() // Clear pending pings to avoid promise leaks self.channel?.close(mode: .all, promise: promise) } @@ -735,8 +737,20 @@ class ConnectionHandler: ChannelInboundHandler { private func disconnect() async throws { self.pingTask?.cancel() + clearPendingPings() // Clear pending pings to avoid promise leaks try await self.channel?.close().get() } + + /// Clear all pending ping requests to avoid promise leaks + private func clearPendingPings() { + let pendingPings = pingQueue.dequeueAll() + for ping in pendingPings { + ping.cancel() + } + if !pendingPings.isEmpty { + logger.debug("Cleared \(pendingPings.count) pending ping(s)") + } + } func suspend() async throws { self.reconnectTask?.cancel() @@ -746,6 +760,7 @@ class ConnectionHandler: ChannelInboundHandler { guard let eventLoop = self.channel?.eventLoop else { // Set state to suspended even if channel is nil self.state.withLockedValue { $0 = .suspended } + clearPendingPings() // Clear pending pings to avoid promise leaks return } let promise = eventLoop.makePromise(of: Void.self) @@ -759,8 +774,10 @@ class ConnectionHandler: ChannelInboundHandler { if shouldClose { self.pingTask?.cancel() + self.clearPendingPings() // Clear pending pings to avoid promise leaks self.channel?.close(mode: .all, promise: promise) } else { + self.clearPendingPings() // Clear pending pings even if not closing promise.succeed() } } diff --git a/Sources/Nats/RttCommand.swift b/Sources/Nats/RttCommand.swift index 14675a4..e6b96d9 100644 --- a/Sources/Nats/RttCommand.swift +++ b/Sources/Nats/RttCommand.swift @@ -36,4 +36,9 @@ internal class RttCommand { func getRoundTripTime() async throws -> TimeInterval { try await promise?.futureResult.get() ?? 0 } + + /// Cancel the ping request to avoid promise leaks when connection is closed + func cancel() { + promise?.fail(NatsError.ClientError.connectionClosed) + } }