From e178a1b0d6515977bfa7bc239f58d80c9a16ebf7 Mon Sep 17 00:00:00 2001 From: 1amageek Date: Fri, 6 Mar 2026 14:19:16 +0900 Subject: [PATCH 1/3] Fix data race in NetworkTransport send/receive continuations Remove Task { @MainActor in } wrapper and mutable continuation guard flags that caused data races under Swift 6 strict concurrency. NWConnection completion handlers are guaranteed to fire exactly once, making the double-resume guard unnecessary. - Extract reconnection logic into actor-isolated handleSendError() - Resume continuations directly in NWConnection callbacks - Simplify receiveData() by removing unnecessary guard pattern --- .../Base/Transports/NetworkTransport.swift | 97 +++++++------------ 1 file changed, 36 insertions(+), 61 deletions(-) diff --git a/Sources/MCP/Base/Transports/NetworkTransport.swift b/Sources/MCP/Base/Transports/NetworkTransport.swift index 62b623c8..85f58d8a 100644 --- a/Sources/MCP/Base/Transports/NetworkTransport.swift +++ b/Sources/MCP/Base/Transports/NetworkTransport.swift @@ -511,9 +511,6 @@ import Logging var messageWithNewline = message messageWithNewline.append(UInt8(ascii: "\n")) - // Use a local actor-isolated variable to track continuation state - var sendContinuationResumed = false - try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in guard let self = self else { @@ -526,54 +523,41 @@ import Logging contentContext: .defaultMessage, isComplete: true, completion: .contentProcessed { [weak self] error in - guard let self = self else { return } - - Task { @MainActor in - if !sendContinuationResumed { - sendContinuationResumed = true - if let error = error { - self.logger.error("Send error: \(error)") - - // Check if we should attempt to reconnect on send failure - let isStopping = await self.isStopping // Await actor-isolated property - if !isStopping && self.reconnectionConfig.enabled { - let isConnected = await self.isConnected - if isConnected { - if error.isConnectionLost { - self.logger.warning( - "Connection appears broken, will attempt to reconnect..." - ) - - // Schedule connection restart - Task { [weak self] in // Operate on self's executor - guard let self = self else { return } - - await self.setIsConnected(false) - - try? await Task.sleep(for: .milliseconds(500)) - - let currentIsStopping = await self.isStopping - if !currentIsStopping { - // Cancel the connection, then attempt to reconnect fully. - self.connection.cancel() - try? await self.connect() - } - } - } - } - } - - continuation.resume( - throwing: MCPError.internalError("Send error: \(error)")) - } else { - continuation.resume() + if let error = error { + continuation.resume( + throwing: MCPError.internalError("Send error: \(error)")) + // Handle reconnection on the actor's executor + if let self { + Task { + await self.handleSendError(error) } } + } else { + continuation.resume() } }) } } + /// Handles reconnection logic after a send error. + /// + /// This method is actor-isolated, so it safely accesses `isStopping`, + /// `isConnected`, and `reconnectionConfig` without data races. + private func handleSendError(_ error: NWError) async { + logger.error("Send error: \(error)") + guard !isStopping && reconnectionConfig.enabled && isConnected else { return } + guard error.isConnectionLost else { return } + + logger.warning("Connection appears broken, will attempt to reconnect...") + setIsConnected(false) + + try? await Task.sleep(for: .milliseconds(500)) + + guard !isStopping else { return } + connection.cancel() + try? await connect() + } + /// Receives data in an async sequence /// /// This returns an AsyncThrowingStream that emits Data objects representing @@ -747,8 +731,6 @@ import Logging /// - Returns: The received data chunk /// - Throws: Network errors or transport failures private func receiveData() async throws -> Data { - var receiveContinuationResumed = false - return try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in guard let self = self else { @@ -759,21 +741,14 @@ import Logging let maxLength = bufferConfig.maxReceiveBufferSize ?? Int.max connection.receive(minimumIncompleteLength: 1, maximumLength: maxLength) { content, _, isComplete, error in - Task { @MainActor in - if !receiveContinuationResumed { - receiveContinuationResumed = true - if let error = error { - continuation.resume(throwing: MCPError.transportError(error)) - } else if let content = content { - continuation.resume(returning: content) - } else if isComplete { - self.logger.trace("Connection completed by peer") - continuation.resume(throwing: MCPError.connectionClosed) - } else { - // EOF: Resume with empty data instead of throwing an error - continuation.resume(returning: Data()) - } - } + if let error = error { + continuation.resume(throwing: MCPError.transportError(error)) + } else if let content = content { + continuation.resume(returning: content) + } else if isComplete { + continuation.resume(throwing: MCPError.connectionClosed) + } else { + continuation.resume(returning: Data()) } } } From 62b9f3411fe97ded9ff9d4e8cdbcd93c74b1101f Mon Sep 17 00:00:00 2001 From: 1amageek Date: Fri, 6 Mar 2026 14:23:15 +0900 Subject: [PATCH 2/3] Add tests for NetworkTransport data race fix - Concurrent sends: 100 parallel sends to surface continuation races - Send error with reconnection: verify continuation resumes and handleSendError triggers reconnection asynchronously - Send error without reconnection: verify no reconnection attempt - Receive error: verify continuation resumes without crash - Concurrent sends with intermittent errors: mix of success/failure to verify continuation is always resumed exactly once --- Tests/MCPTests/NetworkTransportTests.swift | 169 +++++++++++++++++++++ 1 file changed, 169 insertions(+) diff --git a/Tests/MCPTests/NetworkTransportTests.swift b/Tests/MCPTests/NetworkTransportTests.swift index 9c02c90c..0cb0e595 100644 --- a/Tests/MCPTests/NetworkTransportTests.swift +++ b/Tests/MCPTests/NetworkTransportTests.swift @@ -160,6 +160,18 @@ import Testing queueDataForReceiving(data) } + /// Set a send-only error that does not change connection state. + /// Unlike `simulateFailure`, this keeps the connection in `.ready` state + /// so that `send()` proceeds to the NWConnection callback path. + func setSendError(_ error: NWError?) { + mockError = error + } + + /// Clear any injected error without changing connection state. + func clearError() { + mockError = nil + } + /// Get all sent data func getSentData() -> [Data] { return sentData @@ -661,5 +673,162 @@ import Testing // Verify connection is cleaned up #expect(weakConnection == nil, "Connection was not properly cleaned up") } + @Test("Concurrent sends do not cause data races") + func testConcurrentSendsNoCrash() async throws { + let mockConnection = MockNetworkConnection() + let transport = NetworkTransport( + mockConnection, + heartbeatConfig: .disabled, + reconnectionConfig: .disabled + ) + + try await transport.connect() + + // Fire many concurrent sends to surface any data race in continuation handling. + // Before the fix, mutable `var` flags captured by @Sendable closures could race. + try await withThrowingTaskGroup(of: Void.self) { group in + for i in 0..<100 { + group.addTask { + let msg = #"{"id":\#(i)}"#.data(using: .utf8)! + try await transport.send(msg) + } + } + try await group.waitForAll() + } + + let sentData = mockConnection.getSentData() + #expect(sentData.count == 100) + + await transport.disconnect() + } + + @Test("Send error resumes continuation and triggers reconnection") + func testSendErrorResumesAndReconnects() async throws { + let mockConnection = MockNetworkConnection() + + let reconnectionConfig = NetworkTransport.ReconnectionConfiguration( + enabled: true, + maxAttempts: 1, + backoffMultiplier: 1.0 + ) + + let transport = NetworkTransport( + mockConnection, + heartbeatConfig: .disabled, + reconnectionConfig: reconnectionConfig + ) + + try await transport.connect() + + // Inject a connection-lost error for the next send + mockConnection.setSendError(NWError.posix(POSIXErrorCode(rawValue: 57)!)) + + // send() should throw immediately without hanging + do { + try await transport.send(#"{"test":"error"}"#.data(using: .utf8)!) + Issue.record("Expected send to throw on error") + } catch { + #expect(error is MCPError) + } + + // Wait for handleSendError to run asynchronously + try await Task.sleep(for: .milliseconds(700)) + + await transport.disconnect() + } + + @Test("Send error without reconnection enabled does not reconnect") + func testSendErrorNoReconnect() async throws { + let mockConnection = MockNetworkConnection() + let transport = NetworkTransport( + mockConnection, + heartbeatConfig: .disabled, + reconnectionConfig: .disabled + ) + + try await transport.connect() + + mockConnection.setSendError(NWError.posix(POSIXErrorCode(rawValue: 57)!)) + + do { + try await transport.send(#"{"test":"no-reconnect"}"#.data(using: .utf8)!) + Issue.record("Expected send to throw on error") + } catch { + #expect(error is MCPError) + } + + // Connection should remain in ready state (no reconnection attempt) + try await Task.sleep(for: .milliseconds(100)) + #expect(mockConnection.state == .ready) + + await transport.disconnect() + } + + @Test("Receive error resumes continuation without crash") + func testReceiveErrorResumesContinuation() async throws { + let mockConnection = MockNetworkConnection() + let transport = NetworkTransport( + mockConnection, + heartbeatConfig: .disabled, + reconnectionConfig: .disabled + ) + + try await transport.connect() + + // Inject receive error after connection is established + mockConnection.simulateFailure(error: NWError.posix(POSIXErrorCode.ECONNRESET)) + + let stream = await transport.receive() + var receivedError = false + + do { + for try await _ in stream { + break + } + } catch { + receivedError = true + } + + #expect(receivedError, "Expected receive stream to surface the error") + + await transport.disconnect() + } + + @Test("Concurrent sends with intermittent errors") + func testConcurrentSendsWithErrors() async throws { + let mockConnection = MockNetworkConnection() + let transport = NetworkTransport( + mockConnection, + heartbeatConfig: .disabled, + reconnectionConfig: .disabled + ) + + try await transport.connect() + + // Mix of successful and failing sends should not crash or deadlock. + // This tests that continuation resume is always called exactly once. + var successCount = 0 + var errorCount = 0 + + for i in 0..<20 { + if i == 10 { + mockConnection.setSendError(NWError.posix(POSIXErrorCode(rawValue: 57)!)) + } else if i == 11 { + mockConnection.clearError() + } + + do { + try await transport.send(#"{"id":\#(i)}"#.data(using: .utf8)!) + successCount += 1 + } catch { + errorCount += 1 + } + } + + #expect(successCount > 0) + #expect(errorCount > 0) + + await transport.disconnect() + } } #endif From 06d3ab02004456845606ae593d1e4d86cd73ad40 Mon Sep 17 00:00:00 2001 From: 1amageek Date: Fri, 6 Mar 2026 14:29:51 +0900 Subject: [PATCH 3/3] Remove Task.sleep from handleSendError and tests - handleSendError: remove fixed 500ms sleep before reconnection. connect() already handles backoff via ReconnectionConfiguration. Properly catch connect() errors instead of swallowing with try?. - Tests: remove Task.sleep-based assertions. Verify send() throws immediately; reconnection behavior is covered by existing tests. --- .../Base/Transports/NetworkTransport.swift | 10 ++-- Tests/MCPTests/NetworkTransportTests.swift | 46 ++----------------- 2 files changed, 11 insertions(+), 45 deletions(-) diff --git a/Sources/MCP/Base/Transports/NetworkTransport.swift b/Sources/MCP/Base/Transports/NetworkTransport.swift index 85f58d8a..ea55c66a 100644 --- a/Sources/MCP/Base/Transports/NetworkTransport.swift +++ b/Sources/MCP/Base/Transports/NetworkTransport.swift @@ -550,12 +550,14 @@ import Logging logger.warning("Connection appears broken, will attempt to reconnect...") setIsConnected(false) - - try? await Task.sleep(for: .milliseconds(500)) + connection.cancel() guard !isStopping else { return } - connection.cancel() - try? await connect() + do { + try await connect() + } catch { + logger.error("Reconnection failed: \(error)") + } } /// Receives data in an async sequence diff --git a/Tests/MCPTests/NetworkTransportTests.swift b/Tests/MCPTests/NetworkTransportTests.swift index 0cb0e595..93515971 100644 --- a/Tests/MCPTests/NetworkTransportTests.swift +++ b/Tests/MCPTests/NetworkTransportTests.swift @@ -702,20 +702,13 @@ import Testing await transport.disconnect() } - @Test("Send error resumes continuation and triggers reconnection") - func testSendErrorResumesAndReconnects() async throws { + @Test("Send error resumes continuation immediately") + func testSendErrorResumesContinuation() async throws { let mockConnection = MockNetworkConnection() - - let reconnectionConfig = NetworkTransport.ReconnectionConfiguration( - enabled: true, - maxAttempts: 1, - backoffMultiplier: 1.0 - ) - let transport = NetworkTransport( mockConnection, heartbeatConfig: .disabled, - reconnectionConfig: reconnectionConfig + reconnectionConfig: .disabled ) try await transport.connect() @@ -723,7 +716,8 @@ import Testing // Inject a connection-lost error for the next send mockConnection.setSendError(NWError.posix(POSIXErrorCode(rawValue: 57)!)) - // send() should throw immediately without hanging + // send() must throw without hanging — continuation is resumed directly + // in the NWConnection callback, not deferred via Task { @MainActor in } do { try await transport.send(#"{"test":"error"}"#.data(using: .utf8)!) Issue.record("Expected send to throw on error") @@ -731,36 +725,6 @@ import Testing #expect(error is MCPError) } - // Wait for handleSendError to run asynchronously - try await Task.sleep(for: .milliseconds(700)) - - await transport.disconnect() - } - - @Test("Send error without reconnection enabled does not reconnect") - func testSendErrorNoReconnect() async throws { - let mockConnection = MockNetworkConnection() - let transport = NetworkTransport( - mockConnection, - heartbeatConfig: .disabled, - reconnectionConfig: .disabled - ) - - try await transport.connect() - - mockConnection.setSendError(NWError.posix(POSIXErrorCode(rawValue: 57)!)) - - do { - try await transport.send(#"{"test":"no-reconnect"}"#.data(using: .utf8)!) - Issue.record("Expected send to throw on error") - } catch { - #expect(error is MCPError) - } - - // Connection should remain in ready state (no reconnection attempt) - try await Task.sleep(for: .milliseconds(100)) - #expect(mockConnection.state == .ready) - await transport.disconnect() }