From 2c87708cfd3129b341d7c43cb5a2c83c4982515b Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 9 Feb 2026 13:47:28 +0000 Subject: [PATCH 1/5] feat: add idle timeout for StreamableHTTP sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a session_idle_timeout parameter to StreamableHTTPSessionManager that enables automatic cleanup of idle sessions, fixing the memory leak described in #1283. Uses a CancelScope with a deadline inside each session's run_server task. When the deadline passes, app.run() is cancelled and the session cleans up. Incoming requests push the deadline forward. No background tasks needed — each session manages its own lifecycle. - terminate() on the transport is now idempotent - Effective timeout accounts for retry_interval - Default is None (no timeout, fully backwards compatible) Github-Issue: #1283 --- src/mcp/server/streamable_http.py | 6 + src/mcp/server/streamable_http_manager.py | 61 +++- .../issues/test_1283_idle_session_cleanup.py | 286 ++++++++++++++++++ 3 files changed, 346 insertions(+), 7 deletions(-) create mode 100644 tests/issues/test_1283_idle_session_cleanup.py diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 2613b530c..bd802a20a 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -180,6 +180,8 @@ def __init__( ] = {} self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {} self._terminated = False + # Idle timeout cancel scope; managed by the session manager. + self.idle_scope: anyio.CancelScope | None = None @property def is_terminated(self) -> bool: @@ -773,8 +775,12 @@ async def terminate(self) -> None: """Terminate the current session, closing all streams. Once terminated, all requests with this session ID will receive 404 Not Found. + Calling this method multiple times is safe (idempotent). """ + if self._terminated: + return + self._terminated = True logger.info(f"Terminating session: {self.mcp_session_id}") diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 6a1672417..89e14b356 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -38,6 +38,7 @@ class StreamableHTTPSessionManager: 2. Resumability via an optional event store 3. Connection management and lifecycle 4. Request handling and transport setup + 5. Idle session cleanup via optional timeout Important: Only one StreamableHTTPSessionManager instance should be created per application. The instance cannot be reused after its run() context has @@ -55,6 +56,15 @@ class StreamableHTTPSessionManager: security_settings: Optional transport security settings. retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE polling behavior. + session_idle_timeout: Optional idle timeout in seconds for stateful sessions. + If set, sessions that receive no HTTP requests for this + duration will be automatically terminated and removed. + When retry_interval is also set, the effective idle + threshold is at least ``retry_interval / 1000 * 3`` to + avoid prematurely reaping sessions that are simply + waiting for SSE polling reconnections. Default is None + (no timeout). A value of 1800 (30 minutes) is + recommended for most deployments. """ def __init__( @@ -65,13 +75,20 @@ def __init__( stateless: bool = False, security_settings: TransportSecuritySettings | None = None, retry_interval: int | None = None, + session_idle_timeout: float | None = None, ): + if session_idle_timeout is not None and session_idle_timeout <= 0: + raise ValueError("session_idle_timeout must be a positive number of seconds") + if stateless and session_idle_timeout is not None: + raise ValueError("session_idle_timeout is not supported in stateless mode") + self.app = app self.event_store = event_store self.json_response = json_response self.stateless = stateless self.security_settings = security_settings self.retry_interval = retry_interval + self.session_idle_timeout = session_idle_timeout # Session tracking (only used if not stateless) self._session_creation_lock = anyio.Lock() @@ -114,6 +131,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]: # Store the task group for later use self._task_group = tg logger.info("StreamableHTTP session manager started") + try: yield # Let the application run finally: @@ -219,6 +237,9 @@ async def _handle_stateful_request( if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover transport = self._server_instances[request_mcp_session_id] logger.debug("Session already exists, handling request directly") + # Push back idle deadline on activity + if transport.idle_scope is not None: + transport.idle_scope.deadline = anyio.current_time() + self._effective_idle_timeout() await transport.handle_request(scope, receive, send) return @@ -245,19 +266,36 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE read_stream, write_stream = streams task_status.started() try: - await self.app.run( - read_stream, - write_stream, - self.app.create_initialization_options(), - stateless=False, # Stateful mode - ) + # Use a cancel scope for idle timeout — when the + # deadline passes the scope cancels app.run() and + # execution continues after the ``with`` block. + # Incoming requests push the deadline forward. + idle_scope = anyio.CancelScope() + if self.session_idle_timeout is not None: + timeout = self._effective_idle_timeout() + idle_scope.deadline = anyio.current_time() + timeout + http_transport.idle_scope = idle_scope + + with idle_scope: + await self.app.run( + read_stream, + write_stream, + self.app.create_initialization_options(), + stateless=False, + ) + + if idle_scope.cancelled_caught: + session_id = http_transport.mcp_session_id + logger.info(f"Session {session_id} idle timeout") + if session_id is not None: # pragma: no branch + self._server_instances.pop(session_id, None) + await http_transport.terminate() except Exception as e: logger.error( f"Session {http_transport.mcp_session_id} crashed: {e}", exc_info=True, ) finally: - # Only remove from instances if not terminated if ( # pragma: no branch http_transport.mcp_session_id and http_transport.mcp_session_id in self._server_instances @@ -295,3 +333,12 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE media_type="application/json", ) await response(scope, receive, send) + + def _effective_idle_timeout(self) -> float: + """Compute the effective idle timeout, accounting for retry_interval.""" + assert self.session_idle_timeout is not None + timeout = self.session_idle_timeout + if self.retry_interval is not None: + retry_seconds = self.retry_interval / 1000.0 + timeout = max(timeout, retry_seconds * 3) + return timeout diff --git a/tests/issues/test_1283_idle_session_cleanup.py b/tests/issues/test_1283_idle_session_cleanup.py new file mode 100644 index 000000000..42638ee22 --- /dev/null +++ b/tests/issues/test_1283_idle_session_cleanup.py @@ -0,0 +1,286 @@ +"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up. + +Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager +persist indefinitely in ``_server_instances`` even after the client disconnects. +Over time this leaks memory. + +The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows +the manager to automatically terminate and remove sessions that have been idle for +longer than the configured duration. +""" + +import time +from collections.abc import Callable, Coroutine +from typing import Any + +import anyio +import pytest +from starlette.types import Message, Scope + +from mcp.server.lowlevel import Server +from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager + + +def _make_scope() -> Scope: + return { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [(b"content-type", b"application/json")], + } + + +async def _mock_receive() -> Message: # pragma: no cover + return {"type": "http.request", "body": b"", "more_body": False} + + +def _make_send(sent: list[Message]) -> Callable[[Message], Coroutine[Any, Any, None]]: + async def mock_send(message: Message) -> None: + sent.append(message) + + return mock_send + + +def _extract_session_id(sent_messages: list[Message]) -> str: + for msg in sent_messages: + if msg["type"] == "http.response.start": # pragma: no branch + for name, value in msg.get("headers", []): # pragma: no branch + if name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): # pragma: no branch + return value.decode() + raise AssertionError("Session ID not found in response headers") # pragma: no cover + + +@pytest.mark.anyio +async def test_idle_session_is_reaped(): + """Session should be removed from _server_instances after idle timeout.""" + app = Server("test-idle-reap") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + assert session_id in manager._server_instances + + # Wait for the cancel scope deadline to fire + await anyio.sleep(0.4) + + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_activity_resets_idle_timer(): + """Requests during the timeout window should prevent the session from being reaped.""" + app = Server("test-idle-reset") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.3) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + # Simulate ongoing activity by pushing back the idle scope deadline + transport = manager._server_instances[session_id] + assert transport.idle_scope is not None + for _ in range(4): + await anyio.sleep(0.1) + transport.idle_scope.deadline = anyio.current_time() + 0.3 + + # Session should still be alive because we kept it active + assert session_id in manager._server_instances + + # Now stop activity and let the timeout expire + await anyio.sleep(0.6) + + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_multiple_sessions_reaped_independently(): + """Each session tracks its own idle timeout independently.""" + app = Server("test-multi-idle") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) + + async with manager.run(): + sent1: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent1)) + session_id_1 = _extract_session_id(sent1) + + await anyio.sleep(0.05) + sent2: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent2)) + session_id_2 = _extract_session_id(sent2) + + assert session_id_1 in manager._server_instances + assert session_id_2 in manager._server_instances + + # After enough time, both should be reaped + await anyio.sleep(0.4) + + assert session_id_1 not in manager._server_instances + assert session_id_2 not in manager._server_instances + + +def test_session_idle_timeout_rejects_negative(): + """session_idle_timeout must be a positive number.""" + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1) + + +def test_session_idle_timeout_rejects_zero(): + """session_idle_timeout must be a positive number.""" + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0) + + +def test_session_idle_timeout_rejects_stateless(): + """session_idle_timeout is not supported in stateless mode.""" + with pytest.raises(ValueError, match="not supported in stateless"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) + + +@pytest.mark.anyio +async def test_terminate_idempotency(): + """Calling terminate() multiple times should be safe.""" + transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent") + + async with transport.connect(): + await transport.terminate() + assert transport.is_terminated + + # Second call should be a no-op (no exception) + await transport.terminate() + assert transport.is_terminated + + +@pytest.mark.anyio +async def test_idle_timeout_with_retry_interval(): + """When retry_interval is set, effective timeout should account for polling gaps.""" + app = Server("test-retry-interval") + + # retry_interval = 5000ms = 5s -> retry_seconds * 3 = 15s + # session_idle_timeout = 1s -> effective = max(1, 15) = 15 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=1.0, retry_interval=5000) + assert manager._effective_idle_timeout() == 15.0 + + # When retry_interval is small, session_idle_timeout should dominate + manager2 = StreamableHTTPSessionManager(app=app, session_idle_timeout=10.0, retry_interval=100) + assert manager2._effective_idle_timeout() == 10.0 + + # No retry_interval -> raw timeout + manager3 = StreamableHTTPSessionManager(app=app, session_idle_timeout=5.0) + assert manager3._effective_idle_timeout() == 5.0 + + +@pytest.mark.anyio +async def test_no_idle_timeout_sessions_persist(): + """When session_idle_timeout is None (default), sessions persist indefinitely.""" + app = Server("test-no-timeout") + manager = StreamableHTTPSessionManager(app=app) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + await anyio.sleep(0.3) + assert session_id in manager._server_instances + + +@pytest.mark.anyio +async def test_run_server_exits_promptly_after_idle_timeout(): + """The run_server task must exit shortly after the idle timeout fires.""" + app = Server("test-lifecycle") + + task_exited = anyio.Event() + exit_timestamp: list[float] = [] + original_run = app.run + + async def instrumented_run(*args: Any, **kwargs: Any) -> None: + try: + await original_run(*args, **kwargs) + finally: + exit_timestamp.append(time.monotonic()) + task_exited.set() + + app.run = instrumented_run # type: ignore[assignment] + + idle_timeout = 0.5 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + assert session_id in manager._server_instances + + pre_reap_time = time.monotonic() + + with anyio.fail_after(idle_timeout * 4): + await task_exited.wait() + + assert len(exit_timestamp) == 1 + total_elapsed = exit_timestamp[0] - pre_reap_time + assert total_elapsed < idle_timeout * 3, ( + f"run_server task took {total_elapsed:.3f}s to exit; expected < {idle_timeout * 3:.1f}s" + ) + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_run_server_finally_block_runs_after_terminate(): + """Verify that the finally block in run_server executes after terminate().""" + app = Server("test-finally") + + lifecycle_events: list[str] = [] + original_run = app.run + + async def instrumented_run(*args: Any, **kwargs: Any) -> None: + lifecycle_events.append("run_entered") + try: + await original_run(*args, **kwargs) + finally: + lifecycle_events.append("run_exited") + + app.run = instrumented_run # type: ignore[assignment] + + manager = StreamableHTTPSessionManager(app=app) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + transport = manager._server_instances[session_id] + + assert "run_entered" in lifecycle_events + assert "run_exited" not in lifecycle_events + + await transport.terminate() + + with anyio.fail_after(3.0): + while "run_exited" not in lifecycle_events: + await anyio.sleep(0.01) + + assert "run_exited" in lifecycle_events + + +@pytest.mark.anyio +async def test_idle_timeout_end_to_end(): + """End-to-end: idle timeout causes session cleanup with a real Server.""" + app = Server("test-e2e") + idle_timeout = 0.3 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + assert session_id in manager._server_instances + + with anyio.fail_after(idle_timeout + 1.0): + while session_id in manager._server_instances: + await anyio.sleep(0.05) + + assert session_id not in manager._server_instances From 7345bd7161bf9176952a43b7ef6aa69dd03ec4f1 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 9 Feb 2026 14:13:50 +0000 Subject: [PATCH 2/5] Clean up idle timeout implementation - Remove unrelated blank line between logger.info and try block - Only create CancelScope when session_idle_timeout is set, keeping the no-timeout path identical to the original code - Add docstring to _effective_idle_timeout explaining the 3x retry_interval multiplier rationale --- src/mcp/server/streamable_http_manager.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 89e14b356..bbd967bd4 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -131,7 +131,6 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]: # Store the task group for later use self._task_group = tg logger.info("StreamableHTTP session manager started") - try: yield # Let the application run finally: @@ -335,7 +334,14 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE await response(scope, receive, send) def _effective_idle_timeout(self) -> float: - """Compute the effective idle timeout, accounting for retry_interval.""" + """Compute the effective idle timeout, accounting for retry_interval. + + When SSE retry_interval is configured, clients periodically reconnect + to resume the event stream. A gap of up to ``retry_interval`` between + connections is normal, not a sign of idleness. We use a 3x multiplier + to tolerate up to two consecutive missed polls (network jitter, slow + client) before considering the session idle. + """ assert self.session_idle_timeout is not None timeout = self.session_idle_timeout if self.retry_interval is not None: From 2c9e60cc9a2105bf8198337c23e1f52a1ca50a5f Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 9 Feb 2026 14:37:42 +0000 Subject: [PATCH 3/5] Remove _effective_idle_timeout, use session_idle_timeout directly The helper silently clamped the user's configured timeout to retry_interval * 3, which is surprising. Users should set a timeout that suits their deployment. Updated the docstring to note that the timeout should comfortably exceed retry_interval when both are set. --- src/mcp/server/streamable_http_manager.py | 34 +++++-------------- .../issues/test_1283_idle_session_cleanup.py | 19 ----------- 2 files changed, 8 insertions(+), 45 deletions(-) diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index bbd967bd4..12156df59 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -59,12 +59,11 @@ class StreamableHTTPSessionManager: session_idle_timeout: Optional idle timeout in seconds for stateful sessions. If set, sessions that receive no HTTP requests for this duration will be automatically terminated and removed. - When retry_interval is also set, the effective idle - threshold is at least ``retry_interval / 1000 * 3`` to - avoid prematurely reaping sessions that are simply - waiting for SSE polling reconnections. Default is None - (no timeout). A value of 1800 (30 minutes) is - recommended for most deployments. + When retry_interval is also configured, ensure the idle + timeout comfortably exceeds the retry interval to avoid + reaping sessions during normal SSE polling gaps. + Default is None (no timeout). A value of 1800 + (30 minutes) is recommended for most deployments. """ def __init__( @@ -237,8 +236,8 @@ async def _handle_stateful_request( transport = self._server_instances[request_mcp_session_id] logger.debug("Session already exists, handling request directly") # Push back idle deadline on activity - if transport.idle_scope is not None: - transport.idle_scope.deadline = anyio.current_time() + self._effective_idle_timeout() + if transport.idle_scope is not None and self.session_idle_timeout is not None: + transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout await transport.handle_request(scope, receive, send) return @@ -271,8 +270,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE # Incoming requests push the deadline forward. idle_scope = anyio.CancelScope() if self.session_idle_timeout is not None: - timeout = self._effective_idle_timeout() - idle_scope.deadline = anyio.current_time() + timeout + idle_scope.deadline = anyio.current_time() + self.session_idle_timeout http_transport.idle_scope = idle_scope with idle_scope: @@ -332,19 +330,3 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE media_type="application/json", ) await response(scope, receive, send) - - def _effective_idle_timeout(self) -> float: - """Compute the effective idle timeout, accounting for retry_interval. - - When SSE retry_interval is configured, clients periodically reconnect - to resume the event stream. A gap of up to ``retry_interval`` between - connections is normal, not a sign of idleness. We use a 3x multiplier - to tolerate up to two consecutive missed polls (network jitter, slow - client) before considering the session idle. - """ - assert self.session_idle_timeout is not None - timeout = self.session_idle_timeout - if self.retry_interval is not None: - retry_seconds = self.retry_interval / 1000.0 - timeout = max(timeout, retry_seconds * 3) - return timeout diff --git a/tests/issues/test_1283_idle_session_cleanup.py b/tests/issues/test_1283_idle_session_cleanup.py index 42638ee22..9a9193786 100644 --- a/tests/issues/test_1283_idle_session_cleanup.py +++ b/tests/issues/test_1283_idle_session_cleanup.py @@ -155,25 +155,6 @@ async def test_terminate_idempotency(): assert transport.is_terminated -@pytest.mark.anyio -async def test_idle_timeout_with_retry_interval(): - """When retry_interval is set, effective timeout should account for polling gaps.""" - app = Server("test-retry-interval") - - # retry_interval = 5000ms = 5s -> retry_seconds * 3 = 15s - # session_idle_timeout = 1s -> effective = max(1, 15) = 15 - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=1.0, retry_interval=5000) - assert manager._effective_idle_timeout() == 15.0 - - # When retry_interval is small, session_idle_timeout should dominate - manager2 = StreamableHTTPSessionManager(app=app, session_idle_timeout=10.0, retry_interval=100) - assert manager2._effective_idle_timeout() == 10.0 - - # No retry_interval -> raw timeout - manager3 = StreamableHTTPSessionManager(app=app, session_idle_timeout=5.0) - assert manager3._effective_idle_timeout() == 5.0 - - @pytest.mark.anyio async def test_no_idle_timeout_sessions_persist(): """When session_idle_timeout is None (default), sessions persist indefinitely.""" From d5686df2b42df4faea1a2a31e80575667c9fba3c Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 16 Feb 2026 11:54:31 +0000 Subject: [PATCH 4/5] Address review feedback: docstring formatting, test reorganization - Reformat session_idle_timeout docstring to use 4-space continuation indent and fill to 120 chars - Move idle timeout tests from tests/issues/ into tests/server/test_streamable_http_manager.py alongside existing session manager tests - Remove redundant/unnecessary tests (6 dropped, 5 kept) - Replace sleep-based test assertions with event-based polling using anyio.fail_after for deterministic behavior Github-Issue: #1283 --- CLAUDE.md | 4 + src/mcp/server/streamable_http_manager.py | 17 +- .../issues/test_1283_idle_session_cleanup.py | 267 ------------------ tests/server/test_streamable_http_manager.py | 71 +++++ 4 files changed, 84 insertions(+), 275 deletions(-) delete mode 100644 tests/issues/test_1283_idle_session_cleanup.py diff --git a/CLAUDE.md b/CLAUDE.md index cc2d36060..25b14383f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -24,6 +24,10 @@ This document contains critical information about working with this codebase. Fo - Coverage: test edge cases and errors - New features require tests - Bug fixes require regression tests + - NEVER use `anyio.sleep()` with a fixed duration as a synchronization mechanism. Instead: + - Use `anyio.Event` — set it in the callback/handler, `await event.wait()` in the test + - For stream messages, use `await stream.receive()` instead of `sleep()` + `receive_nowait()` + - Wrap waits in `anyio.fail_after(5)` as a timeout guard - For commits fixing bugs or adding features based on user reports add: diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 12156df59..5a9d3993c 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -56,14 +56,15 @@ class StreamableHTTPSessionManager: security_settings: Optional transport security settings. retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE polling behavior. - session_idle_timeout: Optional idle timeout in seconds for stateful sessions. - If set, sessions that receive no HTTP requests for this - duration will be automatically terminated and removed. - When retry_interval is also configured, ensure the idle - timeout comfortably exceeds the retry interval to avoid - reaping sessions during normal SSE polling gaps. - Default is None (no timeout). A value of 1800 - (30 minutes) is recommended for most deployments. + session_idle_timeout: Optional idle timeout in seconds for stateful + sessions. If set, sessions that receive no HTTP + requests for this duration will be automatically + terminated and removed. When retry_interval is + also configured, ensure the idle timeout + comfortably exceeds the retry interval to avoid + reaping sessions during normal SSE polling gaps. + Default is None (no timeout). A value of 1800 + (30 minutes) is recommended for most deployments. """ def __init__( diff --git a/tests/issues/test_1283_idle_session_cleanup.py b/tests/issues/test_1283_idle_session_cleanup.py deleted file mode 100644 index 9a9193786..000000000 --- a/tests/issues/test_1283_idle_session_cleanup.py +++ /dev/null @@ -1,267 +0,0 @@ -"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up. - -Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager -persist indefinitely in ``_server_instances`` even after the client disconnects. -Over time this leaks memory. - -The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows -the manager to automatically terminate and remove sessions that have been idle for -longer than the configured duration. -""" - -import time -from collections.abc import Callable, Coroutine -from typing import Any - -import anyio -import pytest -from starlette.types import Message, Scope - -from mcp.server.lowlevel import Server -from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport -from mcp.server.streamable_http_manager import StreamableHTTPSessionManager - - -def _make_scope() -> Scope: - return { - "type": "http", - "method": "POST", - "path": "/mcp", - "headers": [(b"content-type", b"application/json")], - } - - -async def _mock_receive() -> Message: # pragma: no cover - return {"type": "http.request", "body": b"", "more_body": False} - - -def _make_send(sent: list[Message]) -> Callable[[Message], Coroutine[Any, Any, None]]: - async def mock_send(message: Message) -> None: - sent.append(message) - - return mock_send - - -def _extract_session_id(sent_messages: list[Message]) -> str: - for msg in sent_messages: - if msg["type"] == "http.response.start": # pragma: no branch - for name, value in msg.get("headers", []): # pragma: no branch - if name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): # pragma: no branch - return value.decode() - raise AssertionError("Session ID not found in response headers") # pragma: no cover - - -@pytest.mark.anyio -async def test_idle_session_is_reaped(): - """Session should be removed from _server_instances after idle timeout.""" - app = Server("test-idle-reap") - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - - assert session_id in manager._server_instances - - # Wait for the cancel scope deadline to fire - await anyio.sleep(0.4) - - assert session_id not in manager._server_instances - - -@pytest.mark.anyio -async def test_activity_resets_idle_timer(): - """Requests during the timeout window should prevent the session from being reaped.""" - app = Server("test-idle-reset") - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.3) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - - # Simulate ongoing activity by pushing back the idle scope deadline - transport = manager._server_instances[session_id] - assert transport.idle_scope is not None - for _ in range(4): - await anyio.sleep(0.1) - transport.idle_scope.deadline = anyio.current_time() + 0.3 - - # Session should still be alive because we kept it active - assert session_id in manager._server_instances - - # Now stop activity and let the timeout expire - await anyio.sleep(0.6) - - assert session_id not in manager._server_instances - - -@pytest.mark.anyio -async def test_multiple_sessions_reaped_independently(): - """Each session tracks its own idle timeout independently.""" - app = Server("test-multi-idle") - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) - - async with manager.run(): - sent1: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent1)) - session_id_1 = _extract_session_id(sent1) - - await anyio.sleep(0.05) - sent2: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent2)) - session_id_2 = _extract_session_id(sent2) - - assert session_id_1 in manager._server_instances - assert session_id_2 in manager._server_instances - - # After enough time, both should be reaped - await anyio.sleep(0.4) - - assert session_id_1 not in manager._server_instances - assert session_id_2 not in manager._server_instances - - -def test_session_idle_timeout_rejects_negative(): - """session_idle_timeout must be a positive number.""" - with pytest.raises(ValueError, match="positive number"): - StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1) - - -def test_session_idle_timeout_rejects_zero(): - """session_idle_timeout must be a positive number.""" - with pytest.raises(ValueError, match="positive number"): - StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0) - - -def test_session_idle_timeout_rejects_stateless(): - """session_idle_timeout is not supported in stateless mode.""" - with pytest.raises(ValueError, match="not supported in stateless"): - StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) - - -@pytest.mark.anyio -async def test_terminate_idempotency(): - """Calling terminate() multiple times should be safe.""" - transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent") - - async with transport.connect(): - await transport.terminate() - assert transport.is_terminated - - # Second call should be a no-op (no exception) - await transport.terminate() - assert transport.is_terminated - - -@pytest.mark.anyio -async def test_no_idle_timeout_sessions_persist(): - """When session_idle_timeout is None (default), sessions persist indefinitely.""" - app = Server("test-no-timeout") - manager = StreamableHTTPSessionManager(app=app) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - - await anyio.sleep(0.3) - assert session_id in manager._server_instances - - -@pytest.mark.anyio -async def test_run_server_exits_promptly_after_idle_timeout(): - """The run_server task must exit shortly after the idle timeout fires.""" - app = Server("test-lifecycle") - - task_exited = anyio.Event() - exit_timestamp: list[float] = [] - original_run = app.run - - async def instrumented_run(*args: Any, **kwargs: Any) -> None: - try: - await original_run(*args, **kwargs) - finally: - exit_timestamp.append(time.monotonic()) - task_exited.set() - - app.run = instrumented_run # type: ignore[assignment] - - idle_timeout = 0.5 - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - assert session_id in manager._server_instances - - pre_reap_time = time.monotonic() - - with anyio.fail_after(idle_timeout * 4): - await task_exited.wait() - - assert len(exit_timestamp) == 1 - total_elapsed = exit_timestamp[0] - pre_reap_time - assert total_elapsed < idle_timeout * 3, ( - f"run_server task took {total_elapsed:.3f}s to exit; expected < {idle_timeout * 3:.1f}s" - ) - assert session_id not in manager._server_instances - - -@pytest.mark.anyio -async def test_run_server_finally_block_runs_after_terminate(): - """Verify that the finally block in run_server executes after terminate().""" - app = Server("test-finally") - - lifecycle_events: list[str] = [] - original_run = app.run - - async def instrumented_run(*args: Any, **kwargs: Any) -> None: - lifecycle_events.append("run_entered") - try: - await original_run(*args, **kwargs) - finally: - lifecycle_events.append("run_exited") - - app.run = instrumented_run # type: ignore[assignment] - - manager = StreamableHTTPSessionManager(app=app) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - transport = manager._server_instances[session_id] - - assert "run_entered" in lifecycle_events - assert "run_exited" not in lifecycle_events - - await transport.terminate() - - with anyio.fail_after(3.0): - while "run_exited" not in lifecycle_events: - await anyio.sleep(0.01) - - assert "run_exited" in lifecycle_events - - -@pytest.mark.anyio -async def test_idle_timeout_end_to_end(): - """End-to-end: idle timeout causes session cleanup with a real Server.""" - app = Server("test-e2e") - idle_timeout = 0.3 - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - assert session_id in manager._server_instances - - with anyio.fail_after(idle_timeout + 1.0): - while session_id in manager._server_instances: - await anyio.sleep(0.05) - - assert session_id not in manager._server_instances diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index af1b23619..4d65a68d0 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -313,3 +313,74 @@ async def mock_receive(): assert error_data["id"] == "server-error" assert error_data["error"]["code"] == INVALID_REQUEST assert error_data["error"]["message"] == "Session not found" + + +@pytest.mark.anyio +async def test_idle_session_is_reaped(): + """Idle timeout sets a cancel scope deadline and reaps the session when it fires.""" + idle_timeout = 300 + app = Server("test-idle-reap") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent_messages: list[Message] = [] + + async def mock_send(message: Message): + sent_messages.append(message) + + scope = { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [(b"content-type", b"application/json")], + } + + async def mock_receive(): # pragma: no cover + return {"type": "http.request", "body": b"", "more_body": False} + + before = anyio.current_time() + await manager.handle_request(scope, mock_receive, mock_send) + + session_id = None + for msg in sent_messages: # pragma: no branch + if msg["type"] == "http.response.start": # pragma: no branch + for header_name, header_value in msg.get("headers", []): # pragma: no branch + if header_name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): + session_id = header_value.decode() + break + if session_id: # pragma: no branch + break + + assert session_id is not None, "Session ID not found in response headers" + assert session_id in manager._server_instances + + # Verify the idle deadline was set correctly + transport = manager._server_instances[session_id] + assert transport.idle_scope is not None + assert transport.idle_scope.deadline >= before + idle_timeout + + # Simulate time passing by expiring the deadline + transport.idle_scope.deadline = anyio.current_time() + + with anyio.fail_after(5): + while session_id in manager._server_instances: + await anyio.sleep(0) + + assert session_id not in manager._server_instances + + # Verify terminate() is idempotent + await transport.terminate() + assert transport.is_terminated + + +@pytest.mark.parametrize( + "kwargs,match", + [ + ({"session_idle_timeout": -1}, "positive number"), + ({"session_idle_timeout": 0}, "positive number"), + ({"session_idle_timeout": 30, "stateless": True}, "not supported in stateless"), + ], +) +def test_session_idle_timeout_validation(kwargs: dict[str, Any], match: str): + with pytest.raises(ValueError, match=match): + StreamableHTTPSessionManager(app=Server("test"), **kwargs) From 4191aa65db3b5df2bce22509cd0e51da42b5972f Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 16 Feb 2026 13:18:39 +0000 Subject: [PATCH 5/5] Address latest review feedback - Fix docstring indentation: use 4-space continuation indent filled to 120 cols consistently across all Args parameters - Change stateless+idle_timeout error from ValueError to RuntimeError - Use logger.exception() instead of logger.error() with exc_info - Remove unnecessary None guard on session_id in idle timeout cleanup - Replace while+sleep(0) polling with anyio.Event in test Github-Issue: #1283 --- CLAUDE.md | 5 +- src/mcp/server/streamable_http.py | 2 +- src/mcp/server/streamable_http_manager.py | 45 +++++------ tests/server/test_streamable_http_manager.py | 78 +++++++++++--------- 4 files changed, 64 insertions(+), 66 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 25b14383f..986e64d55 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -24,10 +24,11 @@ This document contains critical information about working with this codebase. Fo - Coverage: test edge cases and errors - New features require tests - Bug fixes require regression tests - - NEVER use `anyio.sleep()` with a fixed duration as a synchronization mechanism. Instead: + - Avoid `anyio.sleep()` with a fixed duration to wait for async operations. Instead: - Use `anyio.Event` — set it in the callback/handler, `await event.wait()` in the test - For stream messages, use `await stream.receive()` instead of `sleep()` + `receive_nowait()` - - Wrap waits in `anyio.fail_after(5)` as a timeout guard + - Exception: `sleep()` is appropriate when testing time-based features (e.g., timeouts) + - Wrap indefinite waits (`event.wait()`, `stream.receive()`) in `anyio.fail_after(5)` to prevent hangs - For commits fixing bugs or adding features based on user reports add: diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index bd802a20a..c241e831a 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -778,7 +778,7 @@ async def terminate(self) -> None: Calling this method multiple times is safe (idempotent). """ - if self._terminated: + if self._terminated: # pragma: no cover return self._terminated = True diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 5a9d3993c..8a7b765e8 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -46,25 +46,20 @@ class StreamableHTTPSessionManager: Args: app: The MCP server instance - event_store: Optional event store for resumability support. - If provided, enables resumable connections where clients - can reconnect and receive missed events. - If None, sessions are still tracked but not resumable. + event_store: Optional event store for resumability support. If provided, enables resumable connections + where clients can reconnect and receive missed events. If None, sessions are still tracked but not + resumable. json_response: Whether to use JSON responses instead of SSE streams - stateless: If True, creates a completely fresh transport for each request - with no session tracking or state persistence between requests. + stateless: If True, creates a completely fresh transport for each request with no session tracking or + state persistence between requests. security_settings: Optional transport security settings. - retry_interval: Retry interval in milliseconds to suggest to clients in SSE - retry field. Used for SSE polling behavior. - session_idle_timeout: Optional idle timeout in seconds for stateful - sessions. If set, sessions that receive no HTTP - requests for this duration will be automatically - terminated and removed. When retry_interval is - also configured, ensure the idle timeout - comfortably exceeds the retry interval to avoid - reaping sessions during normal SSE polling gaps. - Default is None (no timeout). A value of 1800 - (30 minutes) is recommended for most deployments. + retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE + polling behavior. + session_idle_timeout: Optional idle timeout in seconds for stateful sessions. If set, sessions that + receive no HTTP requests for this duration will be automatically terminated and removed. When + retry_interval is also configured, ensure the idle timeout comfortably exceeds the retry interval to + avoid reaping sessions during normal SSE polling gaps. Default is None (no timeout). A value of 1800 + (30 minutes) is recommended for most deployments. """ def __init__( @@ -80,7 +75,7 @@ def __init__( if session_idle_timeout is not None and session_idle_timeout <= 0: raise ValueError("session_idle_timeout must be a positive number of seconds") if stateless and session_idle_timeout is not None: - raise ValueError("session_idle_timeout is not supported in stateless mode") + raise RuntimeError("session_idle_timeout is not supported in stateless mode") self.app = app self.event_store = event_store @@ -283,16 +278,12 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE ) if idle_scope.cancelled_caught: - session_id = http_transport.mcp_session_id - logger.info(f"Session {session_id} idle timeout") - if session_id is not None: # pragma: no branch - self._server_instances.pop(session_id, None) + assert http_transport.mcp_session_id is not None + logger.info(f"Session {http_transport.mcp_session_id} idle timeout") + self._server_instances.pop(http_transport.mcp_session_id, None) await http_transport.terminate() - except Exception as e: - logger.error( - f"Session {http_transport.mcp_session_id} crashed: {e}", - exc_info=True, - ) + except Exception: + logger.exception(f"Session {http_transport.mcp_session_id} crashed") finally: if ( # pragma: no branch http_transport.mcp_session_id diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index 4d65a68d0..33bcb5f2a 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -317,10 +317,9 @@ async def mock_receive(): @pytest.mark.anyio async def test_idle_session_is_reaped(): - """Idle timeout sets a cancel scope deadline and reaps the session when it fires.""" - idle_timeout = 300 + """After idle timeout fires, the session returns 404.""" app = Server("test-idle-reap") - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.05) async with manager.run(): sent_messages: list[Message] = [] @@ -338,7 +337,6 @@ async def mock_send(message: Message): async def mock_receive(): # pragma: no cover return {"type": "http.request", "body": b"", "more_body": False} - before = anyio.current_time() await manager.handle_request(scope, mock_receive, mock_send) session_id = None @@ -352,35 +350,43 @@ async def mock_receive(): # pragma: no cover break assert session_id is not None, "Session ID not found in response headers" - assert session_id in manager._server_instances - - # Verify the idle deadline was set correctly - transport = manager._server_instances[session_id] - assert transport.idle_scope is not None - assert transport.idle_scope.deadline >= before + idle_timeout - - # Simulate time passing by expiring the deadline - transport.idle_scope.deadline = anyio.current_time() - - with anyio.fail_after(5): - while session_id in manager._server_instances: - await anyio.sleep(0) - - assert session_id not in manager._server_instances - - # Verify terminate() is idempotent - await transport.terminate() - assert transport.is_terminated - - -@pytest.mark.parametrize( - "kwargs,match", - [ - ({"session_idle_timeout": -1}, "positive number"), - ({"session_idle_timeout": 0}, "positive number"), - ({"session_idle_timeout": 30, "stateless": True}, "not supported in stateless"), - ], -) -def test_session_idle_timeout_validation(kwargs: dict[str, Any], match: str): - with pytest.raises(ValueError, match=match): - StreamableHTTPSessionManager(app=Server("test"), **kwargs) + + # Wait for the 50ms idle timeout to fire and cleanup to complete + await anyio.sleep(0.1) + + # Verify via public API: old session ID now returns 404 + response_messages: list[Message] = [] + + async def capture_send(message: Message): + response_messages.append(message) + + scope_with_session = { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [ + (b"content-type", b"application/json"), + (b"mcp-session-id", session_id.encode()), + ], + } + + await manager.handle_request(scope_with_session, mock_receive, capture_send) + + response_start = next( + (msg for msg in response_messages if msg["type"] == "http.response.start"), + None, + ) + assert response_start is not None + assert response_start["status"] == 404 + + +def test_session_idle_timeout_rejects_non_positive(): + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1) + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0) + + +def test_session_idle_timeout_rejects_stateless(): + with pytest.raises(RuntimeError, match="not supported in stateless"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True)