From a68c7fcbb5e53d8f1ca26464702eae1abf7ca337 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 30 Jun 2026 18:25:11 +0000 Subject: [PATCH 1/7] Serve subscriptions/listen with a pluggable event bus (SEP-2575) On the 2026-07-28 wire there is no standing GET stream: clients opt in to server events via a subscriptions/listen request whose response is the stream. Add the server-side runtime: - mcp/server/subscriptions.py: an EventBus protocol (publish/subscribe over four typed ServerEvent kinds) with an in-process InMemoryEventBus default; implement it over an external pub/sub backend (e.g. Redis) to fan events out across replicas. ListenHandler serves the method: ack-first, per-stream filter honoring, subscription-id tagging on every frame, and close() ends all streams gracefully with the stamped SubscriptionsListenResult. - MCPServer takes subscriptions= (defaults to the in-memory bus), registers the handler automatically, and exposes the bus as a property. Context gains notify_tools_changed / notify_prompts_changed / notify_resources_changed / notify_resource_updated to publish from inside handlers. - Lowlevel Server users compose the same parts themselves via the existing on_subscriptions_listen slot; no lowlevel, session, or transport changes. - Remove the server-stateless conformance baselines: the scenario's listen checks (ack-first, subscription-id tagging, filter honoring) now pass. --- .../expected-failures.2026-07-28.yml | 5 +- .../actions/conformance/expected-failures.yml | 10 - docs/advanced/low-level-server.md | 1 + docs/tutorial/context.md | 2 + src/mcp/server/mcpserver/context.py | 21 +- src/mcp/server/mcpserver/server.py | 17 ++ src/mcp/server/subscriptions.py | 236 ++++++++++++++++++ tests/server/mcpserver/test_server.py | 39 +++ tests/server/test_subscriptions.py | 236 ++++++++++++++++++ 9 files changed, 551 insertions(+), 16 deletions(-) create mode 100644 src/mcp/server/subscriptions.py create mode 100644 tests/server/test_subscriptions.py diff --git a/.github/actions/conformance/expected-failures.2026-07-28.yml b/.github/actions/conformance/expected-failures.2026-07-28.yml index 5b19d6d2d..504b46385 100644 --- a/.github/actions/conformance/expected-failures.2026-07-28.yml +++ b/.github/actions/conformance/expected-failures.2026-07-28.yml @@ -22,7 +22,4 @@ client: [] -server: - # SEP-2575 subscriptions/listen is not implemented yet; see the matching - # entry in expected-failures.yml for the full rationale. - - server-stateless +server: [] diff --git a/.github/actions/conformance/expected-failures.yml b/.github/actions/conformance/expected-failures.yml index 6d99fba75..aa31cb757 100644 --- a/.github/actions/conformance/expected-failures.yml +++ b/.github/actions/conformance/expected-failures.yml @@ -13,16 +13,6 @@ client: [] server: - # SEP-2575 subscriptions/listen is not implemented yet. The everything- - # server's legacy resources/subscribe handlers make it advertise - # `resources.subscribe` in server/discover, and as of conformance #372 a - # server that advertises a subscription capability but answers - # subscriptions/listen with -32601 fails the three listen MUST checks - # ("Not testable") instead of skipping them. Remove this entry when the - # listen runtime lands. NOTE: while listed, this entry also masks new - # failures in the scenario's other 25 (currently passing) checks — the - # baseline is per-scenario, not per-check. - - server-stateless # SEP-2663 (io.modelcontextprotocol/tasks): the SDK does not implement the # tasks extension yet. These extension-tagged scenarios are selected only by # the bare `--suite all` leg — extension scenarios never match a diff --git a/docs/advanced/low-level-server.md b/docs/advanced/low-level-server.md index 6568b76a5..df369d89c 100644 --- a/docs/advanced/low-level-server.md +++ b/docs/advanced/low-level-server.md @@ -183,6 +183,7 @@ Each of these is one idea you now have the vocabulary for; each has its own chap * `on_call_tool`, `on_get_prompt`, and `on_read_resource` may return an `InputRequiredResult` instead of their normal result to pause the call and ask the client for input; see **[Multi-round-trip requests](multi-round-trip.md)**. True to this tier, nothing is installed for you: where `MCPServer` seals `requestState` by default, here the `request_state` you set crosses the wire exactly as written until you opt in with `server.middleware.append(RequestStateBoundary(RequestStateSecurity(keys=[...]), default_audience=server.name))`: one line (both names import from `mcp.server.request_state`) for the identical sealing and verification `MCPServer` performs (**[Protecting `requestState`](multi-round-trip.md#protecting-requeststate)**). * `on_list_resources`, `on_read_resource`, `on_list_prompts`, `on_get_prompt`, `on_completion` are the same `(ctx, params) -> result` shape for the other primitives. +* `on_subscriptions_listen` serves the 2026-07-28 `subscriptions/listen` stream. Pass an `mcp.server.subscriptions.ListenHandler` built over an `EventBus` (the in-memory default, or your own — e.g. Redis-backed), keep the bus where your other handlers can reach it (the lifespan is a natural home), and publish `ServerEvent`s to it. The handler owns the wire semantics: ack-first, per-stream filtering, and subscription-id tagging. * `server.streamable_http_app()` returns the same Starlette app `MCPServer`'s does; deploy it the way **[Running your server](../run/index.md)** deploys any other ASGI app. There is no `server.run(transport=...)` down here: `server.run(read_stream, write_stream, server.create_initialization_options())` drives one connection over a pair of streams, and that one line is the whole story. ## Recap diff --git a/docs/tutorial/context.md b/docs/tutorial/context.md index c2d43c472..96052d660 100644 --- a/docs/tutorial/context.md +++ b/docs/tutorial/context.md @@ -104,6 +104,8 @@ What a server offers is not fixed at import time. Register a tool at runtime, th The siblings are `send_resource_list_changed()`, `send_prompt_list_changed()`, and `send_resource_updated(uri)` for a change to one specific resource. +On a 2026-07-28 connection, clients receive change notifications only on a `subscriptions/listen` stream they opened. The `Context` publish methods — `ctx.notify_tools_changed()`, `ctx.notify_prompts_changed()`, `ctx.notify_resources_changed()`, and `ctx.notify_resource_updated(uri)` — deliver to every subscribed stream at once, and are synchronous (no `await`). Behind a load balancer, pass your own `EventBus` implementation as `MCPServer(subscriptions=...)` to fan events out across replicas; the in-process default covers a single server. + !!! check Before anyone runs `enable_recommendations`, the tool you are promising does not exist. Call it anyway and the result is an error the model can read: diff --git a/src/mcp/server/mcpserver/context.py b/src/mcp/server/mcpserver/context.py index 664046741..1626bb9a7 100644 --- a/src/mcp/server/mcpserver/context.py +++ b/src/mcp/server/mcpserver/context.py @@ -16,6 +16,7 @@ elicit_with_validation, ) from mcp.server.lowlevel.helper_types import ReadResourceContents +from mcp.server.subscriptions import PromptsListChanged, ResourcesListChanged, ResourceUpdated, ToolsListChanged from mcp.shared.exceptions import MCPDeprecationWarning if TYPE_CHECKING: @@ -78,9 +79,9 @@ def __init__( @property def mcp_server(self) -> MCPServer: """Access to the MCPServer instance.""" - if self._mcp_server is None: # pragma: no cover + if self._mcp_server is None: raise ValueError("Context is not available outside of a request") - return self._mcp_server # pragma: no cover + return self._mcp_server @property def request_context(self) -> ServerRequestContext[LifespanContextT, RequestT]: @@ -109,6 +110,22 @@ async def report_progress(self, progress: float, total: float | None = None, mes """ await self.request_context.session.report_progress(progress, total, message) + def notify_tools_changed(self) -> None: + """Publish a tools list-changed event to `subscriptions/listen` subscribers.""" + self.mcp_server.subscriptions.publish(ToolsListChanged()) + + def notify_prompts_changed(self) -> None: + """Publish a prompts list-changed event to `subscriptions/listen` subscribers.""" + self.mcp_server.subscriptions.publish(PromptsListChanged()) + + def notify_resources_changed(self) -> None: + """Publish a resources list-changed event to `subscriptions/listen` subscribers.""" + self.mcp_server.subscriptions.publish(ResourcesListChanged()) + + def notify_resource_updated(self, uri: str | AnyUrl) -> None: + """Publish a resource-updated event for `uri` to `subscriptions/listen` subscribers.""" + self.mcp_server.subscriptions.publish(ResourceUpdated(uri=str(uri))) + async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]: """Read a resource by URI. diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index 3750429cd..e49bc8a8f 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -88,6 +88,7 @@ from mcp.server.stdio import stdio_server from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPSessionManager +from mcp.server.subscriptions import EventBus, InMemoryEventBus, ListenHandler from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.exceptions import MCPError from mcp.shared.uri_template import UriTemplate @@ -182,6 +183,7 @@ def __init__( resource_security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY, request_state_security: RequestStateSecurity | None = None, cache_hints: Mapping[CacheableMethod, CacheHint] | None = None, + subscriptions: EventBus | None = None, ): self._resource_security = resource_security self.settings = Settings( @@ -201,6 +203,10 @@ def __init__( resources=resources, warn_on_duplicate_resources=self.settings.warn_on_duplicate_resources ) self._prompt_manager = PromptManager(warn_on_duplicate_prompts=self.settings.warn_on_duplicate_prompts) + # The subscriptions/listen fan-out seam (2026-07-28). The default bus is + # in-process; pass an `EventBus` implementation over an external pub/sub + # backend to fan events out across replicas. + self._subscriptions: EventBus = subscriptions if subscriptions is not None else InMemoryEventBus() self._lowlevel_server = Server( name=name or "mcp-server", title=title, @@ -217,6 +223,7 @@ def __init__( on_list_resource_templates=self._handle_list_resource_templates, on_list_prompts=self._handle_list_prompts, on_get_prompt=self._handle_get_prompt, + on_subscriptions_listen=ListenHandler(self._subscriptions), # TODO(Marcelo): It seems there's a type mismatch between the lifespan type from an MCPServer and Server. # We need to create a Lifespan type that is a generic on the server type, like Starlette does. lifespan=(lifespan_wrapper(self, self.settings.lifespan) if self.settings.lifespan else default_lifespan), # type: ignore @@ -285,6 +292,16 @@ def icons(self) -> list[Icon] | None: def version(self) -> str | None: return self._lowlevel_server.version + @property + def subscriptions(self) -> EventBus: + """The `subscriptions/listen` event bus. + + Publish a `ServerEvent` here (or via the `Context.notify_*` methods) + to deliver it to subscribed clients. The bus passed to the constructor, + or the in-process default. + """ + return self._subscriptions + @property def session_manager(self) -> StreamableHTTPSessionManager: """Get the StreamableHTTP session manager. diff --git a/src/mcp/server/subscriptions.py b/src/mcp/server/subscriptions.py new file mode 100644 index 000000000..ae3f35dcd --- /dev/null +++ b/src/mcp/server/subscriptions.py @@ -0,0 +1,236 @@ +"""Server-side `subscriptions/listen` support (2026-07-28, SEP-2575). + +On the 2026-07-28 wire there is no standing GET stream: a client opts in to +server events by sending a `subscriptions/listen` request whose response IS +the stream. This module provides the two pieces a server needs: + +- `EventBus`: the pluggable fan-out seam. The bus carries typed `ServerEvent` + values, not wire notifications - the listen handler owns subscription-id + stamping and per-stream filtering, so a custom bus (e.g. backed by Redis + pub/sub for multi-replica deployments) never sees JSON-RPC. The in-process + default is `InMemoryEventBus`. +- `ListenHandler`: the request handler that serves `subscriptions/listen`. + `MCPServer` registers one automatically; lowlevel `Server` users pass an + instance as `on_subscriptions_listen=`. + +Per the spec, the handler acknowledges first (the ack is the first frame on +the stream), tags every frame with the listen request's JSON-RPC id under +`_meta["io.modelcontextprotocol/subscriptionId"]`, and never delivers an +event kind the client did not request. Delivery is fire-and-forget with no +replay: a dropped stream is not resumable - clients re-listen and refetch. +""" + +from __future__ import annotations + +import math +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any, Protocol + +import anyio +import anyio.streams.memory +from mcp_types import ( + INVALID_REQUEST, + NotificationParams, + PromptListChangedNotification, + ResourceListChangedNotification, + ResourceUpdatedNotification, + ResourceUpdatedNotificationParams, + ServerNotification, + SubscriptionFilter, + SubscriptionsAcknowledgedNotification, + SubscriptionsAcknowledgedNotificationParams, + SubscriptionsListenRequestParams, + SubscriptionsListenResult, + ToolListChangedNotification, +) + +from mcp.server.context import ServerRequestContext +from mcp.shared.exceptions import MCPError + +SUBSCRIPTION_ID_META_KEY = "io.modelcontextprotocol/subscriptionId" +"""The `_meta` key carrying the subscription id on every listen-stream frame. + +The value is the `subscriptions/listen` request's JSON-RPC id, verbatim. +""" + + +@dataclass(frozen=True) +class ToolsListChanged: + """The server's tool list changed.""" + + +@dataclass(frozen=True) +class PromptsListChanged: + """The server's prompt list changed.""" + + +@dataclass(frozen=True) +class ResourcesListChanged: + """The server's resource list changed.""" + + +@dataclass(frozen=True) +class ResourceUpdated: + """The resource at `uri` changed and may need to be read again.""" + + uri: str + + +ServerEvent = ToolsListChanged | PromptsListChanged | ResourcesListChanged | ResourceUpdated +"""An event a server publishes for delivery to listen subscribers.""" + + +class EventBus(Protocol): + """Fan-out seam between event publishers and open listen streams. + + Implement this over an external pub/sub backend (Redis, NATS, ...) to fan + events out across replicas: `publish` forwards the event to the backend, + and each replica's bus invokes its local listeners for events arriving + from the backend. The same instance can be shared across servers. + + Both methods are synchronous and must be called from the server's event + loop thread. Listeners must not raise. + """ + + def publish(self, event: ServerEvent) -> None: + """Deliver `event` to every subscribed listener.""" + ... + + def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: + """Register `listener` and return an idempotent unsubscribe callable.""" + ... + + +class InMemoryEventBus: + """In-process `EventBus`: synchronous fan-out to a set of listeners.""" + + def __init__(self) -> None: + self._listeners: set[Callable[[ServerEvent], None]] = set() + + def publish(self, event: ServerEvent) -> None: + """Deliver `event` to every subscribed listener.""" + for listener in list(self._listeners): + listener(event) + + def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: + """Register `listener` and return an idempotent unsubscribe callable.""" + self._listeners.add(listener) + + def unsubscribe() -> None: + self._listeners.discard(listener) + + return unsubscribe + + +def _honored_subset(requested: SubscriptionFilter) -> SubscriptionFilter: + """The subset of `requested` the server will deliver, for the ack. + + Every requested kind is honored - whether an event kind ever fires + depends on what the server publishes, exactly as a subscription to a + nonexistent resource URI is honored and never fires. Non-true flags and + an empty URI list are dropped rather than echoed as falsy values. + """ + return SubscriptionFilter( + tools_list_changed=True if requested.tools_list_changed else None, + prompts_list_changed=True if requested.prompts_list_changed else None, + resources_list_changed=True if requested.resources_list_changed else None, + resource_subscriptions=list(requested.resource_subscriptions) if requested.resource_subscriptions else None, + ) + + +def _event_matches(honored: SubscriptionFilter, event: ServerEvent) -> bool: + """Whether `event` is within the stream's honored filter.""" + if isinstance(event, ToolsListChanged): + return honored.tools_list_changed is True + if isinstance(event, PromptsListChanged): + return honored.prompts_list_changed is True + if isinstance(event, ResourcesListChanged): + return honored.resources_list_changed is True + return honored.resource_subscriptions is not None and event.uri in honored.resource_subscriptions + + +def _event_to_notification(event: ServerEvent, meta: dict[str, Any]) -> ServerNotification: + """Build the stamped wire notification for `event`.""" + if isinstance(event, ToolsListChanged): + return ToolListChangedNotification(params=NotificationParams(_meta=meta)) + if isinstance(event, PromptsListChanged): + return PromptListChangedNotification(params=NotificationParams(_meta=meta)) + if isinstance(event, ResourcesListChanged): + return ResourceListChangedNotification(params=NotificationParams(_meta=meta)) + return ResourceUpdatedNotification(params=ResourceUpdatedNotificationParams(uri=event.uri, _meta=meta)) + + +class ListenHandler: + """Serves `subscriptions/listen`: one call is one subscription stream. + + Register on a lowlevel `Server` via `on_subscriptions_listen=` (or + `add_request_handler`); `MCPServer` does so automatically. Each call + acknowledges the honored filter first, then forwards matching bus events + onto the request's response stream until the client disconnects (which + cancels the handler; the stream just ends, per the spec's abrupt-close + contract) or `close` ends all streams gracefully. + + Requires a transport that can stream a request's response (streamable + HTTP's SSE mode, stdio). + """ + + def __init__(self, bus: EventBus) -> None: + self._bus = bus + self._streams: set[anyio.streams.memory.MemoryObjectSendStream[ServerEvent]] = set() + + async def __call__( + self, + ctx: ServerRequestContext[Any, Any], + params: SubscriptionsListenRequestParams, + ) -> SubscriptionsListenResult: + """Serve one listen stream.""" + subscription_id = ctx.request_id + if subscription_id is None: + raise MCPError(INVALID_REQUEST, "subscriptions/listen requires a request id") + honored = _honored_subset(params.notifications) + meta: dict[str, Any] = {SUBSCRIPTION_ID_META_KEY: subscription_id} + + # Ack first, subscribe second: no event can precede the ack frame. + await ctx.session.send_notification( + SubscriptionsAcknowledgedNotification( + params=SubscriptionsAcknowledgedNotificationParams(notifications=honored, _meta=meta) + ), + related_request_id=subscription_id, + ) + + # Unbounded buffer so publishers never block on a slow consumer (the + # transport write happens in this handler task, not the publisher's). + send, recv = anyio.create_memory_object_stream[ServerEvent](math.inf) + + def deliver(event: ServerEvent) -> None: + if _event_matches(honored, event): + try: + send.send_nowait(event) + except anyio.ClosedResourceError: + # `aclose` closed this stream; the loop below is unwinding. + pass + + unsubscribe = self._bus.subscribe(deliver) + self._streams.add(send) + try: + async for event in recv: + await ctx.session.send_notification( + _event_to_notification(event, meta), related_request_id=subscription_id + ) + finally: + unsubscribe() + self._streams.discard(send) + send.close() + recv.close() + return SubscriptionsListenResult(_meta=meta) + + def close(self) -> None: + """Gracefully end every open listen stream. + + Each stream sends its `SubscriptionsListenResult` (stamped with the + subscription id) as the final frame and closes - the spec's graceful + closure flow, signalling clients not to re-listen. + """ + for stream in list(self._streams): + stream.close() diff --git a/tests/server/mcpserver/test_server.py b/tests/server/mcpserver/test_server.py index 2ae9d5ff7..d645181bd 100644 --- a/tests/server/mcpserver/test_server.py +++ b/tests/server/mcpserver/test_server.py @@ -51,6 +51,14 @@ from mcp.server.mcpserver.prompts.base import Message, UserMessage from mcp.server.mcpserver.resources import FileResource, FunctionResource from mcp.server.mcpserver.utilities.types import Audio, Image +from mcp.server.subscriptions import ( + InMemoryEventBus, + PromptsListChanged, + ResourcesListChanged, + ResourceUpdated, + ServerEvent, + ToolsListChanged, +) from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.exceptions import MCPError from mcp.shared.uri_template import InvalidUriTemplate @@ -2248,3 +2256,34 @@ async def probe(ctx: Context) -> str: await client.call_tool("probe") assert captured == {"responses": None, "state": None} + + +def test_subscriptions_bus_defaults_to_in_memory_and_accepts_custom() -> None: + assert isinstance(MCPServer().subscriptions, InMemoryEventBus) + bus = InMemoryEventBus() + assert MCPServer(subscriptions=bus).subscriptions is bus + + +async def test_context_notify_methods_publish_to_the_subscriptions_bus() -> None: + mcp = MCPServer() + seen: list[ServerEvent] = [] + mcp.subscriptions.subscribe(seen.append) + + @mcp.tool() + def touch(ctx: Context) -> str: + ctx.notify_tools_changed() + ctx.notify_prompts_changed() + ctx.notify_resources_changed() + ctx.notify_resource_updated("r://x") + return "ok" + + with anyio.fail_after(5): + async with Client(mcp) as client: + await client.call_tool("touch") + + assert seen == [ToolsListChanged(), PromptsListChanged(), ResourcesListChanged(), ResourceUpdated(uri="r://x")] + + +def test_context_mcp_server_outside_request_raises() -> None: + with pytest.raises(ValueError, match="outside of a request"): + _ = Context().mcp_server diff --git a/tests/server/test_subscriptions.py b/tests/server/test_subscriptions.py new file mode 100644 index 000000000..6d0337f55 --- /dev/null +++ b/tests/server/test_subscriptions.py @@ -0,0 +1,236 @@ +"""Tests for `subscriptions/listen` serving (mcp.server.subscriptions).""" + +from typing import Any, cast + +import anyio +import pytest +from mcp_types import ( + INVALID_REQUEST, + PromptListChangedNotification, + RequestId, + ResourceListChangedNotification, + ResourceUpdatedNotification, + ServerNotification, + SubscriptionFilter, + SubscriptionsAcknowledgedNotification, + SubscriptionsListenRequestParams, + SubscriptionsListenResult, + ToolListChangedNotification, +) + +from mcp.server.context import ServerRequestContext +from mcp.server.session import ServerSession +from mcp.server.subscriptions import ( + SUBSCRIPTION_ID_META_KEY, + InMemoryEventBus, + ListenHandler, + PromptsListChanged, + ResourcesListChanged, + ResourceUpdated, + ServerEvent, + ToolsListChanged, +) +from mcp.shared.exceptions import MCPError + + +class _RecordingSession: + """Stands in for `ServerSession`: records sent notifications and wakes waiters.""" + + def __init__(self) -> None: + self.sent: list[tuple[ServerNotification, RequestId | None]] = [] + self._arrival = anyio.Event() + + async def send_notification( + self, notification: ServerNotification, related_request_id: RequestId | None = None + ) -> None: + self.sent.append((notification, related_request_id)) + self._arrival.set() + self._arrival = anyio.Event() + + async def wait_for(self, count: int) -> None: + with anyio.fail_after(5): + while len(self.sent) < count: + await self._arrival.wait() + + +def _ctx(session: _RecordingSession, request_id: RequestId | None = 7) -> ServerRequestContext[Any, Any]: + return ServerRequestContext( + session=cast(ServerSession, session), + lifespan_context={}, + protocol_version="2026-07-28", + method="subscriptions/listen", + request_id=request_id, + ) + + +def _params(**fields: Any) -> SubscriptionsListenRequestParams: + return SubscriptionsListenRequestParams(notifications=SubscriptionFilter(**fields)) + + +class _SpyBus(InMemoryEventBus): + """Counts unsubscribe calls so tests can assert stream cleanup.""" + + def __init__(self) -> None: + super().__init__() + self.unsubscribed = 0 + + def subscribe(self, listener: Any) -> Any: + unsubscribe = super().subscribe(listener) + + def counting_unsubscribe() -> None: + self.unsubscribed += 1 + unsubscribe() + + return counting_unsubscribe + + +def test_in_memory_bus_fans_out_until_unsubscribed() -> None: + bus = InMemoryEventBus() + seen_a: list[ServerEvent] = [] + seen_b: list[ServerEvent] = [] + unsubscribe_a = bus.subscribe(seen_a.append) + bus.subscribe(seen_b.append) + + bus.publish(ToolsListChanged()) + assert seen_a == [ToolsListChanged()] + assert seen_b == [ToolsListChanged()] + + unsubscribe_a() + unsubscribe_a() # idempotent + bus.publish(PromptsListChanged()) + assert seen_a == [ToolsListChanged()] + assert seen_b == [ToolsListChanged(), PromptsListChanged()] + + +@pytest.mark.anyio +async def test_ack_first_honored_subset_and_stamped_graceful_result() -> None: + bus = _SpyBus() + handler = ListenHandler(bus) + session = _RecordingSession() + results: list[SubscriptionsListenResult] = [] + + async with anyio.create_task_group() as tg: + + async def run() -> None: + results.append( + await handler( + _ctx(session), + _params(tools_list_changed=True, prompts_list_changed=False, resource_subscriptions=["r://a"]), + ) + ) + + tg.start_soon(run) + await session.wait_for(1) + + ack, related = session.sent[0] + assert isinstance(ack, SubscriptionsAcknowledgedNotification) + assert related == 7 + # Honored subset: requested-false and absent kinds are omitted, not echoed. + assert ack.params.notifications == SubscriptionFilter(tools_list_changed=True, resource_subscriptions=["r://a"]) + assert ack.params.meta == {SUBSCRIPTION_ID_META_KEY: 7} + + bus.publish(ToolsListChanged()) + await session.wait_for(2) + event, related = session.sent[1] + assert isinstance(event, ToolListChangedNotification) + assert related == 7 + assert event.params is not None and event.params.meta == {SUBSCRIPTION_ID_META_KEY: 7} + + handler.close() + + assert results[0].meta == {SUBSCRIPTION_ID_META_KEY: 7} + assert bus.unsubscribed == 1 # the stream unsubscribed on the way out + + +@pytest.mark.anyio +async def test_only_requested_event_kinds_are_delivered() -> None: + bus = InMemoryEventBus() + handler = ListenHandler(bus) + session = _RecordingSession() + + async with anyio.create_task_group() as tg: + + async def run() -> None: + await handler( + _ctx(session), + _params(prompts_list_changed=True, resources_list_changed=True, resource_subscriptions=["r://a"]), + ) + + tg.start_soon(run) + await session.wait_for(1) + + bus.publish(ToolsListChanged()) # not requested + bus.publish(ResourceUpdated(uri="r://other")) # URI not subscribed + bus.publish(PromptsListChanged()) + bus.publish(ResourcesListChanged()) + bus.publish(ResourceUpdated(uri="r://a")) + await session.wait_for(4) + handler.close() + + delivered = [notification for notification, _ in session.sent[1:]] + assert isinstance(delivered[0], PromptListChangedNotification) + assert isinstance(delivered[1], ResourceListChangedNotification) + assert isinstance(delivered[2], ResourceUpdatedNotification) + assert delivered[2].params.uri == "r://a" + assert delivered[2].params.meta == {SUBSCRIPTION_ID_META_KEY: 7} + assert len(delivered) == 3 + + +@pytest.mark.anyio +async def test_empty_filter_honors_nothing_and_delivers_nothing() -> None: + bus = InMemoryEventBus() + handler = ListenHandler(bus) + session = _RecordingSession() + + async with anyio.create_task_group() as tg: + + async def run() -> None: + await handler(_ctx(session), _params(tools_list_changed=False, resource_subscriptions=[])) + + tg.start_soon(run) + await session.wait_for(1) + + ack, _ = session.sent[0] + assert isinstance(ack, SubscriptionsAcknowledgedNotification) + assert ack.params.notifications == SubscriptionFilter() + + for event in (ToolsListChanged(), PromptsListChanged(), ResourcesListChanged(), ResourceUpdated(uri="r://a")): + bus.publish(event) + handler.close() + + assert len(session.sent) == 1 # the ack only + + +@pytest.mark.anyio +async def test_publish_after_close_is_dropped() -> None: + bus = InMemoryEventBus() + handler = ListenHandler(bus) + session = _RecordingSession() + + async with anyio.create_task_group() as tg: + + async def run() -> None: + await handler(_ctx(session), _params(tools_list_changed=True)) + + tg.start_soon(run) + await session.wait_for(1) + + handler.close() + # The handler task has not resumed yet, so the listener is still + # subscribed but its stream is closed: the event is dropped. + bus.publish(ToolsListChanged()) + + assert len(session.sent) == 1 + + +@pytest.mark.anyio +async def test_listen_requires_a_request_id() -> None: + handler = ListenHandler(InMemoryEventBus()) + + with pytest.raises(MCPError) as exc_info: + await handler(_ctx(_RecordingSession(), request_id=None), _params()) + assert exc_info.value.error.code == INVALID_REQUEST + + +def test_close_without_open_streams_is_a_no_op() -> None: + ListenHandler(InMemoryEventBus()).close() From bfd473e3698d225966cfe8f6e6603cefefcb5a17 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 30 Jun 2026 18:51:36 +0000 Subject: [PATCH 2/7] Make the subscription bus async and fix review findings Review follow-ups to the subscriptions/listen runtime: - Rename EventBus to SubscriptionBus: it carries exactly the four subscription event kinds, and the generic name collided conceptually with the unrelated EventStore in the same package. - Make SubscriptionBus.publish async. Backend implementations (Redis, NATS) do network I/O on publish; a sync protocol would force them to block the loop or spawn their own tasks. This also makes the Context notify_* methods genuinely async, so they cannot be called from sync handlers (which run on worker threads where waking the listen streams is unsafe) - the illegal context is now unrepresentable instead of guarded. - Subscribe each listen stream to the bus before sending the ack: an event published while the ack write was suspended used to be lost after the client had been told the subscription was live. The ack is still the first frame - the handler task alone writes the stream and only drains the buffer after the ack send returns. - Register bus listeners under per-subscription tokens: equal callables (e.g. the same bound method subscribed twice) used to collapse in the set, so one unsubscribe silently detached both registrations. - Drop the stdio claim from ListenHandler's docstring (no 2026-era stdio serving exists yet) and document that resource-updated URIs are matched as exact strings and do not reach legacy resources/subscribe subscribers. --- docs/advanced/low-level-server.md | 2 +- docs/tutorial/context.md | 2 +- src/mcp/server/mcpserver/context.py | 31 +++++--- src/mcp/server/mcpserver/server.py | 10 +-- src/mcp/server/subscriptions.py | 54 ++++++++------ tests/server/mcpserver/test_server.py | 16 ++-- tests/server/test_subscriptions.py | 101 +++++++++++++++++++++----- 7 files changed, 147 insertions(+), 69 deletions(-) diff --git a/docs/advanced/low-level-server.md b/docs/advanced/low-level-server.md index df369d89c..176ea38a6 100644 --- a/docs/advanced/low-level-server.md +++ b/docs/advanced/low-level-server.md @@ -183,7 +183,7 @@ Each of these is one idea you now have the vocabulary for; each has its own chap * `on_call_tool`, `on_get_prompt`, and `on_read_resource` may return an `InputRequiredResult` instead of their normal result to pause the call and ask the client for input; see **[Multi-round-trip requests](multi-round-trip.md)**. True to this tier, nothing is installed for you: where `MCPServer` seals `requestState` by default, here the `request_state` you set crosses the wire exactly as written until you opt in with `server.middleware.append(RequestStateBoundary(RequestStateSecurity(keys=[...]), default_audience=server.name))`: one line (both names import from `mcp.server.request_state`) for the identical sealing and verification `MCPServer` performs (**[Protecting `requestState`](multi-round-trip.md#protecting-requeststate)**). * `on_list_resources`, `on_read_resource`, `on_list_prompts`, `on_get_prompt`, `on_completion` are the same `(ctx, params) -> result` shape for the other primitives. -* `on_subscriptions_listen` serves the 2026-07-28 `subscriptions/listen` stream. Pass an `mcp.server.subscriptions.ListenHandler` built over an `EventBus` (the in-memory default, or your own — e.g. Redis-backed), keep the bus where your other handlers can reach it (the lifespan is a natural home), and publish `ServerEvent`s to it. The handler owns the wire semantics: ack-first, per-stream filtering, and subscription-id tagging. +* `on_subscriptions_listen` serves the 2026-07-28 `subscriptions/listen` stream. Pass an `mcp.server.subscriptions.ListenHandler` built over a `SubscriptionBus` (the in-memory default, or your own — e.g. Redis-backed), keep the bus where your other handlers can reach it (the lifespan is a natural home), and publish `ServerEvent`s to it. The handler owns the wire semantics: ack-first, per-stream filtering, and subscription-id tagging. * `server.streamable_http_app()` returns the same Starlette app `MCPServer`'s does; deploy it the way **[Running your server](../run/index.md)** deploys any other ASGI app. There is no `server.run(transport=...)` down here: `server.run(read_stream, write_stream, server.create_initialization_options())` drives one connection over a pair of streams, and that one line is the whole story. ## Recap diff --git a/docs/tutorial/context.md b/docs/tutorial/context.md index 96052d660..3d1330237 100644 --- a/docs/tutorial/context.md +++ b/docs/tutorial/context.md @@ -104,7 +104,7 @@ What a server offers is not fixed at import time. Register a tool at runtime, th The siblings are `send_resource_list_changed()`, `send_prompt_list_changed()`, and `send_resource_updated(uri)` for a change to one specific resource. -On a 2026-07-28 connection, clients receive change notifications only on a `subscriptions/listen` stream they opened. The `Context` publish methods — `ctx.notify_tools_changed()`, `ctx.notify_prompts_changed()`, `ctx.notify_resources_changed()`, and `ctx.notify_resource_updated(uri)` — deliver to every subscribed stream at once, and are synchronous (no `await`). Behind a load balancer, pass your own `EventBus` implementation as `MCPServer(subscriptions=...)` to fan events out across replicas; the in-process default covers a single server. +On a 2026-07-28 connection, clients receive change notifications only on a `subscriptions/listen` stream they opened — the `send_*` methods above do not reach those streams. The `Context` publish methods — `await ctx.notify_tools_changed()`, `await ctx.notify_prompts_changed()`, `await ctx.notify_resources_changed()`, and `await ctx.notify_resource_updated(uri)` — deliver to every subscribed stream at once. Behind a load balancer, pass your own `SubscriptionBus` implementation as `MCPServer(subscriptions=...)` to fan events out across replicas; the in-process default covers a single server. !!! check Before anyone runs `enable_recommendations`, the tool you are promising does not exist. Call it diff --git a/src/mcp/server/mcpserver/context.py b/src/mcp/server/mcpserver/context.py index 1626bb9a7..9a128be21 100644 --- a/src/mcp/server/mcpserver/context.py +++ b/src/mcp/server/mcpserver/context.py @@ -16,7 +16,12 @@ elicit_with_validation, ) from mcp.server.lowlevel.helper_types import ReadResourceContents -from mcp.server.subscriptions import PromptsListChanged, ResourcesListChanged, ResourceUpdated, ToolsListChanged +from mcp.server.subscriptions import ( + PromptsListChanged, + ResourcesListChanged, + ResourceUpdated, + ToolsListChanged, +) from mcp.shared.exceptions import MCPDeprecationWarning if TYPE_CHECKING: @@ -110,21 +115,27 @@ async def report_progress(self, progress: float, total: float | None = None, mes """ await self.request_context.session.report_progress(progress, total, message) - def notify_tools_changed(self) -> None: + async def notify_tools_changed(self) -> None: """Publish a tools list-changed event to `subscriptions/listen` subscribers.""" - self.mcp_server.subscriptions.publish(ToolsListChanged()) + await self.mcp_server.subscriptions.publish(ToolsListChanged()) - def notify_prompts_changed(self) -> None: + async def notify_prompts_changed(self) -> None: """Publish a prompts list-changed event to `subscriptions/listen` subscribers.""" - self.mcp_server.subscriptions.publish(PromptsListChanged()) + await self.mcp_server.subscriptions.publish(PromptsListChanged()) - def notify_resources_changed(self) -> None: + async def notify_resources_changed(self) -> None: """Publish a resources list-changed event to `subscriptions/listen` subscribers.""" - self.mcp_server.subscriptions.publish(ResourcesListChanged()) + await self.mcp_server.subscriptions.publish(ResourcesListChanged()) + + async def notify_resource_updated(self, uri: str | AnyUrl) -> None: + """Publish a resource-updated event for `uri` to `subscriptions/listen` subscribers. - def notify_resource_updated(self, uri: str | AnyUrl) -> None: - """Publish a resource-updated event for `uri` to `subscriptions/listen` subscribers.""" - self.mcp_server.subscriptions.publish(ResourceUpdated(uri=str(uri))) + The URI is matched as an exact string against each stream's filter. + Reaches `subscriptions/listen` streams only; clients on earlier + protocol versions that used `resources/subscribe` are notified via + `ctx.session.send_resource_updated(uri)` instead. + """ + await self.mcp_server.subscriptions.publish(ResourceUpdated(uri=str(uri))) async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]: """Read a resource by URI. diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index e49bc8a8f..0dcfc4af2 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -88,7 +88,7 @@ from mcp.server.stdio import stdio_server from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPSessionManager -from mcp.server.subscriptions import EventBus, InMemoryEventBus, ListenHandler +from mcp.server.subscriptions import InMemorySubscriptionBus, ListenHandler, SubscriptionBus from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.exceptions import MCPError from mcp.shared.uri_template import UriTemplate @@ -183,7 +183,7 @@ def __init__( resource_security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY, request_state_security: RequestStateSecurity | None = None, cache_hints: Mapping[CacheableMethod, CacheHint] | None = None, - subscriptions: EventBus | None = None, + subscriptions: SubscriptionBus | None = None, ): self._resource_security = resource_security self.settings = Settings( @@ -204,9 +204,9 @@ def __init__( ) self._prompt_manager = PromptManager(warn_on_duplicate_prompts=self.settings.warn_on_duplicate_prompts) # The subscriptions/listen fan-out seam (2026-07-28). The default bus is - # in-process; pass an `EventBus` implementation over an external pub/sub + # in-process; pass an `SubscriptionBus` implementation over an external pub/sub # backend to fan events out across replicas. - self._subscriptions: EventBus = subscriptions if subscriptions is not None else InMemoryEventBus() + self._subscriptions: SubscriptionBus = subscriptions if subscriptions is not None else InMemorySubscriptionBus() self._lowlevel_server = Server( name=name or "mcp-server", title=title, @@ -293,7 +293,7 @@ def version(self) -> str | None: return self._lowlevel_server.version @property - def subscriptions(self) -> EventBus: + def subscriptions(self) -> SubscriptionBus: """The `subscriptions/listen` event bus. Publish a `ServerEvent` here (or via the `Context.notify_*` methods) diff --git a/src/mcp/server/subscriptions.py b/src/mcp/server/subscriptions.py index ae3f35dcd..65fe0a14c 100644 --- a/src/mcp/server/subscriptions.py +++ b/src/mcp/server/subscriptions.py @@ -4,11 +4,11 @@ server events by sending a `subscriptions/listen` request whose response IS the stream. This module provides the two pieces a server needs: -- `EventBus`: the pluggable fan-out seam. The bus carries typed `ServerEvent` +- `SubscriptionBus`: the pluggable fan-out seam. The bus carries typed `ServerEvent` values, not wire notifications - the listen handler owns subscription-id stamping and per-stream filtering, so a custom bus (e.g. backed by Redis pub/sub for multi-replica deployments) never sees JSON-RPC. The in-process - default is `InMemoryEventBus`. + default is `InMemorySubscriptionBus`. - `ListenHandler`: the request handler that serves `subscriptions/listen`. `MCPServer` registers one automatically; lowlevel `Server` users pass an instance as `on_subscriptions_listen=`. @@ -81,7 +81,7 @@ class ResourceUpdated: """An event a server publishes for delivery to listen subscribers.""" -class EventBus(Protocol): +class SubscriptionBus(Protocol): """Fan-out seam between event publishers and open listen streams. Implement this over an external pub/sub backend (Redis, NATS, ...) to fan @@ -89,11 +89,12 @@ class EventBus(Protocol): and each replica's bus invokes its local listeners for events arriving from the backend. The same instance can be shared across servers. - Both methods are synchronous and must be called from the server's event - loop thread. Listeners must not raise. + `publish` is async so backend implementations can do network I/O. + `subscribe` is synchronous local registration. Listeners are synchronous, + must not raise, and are invoked on the server's event loop. """ - def publish(self, event: ServerEvent) -> None: + async def publish(self, event: ServerEvent) -> None: """Deliver `event` to every subscribed listener.""" ... @@ -102,23 +103,26 @@ def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], Non ... -class InMemoryEventBus: - """In-process `EventBus`: synchronous fan-out to a set of listeners.""" +class InMemorySubscriptionBus: + """In-process `SubscriptionBus`: synchronous fan-out to listeners in subscription order.""" def __init__(self) -> None: - self._listeners: set[Callable[[ServerEvent], None]] = set() + # Keyed by a per-subscription token so the same callable can be + # registered more than once (bound methods compare equal). + self._listeners: dict[object, Callable[[ServerEvent], None]] = {} - def publish(self, event: ServerEvent) -> None: + async def publish(self, event: ServerEvent) -> None: """Deliver `event` to every subscribed listener.""" - for listener in list(self._listeners): + for listener in list(self._listeners.values()): listener(event) def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: """Register `listener` and return an idempotent unsubscribe callable.""" - self._listeners.add(listener) + token = object() + self._listeners[token] = listener def unsubscribe() -> None: - self._listeners.discard(listener) + self._listeners.pop(token, None) return unsubscribe @@ -172,10 +176,10 @@ class ListenHandler: contract) or `close` ends all streams gracefully. Requires a transport that can stream a request's response (streamable - HTTP's SSE mode, stdio). + HTTP's SSE mode). """ - def __init__(self, bus: EventBus) -> None: + def __init__(self, bus: SubscriptionBus) -> None: self._bus = bus self._streams: set[anyio.streams.memory.MemoryObjectSendStream[ServerEvent]] = set() @@ -191,14 +195,6 @@ async def __call__( honored = _honored_subset(params.notifications) meta: dict[str, Any] = {SUBSCRIPTION_ID_META_KEY: subscription_id} - # Ack first, subscribe second: no event can precede the ack frame. - await ctx.session.send_notification( - SubscriptionsAcknowledgedNotification( - params=SubscriptionsAcknowledgedNotificationParams(notifications=honored, _meta=meta) - ), - related_request_id=subscription_id, - ) - # Unbounded buffer so publishers never block on a slow consumer (the # transport write happens in this handler task, not the publisher's). send, recv = anyio.create_memory_object_stream[ServerEvent](math.inf) @@ -208,12 +204,22 @@ def deliver(event: ServerEvent) -> None: try: send.send_nowait(event) except anyio.ClosedResourceError: - # `aclose` closed this stream; the loop below is unwinding. + # `close` closed this stream; the loop below is unwinding. pass + # Subscribe before sending the ack so an event published while the + # ack write is suspended is buffered rather than lost. The ack is + # still the first frame: this task alone writes the stream, and it + # only starts draining the buffer after the ack send returns. unsubscribe = self._bus.subscribe(deliver) self._streams.add(send) try: + await ctx.session.send_notification( + SubscriptionsAcknowledgedNotification( + params=SubscriptionsAcknowledgedNotificationParams(notifications=honored, _meta=meta) + ), + related_request_id=subscription_id, + ) async for event in recv: await ctx.session.send_notification( _event_to_notification(event, meta), related_request_id=subscription_id diff --git a/tests/server/mcpserver/test_server.py b/tests/server/mcpserver/test_server.py index d645181bd..9d04cfd25 100644 --- a/tests/server/mcpserver/test_server.py +++ b/tests/server/mcpserver/test_server.py @@ -52,7 +52,7 @@ from mcp.server.mcpserver.resources import FileResource, FunctionResource from mcp.server.mcpserver.utilities.types import Audio, Image from mcp.server.subscriptions import ( - InMemoryEventBus, + InMemorySubscriptionBus, PromptsListChanged, ResourcesListChanged, ResourceUpdated, @@ -2259,8 +2259,8 @@ async def probe(ctx: Context) -> str: def test_subscriptions_bus_defaults_to_in_memory_and_accepts_custom() -> None: - assert isinstance(MCPServer().subscriptions, InMemoryEventBus) - bus = InMemoryEventBus() + assert isinstance(MCPServer().subscriptions, InMemorySubscriptionBus) + bus = InMemorySubscriptionBus() assert MCPServer(subscriptions=bus).subscriptions is bus @@ -2270,11 +2270,11 @@ async def test_context_notify_methods_publish_to_the_subscriptions_bus() -> None mcp.subscriptions.subscribe(seen.append) @mcp.tool() - def touch(ctx: Context) -> str: - ctx.notify_tools_changed() - ctx.notify_prompts_changed() - ctx.notify_resources_changed() - ctx.notify_resource_updated("r://x") + async def touch(ctx: Context) -> str: + await ctx.notify_tools_changed() + await ctx.notify_prompts_changed() + await ctx.notify_resources_changed() + await ctx.notify_resource_updated("r://x") return "ok" with anyio.fail_after(5): diff --git a/tests/server/test_subscriptions.py b/tests/server/test_subscriptions.py index 6d0337f55..3acb298c3 100644 --- a/tests/server/test_subscriptions.py +++ b/tests/server/test_subscriptions.py @@ -1,5 +1,6 @@ """Tests for `subscriptions/listen` serving (mcp.server.subscriptions).""" +from collections.abc import Callable from typing import Any, cast import anyio @@ -22,7 +23,7 @@ from mcp.server.session import ServerSession from mcp.server.subscriptions import ( SUBSCRIPTION_ID_META_KEY, - InMemoryEventBus, + InMemorySubscriptionBus, ListenHandler, PromptsListChanged, ResourcesListChanged, @@ -67,14 +68,14 @@ def _params(**fields: Any) -> SubscriptionsListenRequestParams: return SubscriptionsListenRequestParams(notifications=SubscriptionFilter(**fields)) -class _SpyBus(InMemoryEventBus): +class _SpyBus(InMemorySubscriptionBus): """Counts unsubscribe calls so tests can assert stream cleanup.""" def __init__(self) -> None: super().__init__() self.unsubscribed = 0 - def subscribe(self, listener: Any) -> Any: + def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: unsubscribe = super().subscribe(listener) def counting_unsubscribe() -> None: @@ -84,26 +85,47 @@ def counting_unsubscribe() -> None: return counting_unsubscribe -def test_in_memory_bus_fans_out_until_unsubscribed() -> None: - bus = InMemoryEventBus() +@pytest.mark.anyio +async def test_in_memory_bus_fans_out_until_unsubscribed() -> None: + """SDK-defined bus contract: fan-out to all listeners; unsubscribe is idempotent.""" + bus = InMemorySubscriptionBus() seen_a: list[ServerEvent] = [] seen_b: list[ServerEvent] = [] unsubscribe_a = bus.subscribe(seen_a.append) bus.subscribe(seen_b.append) - bus.publish(ToolsListChanged()) + await bus.publish(ToolsListChanged()) assert seen_a == [ToolsListChanged()] assert seen_b == [ToolsListChanged()] unsubscribe_a() unsubscribe_a() # idempotent - bus.publish(PromptsListChanged()) + await bus.publish(PromptsListChanged()) assert seen_a == [ToolsListChanged()] assert seen_b == [ToolsListChanged(), PromptsListChanged()] +@pytest.mark.anyio +async def test_in_memory_bus_keeps_equal_callables_distinct() -> None: + """SDK-defined: registering the same callable twice yields two registrations, + and each unsubscribe detaches exactly one (bound methods compare equal).""" + bus = InMemorySubscriptionBus() + seen: list[ServerEvent] = [] + first = bus.subscribe(seen.append) + bus.subscribe(seen.append) + + await bus.publish(ToolsListChanged()) + assert len(seen) == 2 + + first() + await bus.publish(ToolsListChanged()) + assert len(seen) == 3 + + @pytest.mark.anyio async def test_ack_first_honored_subset_and_stamped_graceful_result() -> None: + """Spec-mandated: the ack is the first frame, echoes the honored subset, and + every frame (graceful result included) carries the subscription-id tag.""" bus = _SpyBus() handler = ListenHandler(bus) session = _RecordingSession() @@ -129,7 +151,7 @@ async def run() -> None: assert ack.params.notifications == SubscriptionFilter(tools_list_changed=True, resource_subscriptions=["r://a"]) assert ack.params.meta == {SUBSCRIPTION_ID_META_KEY: 7} - bus.publish(ToolsListChanged()) + await bus.publish(ToolsListChanged()) await session.wait_for(2) event, related = session.sent[1] assert isinstance(event, ToolListChangedNotification) @@ -144,7 +166,9 @@ async def run() -> None: @pytest.mark.anyio async def test_only_requested_event_kinds_are_delivered() -> None: - bus = InMemoryEventBus() + """Spec-mandated: the server never sends a notification type (or resource URI) + the client did not request on this stream.""" + bus = InMemorySubscriptionBus() handler = ListenHandler(bus) session = _RecordingSession() @@ -159,11 +183,11 @@ async def run() -> None: tg.start_soon(run) await session.wait_for(1) - bus.publish(ToolsListChanged()) # not requested - bus.publish(ResourceUpdated(uri="r://other")) # URI not subscribed - bus.publish(PromptsListChanged()) - bus.publish(ResourcesListChanged()) - bus.publish(ResourceUpdated(uri="r://a")) + await bus.publish(ToolsListChanged()) # not requested + await bus.publish(ResourceUpdated(uri="r://other")) # URI not subscribed + await bus.publish(PromptsListChanged()) + await bus.publish(ResourcesListChanged()) + await bus.publish(ResourceUpdated(uri="r://a")) await session.wait_for(4) handler.close() @@ -178,7 +202,9 @@ async def run() -> None: @pytest.mark.anyio async def test_empty_filter_honors_nothing_and_delivers_nothing() -> None: - bus = InMemoryEventBus() + """SDK-defined: falsy flags and an empty URI list are dropped from the ack + rather than echoed, and such a stream delivers nothing.""" + bus = InMemorySubscriptionBus() handler = ListenHandler(bus) session = _RecordingSession() @@ -195,7 +221,7 @@ async def run() -> None: assert ack.params.notifications == SubscriptionFilter() for event in (ToolsListChanged(), PromptsListChanged(), ResourcesListChanged(), ResourceUpdated(uri="r://a")): - bus.publish(event) + await bus.publish(event) handler.close() assert len(session.sent) == 1 # the ack only @@ -203,7 +229,8 @@ async def run() -> None: @pytest.mark.anyio async def test_publish_after_close_is_dropped() -> None: - bus = InMemoryEventBus() + """SDK-defined: an event racing `close()` while the stream unwinds is dropped.""" + bus = InMemorySubscriptionBus() handler = ListenHandler(bus) session = _RecordingSession() @@ -218,14 +245,47 @@ async def run() -> None: handler.close() # The handler task has not resumed yet, so the listener is still # subscribed but its stream is closed: the event is dropped. - bus.publish(ToolsListChanged()) + await bus.publish(ToolsListChanged()) assert len(session.sent) == 1 +@pytest.mark.anyio +async def test_event_published_during_ack_send_is_delivered_after_the_ack() -> None: + """SDK-defined: the stream subscribes before sending the ack, so an event + published while the ack write is suspended is buffered and delivered after + it - never lost, and never ahead of the ack frame.""" + bus = InMemorySubscriptionBus() + handler = ListenHandler(bus) + + class _PublishDuringAck(_RecordingSession): + async def send_notification( + self, notification: ServerNotification, related_request_id: RequestId | None = None + ) -> None: + if not self.sent: + # Publish while the handler is still inside the ack send. + await bus.publish(ToolsListChanged()) + await super().send_notification(notification, related_request_id) + + session = _PublishDuringAck() + + async with anyio.create_task_group() as tg: + + async def run() -> None: + await handler(_ctx(session), _params(tools_list_changed=True)) + + tg.start_soon(run) + await session.wait_for(2) + handler.close() + + assert isinstance(session.sent[0][0], SubscriptionsAcknowledgedNotification) + assert isinstance(session.sent[1][0], ToolListChangedNotification) + + @pytest.mark.anyio async def test_listen_requires_a_request_id() -> None: - handler = ListenHandler(InMemoryEventBus()) + """SDK-defined: a context without a request id cannot open a stream.""" + handler = ListenHandler(InMemorySubscriptionBus()) with pytest.raises(MCPError) as exc_info: await handler(_ctx(_RecordingSession(), request_id=None), _params()) @@ -233,4 +293,5 @@ async def test_listen_requires_a_request_id() -> None: def test_close_without_open_streams_is_a_no_op() -> None: - ListenHandler(InMemoryEventBus()).close() + """SDK-defined: `close()` with nothing open does nothing.""" + ListenHandler(InMemorySubscriptionBus()).close() From 04553f079c7671fc1f6af9beb39a22f09e1e8ade Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 30 Jun 2026 19:48:05 +0000 Subject: [PATCH 3/7] Add the subscriptions docs page and example story - docs/advanced/subscriptions.md: publish-first narrative for MCPServer (ctx.notify_*), the filter/no-replay contract, the SubscriptionBus seam for multi-replica deployments, and the low-level composition. Snippets under docs_src/subscriptions/ with their claims proved by tests/docs_src/test_subscriptions.py; cross-linked from the context tutorial and the low-level server page. - examples/stories/subscriptions: promote the deferred stub to a runnable story (modern era, both transports, MCPServer and lowlevel variants). The client opens a listen stream through the session escape hatch, watches one URI and the tool list, observes exact-URI filtering and a runtime tool registration announcing itself, and closes the stream by cancelling the parked request. --- docs/advanced/low-level-server.md | 2 +- docs/advanced/subscriptions.md | 80 ++++++++++ docs/tutorial/context.md | 2 +- docs_src/subscriptions/__init__.py | 0 docs_src/subscriptions/tutorial001.py | 28 ++++ docs_src/subscriptions/tutorial002.py | 40 +++++ examples/stories/manifest.toml | 7 +- examples/stories/subscriptions/README.md | 67 ++++++--- examples/stories/subscriptions/__init__.py | 0 examples/stories/subscriptions/client.py | 94 ++++++++++++ examples/stories/subscriptions/server.py | 36 +++++ .../stories/subscriptions/server_lowlevel.py | 72 +++++++++ mkdocs.yml | 1 + tests/docs_src/test_subscriptions.py | 138 ++++++++++++++++++ 14 files changed, 546 insertions(+), 21 deletions(-) create mode 100644 docs/advanced/subscriptions.md create mode 100644 docs_src/subscriptions/__init__.py create mode 100644 docs_src/subscriptions/tutorial001.py create mode 100644 docs_src/subscriptions/tutorial002.py create mode 100644 examples/stories/subscriptions/__init__.py create mode 100644 examples/stories/subscriptions/client.py create mode 100644 examples/stories/subscriptions/server.py create mode 100644 examples/stories/subscriptions/server_lowlevel.py create mode 100644 tests/docs_src/test_subscriptions.py diff --git a/docs/advanced/low-level-server.md b/docs/advanced/low-level-server.md index 176ea38a6..123c85dd7 100644 --- a/docs/advanced/low-level-server.md +++ b/docs/advanced/low-level-server.md @@ -183,7 +183,7 @@ Each of these is one idea you now have the vocabulary for; each has its own chap * `on_call_tool`, `on_get_prompt`, and `on_read_resource` may return an `InputRequiredResult` instead of their normal result to pause the call and ask the client for input; see **[Multi-round-trip requests](multi-round-trip.md)**. True to this tier, nothing is installed for you: where `MCPServer` seals `requestState` by default, here the `request_state` you set crosses the wire exactly as written until you opt in with `server.middleware.append(RequestStateBoundary(RequestStateSecurity(keys=[...]), default_audience=server.name))`: one line (both names import from `mcp.server.request_state`) for the identical sealing and verification `MCPServer` performs (**[Protecting `requestState`](multi-round-trip.md#protecting-requeststate)**). * `on_list_resources`, `on_read_resource`, `on_list_prompts`, `on_get_prompt`, `on_completion` are the same `(ctx, params) -> result` shape for the other primitives. -* `on_subscriptions_listen` serves the 2026-07-28 `subscriptions/listen` stream. Pass an `mcp.server.subscriptions.ListenHandler` built over a `SubscriptionBus` (the in-memory default, or your own — e.g. Redis-backed), keep the bus where your other handlers can reach it (the lifespan is a natural home), and publish `ServerEvent`s to it. The handler owns the wire semantics: ack-first, per-stream filtering, and subscription-id tagging. +* `on_subscriptions_listen` serves the 2026-07-28 `subscriptions/listen` stream. Pass a `ListenHandler` built over a `SubscriptionBus` and publish events to the bus from your other handlers; see **[Subscriptions](subscriptions.md)** for the full composition. * `server.streamable_http_app()` returns the same Starlette app `MCPServer`'s does; deploy it the way **[Running your server](../run/index.md)** deploys any other ASGI app. There is no `server.run(transport=...)` down here: `server.run(read_stream, write_stream, server.create_initialization_options())` drives one connection over a pair of streams, and that one line is the whole story. ## Recap diff --git a/docs/advanced/subscriptions.md b/docs/advanced/subscriptions.md new file mode 100644 index 000000000..8816a1ea3 --- /dev/null +++ b/docs/advanced/subscriptions.md @@ -0,0 +1,80 @@ +# Subscriptions + +A server's catalog is not fixed. Tools get registered at runtime, resources change behind their URIs. The client side of that story is a subscription: on the 2026-07-28 protocol, a client that wants to hear about changes sends one `subscriptions/listen` request, and the response to that request *is* the stream — it stays open, carrying exactly the notification kinds the client asked for. + +Your side of it is one line: publish the change. + +```python title="server.py" hl_lines="16 27" +--8<-- "docs_src/subscriptions/tutorial001.py" +``` + +* `await ctx.notify_resource_updated("note://todo")` delivers `notifications/resources/updated` to every open listen stream that subscribed to that URI. Not to anyone else. +* `await ctx.notify_tools_changed()` delivers `notifications/tools/list_changed` to every stream that asked for tool-list changes. A client that receives it calls `tools/list` again — and now sees `search`. +* The siblings are `notify_prompts_changed()` and `notify_resources_changed()`, for the other two list-changed kinds. +* No subscribers, no work: publishing to an idle server is a no-op. You don't check whether anyone is listening; you state what changed. + +The SDK serves `subscriptions/listen` for you — `MCPServer` registers the handler at construction, and the wire obligations (the acknowledgment as the first frame, the per-stream filtering, the subscription id tagged onto every frame) are its job, not yours. + +!!! check + On the wire, a stream whose filter named `note://todo` looks like this after `edit_note` runs: + + ```json + {"method": "notifications/subscriptions/acknowledged", + "params": {"notifications": {"resourceSubscriptions": ["note://todo"]}, "_meta": {"io.modelcontextprotocol/subscriptionId": 7}}} + + {"method": "notifications/resources/updated", + "params": {"uri": "note://todo", "_meta": {"io.modelcontextprotocol/subscriptionId": 7}}} + ``` + + The acknowledgment echoes the filter the server agreed to honor, and every frame carries the + listen request's JSON-RPC id under `_meta` — that id *is* the subscription id. + +## Only what was asked for + +The filter is a contract. A stream that requested tool-list changes and one resource URI receives those two kinds and nothing else — publish a prompt change and that stream stays silent. Resource URIs are matched as exact strings: `note://todo` does not cover `note://todo/draft`. + +Two more things the stream is *not*: + +* **It is not a replay log.** A dropped stream is gone; events published while nobody was connected are not queued. The client's contract is to re-listen and re-fetch what it cares about. +* **It is not the 2025 path.** Clients on earlier protocol versions that called `resources/subscribe` are served by `ctx.session.send_resource_updated(uri)` — the `notify_*` methods reach `subscriptions/listen` streams only. + +## One process is the default. More takes a bus + +Publishes travel from your handler to the open streams over a `SubscriptionBus`. The default is in-memory: one process, every stream in it. That is the right answer until you run replicas behind a load balancer — then a client's stream is pinned to one replica, and a publish on another replica has to reach it. + +That seam is yours to implement: two methods over your pub/sub backend. + +```python +class RedisSubscriptionBus: + async def publish(self, event: ServerEvent) -> None: + await self.redis.publish("mcp-events", encode(event)) # to every replica + + def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: + ... # register the local listener; a reader task calls it for arriving events +``` + +```python +mcp = MCPServer("Notebook", subscriptions=RedisSubscriptionBus(...)) +``` + +The bus carries typed `ServerEvent` values — four small dataclasses — never JSON-RPC. Stamping, filtering, and stream lifecycles stay in the SDK, so a bus implementation cannot break the protocol; it can only move events between processes. The same instance is reachable as `mcp.subscriptions`, which is also how you publish from outside a request: `await mcp.subscriptions.publish(ToolsListChanged())`. + +## The low-level composition + +Down on the low-level `Server` there is no pre-wired anything — and the same parts assemble in three lines: + +```python title="server.py" hl_lines="9 31 39" +--8<-- "docs_src/subscriptions/tutorial002.py" +``` + +* You own the bus, so you publish to it directly: `await bus.publish(ResourceUpdated(uri=...))`. Put it wherever your handlers can reach it — module scope here, the lifespan in a bigger app. +* `ListenHandler(bus)` is the same handler `MCPServer` registers; `on_subscriptions_listen=` is an ordinary handler slot. Don't want the SDK's semantics? Write your own handler for the slot — the spec obligations come with it. +* `ListenHandler.close()` gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately and the client shouldn't re-listen. Without it, streams end when the client disconnects. + +## Recap + +* A client opts in with one `subscriptions/listen` request; the response is the stream. There is nothing to configure server-side — serving it is built in. +* You publish: `await ctx.notify_resource_updated(uri)`, `notify_tools_changed()`, `notify_prompts_changed()`, `notify_resources_changed()`. Idle servers make these free. +* Streams receive only what their filter requested; URIs match exactly; nothing is replayed. +* Scaling out means implementing `SubscriptionBus` — two methods — over your own pub/sub, and passing it as `MCPServer(subscriptions=...)`. +* The low-level spelling is the same machinery held in your hands: a bus, `ListenHandler(bus)`, one constructor argument. diff --git a/docs/tutorial/context.md b/docs/tutorial/context.md index 3d1330237..96bdd0776 100644 --- a/docs/tutorial/context.md +++ b/docs/tutorial/context.md @@ -104,7 +104,7 @@ What a server offers is not fixed at import time. Register a tool at runtime, th The siblings are `send_resource_list_changed()`, `send_prompt_list_changed()`, and `send_resource_updated(uri)` for a change to one specific resource. -On a 2026-07-28 connection, clients receive change notifications only on a `subscriptions/listen` stream they opened — the `send_*` methods above do not reach those streams. The `Context` publish methods — `await ctx.notify_tools_changed()`, `await ctx.notify_prompts_changed()`, `await ctx.notify_resources_changed()`, and `await ctx.notify_resource_updated(uri)` — deliver to every subscribed stream at once. Behind a load balancer, pass your own `SubscriptionBus` implementation as `MCPServer(subscriptions=...)` to fan events out across replicas; the in-process default covers a single server. +On a 2026-07-28 connection, clients receive change notifications only on a `subscriptions/listen` stream they opened — the `send_*` methods above do not reach those streams. The `Context` publish methods — `await ctx.notify_tools_changed()`, `await ctx.notify_prompts_changed()`, `await ctx.notify_resources_changed()`, and `await ctx.notify_resource_updated(uri)` — deliver to every subscribed stream at once. The whole story, including scaling out across replicas, is in **[Subscriptions](../advanced/subscriptions.md)**. !!! check Before anyone runs `enable_recommendations`, the tool you are promising does not exist. Call it diff --git a/docs_src/subscriptions/__init__.py b/docs_src/subscriptions/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/docs_src/subscriptions/tutorial001.py b/docs_src/subscriptions/tutorial001.py new file mode 100644 index 000000000..5063fceed --- /dev/null +++ b/docs_src/subscriptions/tutorial001.py @@ -0,0 +1,28 @@ +from mcp.server.mcpserver import Context, MCPServer + +mcp = MCPServer("Notebook") + +NOTES = {"todo": "buy milk", "journal": "day one"} + + +@mcp.resource("note://{name}") +def note(name: str) -> str: + return NOTES[name] + + +@mcp.tool() +async def edit_note(name: str, text: str, ctx: Context) -> str: + NOTES[name] = text + await ctx.notify_resource_updated(f"note://{name}") + return "saved" + + +def search(query: str) -> list[str]: + return [name for name, text in NOTES.items() if query in text] + + +@mcp.tool() +async def enable_search(ctx: Context) -> str: + mcp.add_tool(search) + await ctx.notify_tools_changed() + return "search is live" diff --git a/docs_src/subscriptions/tutorial002.py b/docs_src/subscriptions/tutorial002.py new file mode 100644 index 000000000..c0b04f64d --- /dev/null +++ b/docs_src/subscriptions/tutorial002.py @@ -0,0 +1,40 @@ +from typing import Any + +import mcp_types as types + +from mcp.server.context import ServerRequestContext +from mcp.server.lowlevel import Server +from mcp.server.subscriptions import InMemorySubscriptionBus, ListenHandler, ResourceUpdated + +bus = InMemorySubscriptionBus() + +NOTES = {"todo": "buy milk"} + +EDIT_NOTE_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": {"name": {"type": "string"}, "text": {"type": "string"}}, + "required": ["name", "text"], +} + + +async def list_tools( + ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None +) -> types.ListToolsResult: + return types.ListToolsResult( + tools=[types.Tool(name="edit_note", description="Replace a note's text.", input_schema=EDIT_NOTE_SCHEMA)] + ) + + +async def call_tool(ctx: ServerRequestContext[Any], params: types.CallToolRequestParams) -> types.CallToolResult: + args = params.arguments or {} + NOTES[args["name"]] = args["text"] + await bus.publish(ResourceUpdated(uri=f"note://{args['name']}")) + return types.CallToolResult(content=[types.TextContent(type="text", text="saved")]) + + +server = Server( + "notebook", + on_list_tools=list_tools, + on_call_tool=call_tool, + on_subscriptions_listen=ListenHandler(bus), +) diff --git a/examples/stories/manifest.toml b/examples/stories/manifest.toml index 1ba2fe862..965e04aa4 100644 --- a/examples/stories/manifest.toml +++ b/examples/stories/manifest.toml @@ -68,6 +68,12 @@ lowlevel = false transports = ["in-memory", "http-asgi"] era = "dual-in-body" +[story.subscriptions] +# subscriptions/listen exists only on the 2026 wire, so there is no legacy leg. +# The listen request parks for the stream's lifetime; the client ends it by +# cancelling the awaiting scope (the spec's client-side close). +era = "modern" + [story.schema_validators] [story.middleware] @@ -166,7 +172,6 @@ fixed_port = 8000 # issuer/PRM metadata bake in :8 [deferred] caching = "client honouring + per-result override unlanded" -subscriptions = "#2901 — Client.listen / ServerEventBus" tasks = "SEP-2663 — tasks extension runtime (server-decided augmentation, CreateTaskResult)" skills = "#2896 — SEP-2640" events = "#2901 + #2896" diff --git a/examples/stories/subscriptions/README.md b/examples/stories/subscriptions/README.md index d41d0f82b..b3f51b4b6 100644 --- a/examples/stories/subscriptions/README.md +++ b/examples/stories/subscriptions/README.md @@ -1,27 +1,58 @@ # subscriptions -The 2026-era `subscriptions/listen` channel: the server publishes change events -through a `ServerEventBus`, and `Client.listen()` opens an async iterator over -them. Replaces the handshake-era `resources/subscribe` + standalone-GET -notification path. - -**Status: not yet implemented** ([#2901](https://github.com/modelcontextprotocol/python-sdk/issues/2901)). -The lowlevel registration surface is in this base — -[#2967](https://github.com/modelcontextprotocol/python-sdk/pull/2967) -(`ae13ede`) added the lowlevel `on_subscriptions_listen` handler slot — but -there is no `Client.listen()` or `ServerEventBus` yet. The runnable story is -deliberately a follow-up PR to keep this one reviewable. +Server-originated change notifications on the 2026-07-28 protocol. A client +opens one `subscriptions/listen` request whose response **is** the stream; the +server publishes with `ctx.notify_resource_updated(uri)` / +`ctx.notify_tools_changed()` and the SDK does the wire work (ack-first, +per-stream filtering, subscription-id tagging). Replaces the handshake-era +`resources/subscribe` + standalone-GET notification path. + +The client edits a note it did not subscribe to (silence), edits the one it +did (a tagged `notifications/resources/updated`), registers a tool at runtime +(`notifications/tools/list_changed`, then re-lists and calls it), and finally +closes the stream by cancelling the parked request. + +## Run it + +```bash +# HTTP — the client self-hosts the server on a free port, runs, then tears it +# down (subscriptions/listen is 2026-era only) +uv run python -m stories.subscriptions.client --http +# same, against the lowlevel-API server variant +uv run python -m stories.subscriptions.client --http --server server_lowlevel +``` + +## What to look at + +- `client.py` — stream frames arrive as ordinary server notifications via the + constructor-only `message_handler=`. There is no client-side listen API yet, + so opening the stream drops to the `client.session` escape hatch; the request + parks for the stream's lifetime and the client closes the stream by + cancelling it. Every frame's `_meta["io.modelcontextprotocol/subscriptionId"]` + is the listen request's JSON-RPC id. +- `server.py` — publishing is one `await ctx.notify_*()` line per change; the + filter, the tagging, and the ack ordering are the SDK's job. Publishing with + no subscribers is a no-op. +- `server_lowlevel.py` — the same machinery held by hand: an + `InMemorySubscriptionBus`, handlers that `await bus.publish(...)`, and + `ListenHandler(bus)` passed as `on_subscriptions_listen=`. A multi-replica + deployment swaps the bus for one backed by its own pub/sub + (`MCPServer(subscriptions=...)` on the high-level server). + +## Caveats + +- 2026-era only: on a 2025 connection the method does not exist (clients there + use `resources/subscribe` and unsolicited notifications instead), so the + story pins the modern era and has no legacy leg. +- No replay: events published while no stream is open are not queued. The + contract after a dropped stream is re-listen and re-fetch. ## Spec [Subscriptions — basic utilities](https://modelcontextprotocol.io/specification/draft/basic/utilities/subscriptions) -## Working example elsewhere - -The TypeScript SDK ships a runnable `subscriptions` story: -[typescript-sdk/examples/subscriptions](https://github.com/modelcontextprotocol/typescript-sdk/tree/main/examples/subscriptions). - ## See also -`standalone_get/` (handshake-era server-initiated notifications), `resources/` -(legacy `subscribe` deliberately omitted). +`streaming/` (request-scoped notifications), `events/` (the events extension +on top of this channel, deferred), and `docs/advanced/subscriptions.md` (the +narrative version). diff --git a/examples/stories/subscriptions/__init__.py b/examples/stories/subscriptions/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/stories/subscriptions/client.py b/examples/stories/subscriptions/client.py new file mode 100644 index 000000000..d47f4a74d --- /dev/null +++ b/examples/stories/subscriptions/client.py @@ -0,0 +1,94 @@ +"""Open a `subscriptions/listen` stream, watch one URI and the tool list, then close it.""" + +import anyio +import mcp_types as types + +from mcp.client import Client +from stories._harness import Target, run_client + +SUBSCRIPTION_ID = "io.modelcontextprotocol/subscriptionId" + + +async def main(target: Target, *, mode: str = "auto") -> None: + # Stream frames arrive as ordinary server notifications; `message_handler` + # is constructor-only on `Client`, so the list it fills exists first. + received: list[types.ServerNotification] = [] + arrival = anyio.Event() + + async def on_message(message: object) -> None: + nonlocal arrival + if isinstance( + message, + types.SubscriptionsAcknowledgedNotification + | types.ResourceUpdatedNotification + | types.ToolListChangedNotification, + ): + received.append(message) + arrival.set() + arrival = anyio.Event() + + async def wait_for(count: int) -> None: + with anyio.fail_after(10): + while len(received) < count: + await arrival.wait() + + async with Client(target, mode=mode, message_handler=on_message) as client: + before = await client.list_tools() + assert "search" not in {tool.name for tool in before.tools} + + async with anyio.create_task_group() as tg: + # There is no client-side listen API yet, so the story drops to the + # `client.session` escape hatch: the request parks for the stream's + # lifetime, so it runs as a task and the client closes the stream by + # cancelling it (the spec's client-side close). + async def listen() -> None: + request = types.SubscriptionsListenRequest( + params=types.SubscriptionsListenRequestParams( + notifications=types.SubscriptionFilter( + tools_list_changed=True, resource_subscriptions=["note://todo"] + ) + ) + ) + await client.session.send_request(request, types.SubscriptionsListenResult) + + tg.start_soon(listen) + + # ── the ack is the first frame: it echoes the honored filter, tagged ── + await wait_for(1) + ack = received[0] + assert isinstance(ack, types.SubscriptionsAcknowledgedNotification), ack + assert ack.params.notifications.tools_list_changed is True + assert ack.params.notifications.resource_subscriptions == ["note://todo"] + assert ack.params.meta is not None and SUBSCRIPTION_ID in ack.params.meta + + # ── exact-URI filtering: an unsubscribed note edit stays silent ── + await client.call_tool("edit_note", {"name": "journal", "text": "day two"}) + # ── the subscribed URI delivers, carrying the same subscription id ── + await client.call_tool("edit_note", {"name": "todo", "text": "water plants"}) + await wait_for(2) + updated = received[1] + assert isinstance(updated, types.ResourceUpdatedNotification), updated + assert updated.params.uri == "note://todo" + assert updated.params.meta == ack.params.meta + assert len(received) == 2, "the journal edit must not have been delivered" + + # ── a runtime tool registration announces itself ── + await client.call_tool("enable_search", {}) + await wait_for(3) + assert isinstance(received[2], types.ToolListChangedNotification), received[2] + + # The client is done listening: closing the stream is cancelling the + # parked request's scope. + tg.cancel_scope.cancel() + + # list_changed told us to re-fetch - the new tool is callable, and the + # session outlives the closed stream. + tools = await client.list_tools() + assert "search" in {tool.name for tool in tools.tools} + result = await client.call_tool("search", {"query": "water"}) + content = result.content[0] + assert isinstance(content, types.TextContent) and content.text == "todo", result + + +if __name__ == "__main__": + run_client(main) diff --git a/examples/stories/subscriptions/server.py b/examples/stories/subscriptions/server.py new file mode 100644 index 000000000..b102aea30 --- /dev/null +++ b/examples/stories/subscriptions/server.py @@ -0,0 +1,36 @@ +"""A notebook whose edits and tool changes reach `subscriptions/listen` streams.""" + +from mcp.server.mcpserver import Context, MCPServer +from stories._hosting import run_server_from_args + + +def build_server() -> MCPServer: + mcp = MCPServer("subscriptions-example") + notes = {"todo": "buy milk", "journal": "day one"} + + @mcp.resource("note://{name}") + def note(name: str) -> str: + return notes[name] + + @mcp.tool() + async def edit_note(name: str, text: str, ctx: Context) -> str: + """Replace a note's text and tell subscribers that URI changed.""" + notes[name] = text + await ctx.notify_resource_updated(f"note://{name}") + return "saved" + + def search(query: str) -> list[str]: + return [name for name, text in notes.items() if query in text] + + @mcp.tool() + async def enable_search(ctx: Context) -> str: + """Register the `search` tool at runtime and tell subscribers the list changed.""" + mcp.add_tool(search) + await ctx.notify_tools_changed() + return "search is live" + + return mcp + + +if __name__ == "__main__": + run_server_from_args(build_server) diff --git a/examples/stories/subscriptions/server_lowlevel.py b/examples/stories/subscriptions/server_lowlevel.py new file mode 100644 index 000000000..02365c1af --- /dev/null +++ b/examples/stories/subscriptions/server_lowlevel.py @@ -0,0 +1,72 @@ +"""The same notebook against the low-level Server: an explicit bus + ListenHandler.""" + +from typing import Any + +import mcp_types as types + +from mcp.server.context import ServerRequestContext +from mcp.server.lowlevel import Server +from mcp.server.subscriptions import ( + InMemorySubscriptionBus, + ListenHandler, + ResourceUpdated, + ToolsListChanged, +) +from stories._hosting import run_server_from_args + +EDIT_NOTE_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": {"name": {"type": "string"}, "text": {"type": "string"}}, + "required": ["name", "text"], +} +EMPTY_SCHEMA: dict[str, Any] = {"type": "object", "properties": {}} +SEARCH_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": {"query": {"type": "string"}}, + "required": ["query"], +} + + +def build_server() -> Server[Any]: + # The bus lives wherever your handlers can reach it; the lifespan is the + # natural home in a bigger app. The closure is enough here. + bus = InMemorySubscriptionBus() + notes = {"todo": "buy milk", "journal": "day one"} + search_enabled = False + + async def list_tools( + ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None + ) -> types.ListToolsResult: + tools = [ + types.Tool(name="edit_note", description="Replace a note's text.", input_schema=EDIT_NOTE_SCHEMA), + types.Tool(name="enable_search", description="Register the search tool.", input_schema=EMPTY_SCHEMA), + ] + if search_enabled: + tools.append(types.Tool(name="search", description="Find notes.", input_schema=SEARCH_SCHEMA)) + return types.ListToolsResult(tools=tools) + + async def call_tool(ctx: ServerRequestContext[Any], params: types.CallToolRequestParams) -> types.CallToolResult: + nonlocal search_enabled + args = params.arguments or {} + if params.name == "edit_note": + notes[args["name"]] = args["text"] + await bus.publish(ResourceUpdated(uri=f"note://{args['name']}")) + return types.CallToolResult(content=[types.TextContent(text="saved")]) + if params.name == "enable_search": + search_enabled = True + await bus.publish(ToolsListChanged()) + return types.CallToolResult(content=[types.TextContent(text="search is live")]) + assert params.name == "search" + matches = [name for name, text in notes.items() if args["query"] in text] + return types.CallToolResult(content=[types.TextContent(text=", ".join(matches))]) + + return Server( + "subscriptions-example", + on_list_tools=list_tools, + on_call_tool=call_tool, + on_subscriptions_listen=ListenHandler(bus), + ) + + +if __name__ == "__main__": + run_server_from_args(build_server) diff --git a/mkdocs.yml b/mkdocs.yml index a00a982be..fda764714 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -44,6 +44,7 @@ nav: - URI templates: advanced/uri-templates.md - Pagination: advanced/pagination.md - Caching hints: advanced/caching.md + - Subscriptions: advanced/subscriptions.md - Middleware: advanced/middleware.md - Extensions: advanced/extensions.md - MCP Apps: advanced/apps.md diff --git a/tests/docs_src/test_subscriptions.py b/tests/docs_src/test_subscriptions.py new file mode 100644 index 000000000..5ab155d0d --- /dev/null +++ b/tests/docs_src/test_subscriptions.py @@ -0,0 +1,138 @@ +"""`docs/advanced/subscriptions.md`: every claim the page makes, proved against the real SDK.""" + +from typing import Any + +import anyio +import mcp_types as types +import pytest + +from docs_src.subscriptions import tutorial001, tutorial002 +from mcp import Client +from mcp.server.subscriptions import SUBSCRIPTION_ID_META_KEY, ToolsListChanged + +# See test_index.py for why this is a per-module mark and not a conftest hook. +pytestmark = [pytest.mark.anyio, pytest.mark.filterwarnings("error::mcp.MCPDeprecationWarning")] + + +class _Stream: + """Collects listen-stream notifications and lets tests await arrival counts.""" + + def __init__(self) -> None: + self.received: list[types.ServerNotification] = [] + self._arrival = anyio.Event() + + async def handler( + self, + message: object, + ) -> None: + # The only messages these connections produce are the stream's frames. + assert isinstance( + message, + types.SubscriptionsAcknowledgedNotification + | types.ResourceUpdatedNotification + | types.ToolListChangedNotification, + ), message + self.received.append(message) + self._arrival.set() + self._arrival = anyio.Event() + + async def wait_for(self, count: int) -> None: + with anyio.fail_after(5): + while len(self.received) < count: + await self._arrival.wait() + + +def _listen_request(**fields: Any) -> types.SubscriptionsListenRequest: + return types.SubscriptionsListenRequest( + params=types.SubscriptionsListenRequestParams(notifications=types.SubscriptionFilter(**fields)) + ) + + +async def test_publishes_reach_the_stream_filtered_and_tagged() -> None: + """tutorial001: the full arc - ack first, exact-URI filtering, list_changed + leading to a refreshed tool list, and client-side close.""" + stream = _Stream() + async with Client(tutorial001.mcp, mode="2026-07-28", message_handler=stream.handler) as client: + async with anyio.create_task_group() as tg: + + async def listen() -> None: + await client.session.send_request( + _listen_request(tools_list_changed=True, resource_subscriptions=["note://todo"]), + types.SubscriptionsListenResult, + ) + + tg.start_soon(listen) + await stream.wait_for(1) + + ack = stream.received[0] + assert isinstance(ack, types.SubscriptionsAcknowledgedNotification) + assert ack.params.notifications == types.SubscriptionFilter( + tools_list_changed=True, resource_subscriptions=["note://todo"] + ) + assert ack.params.meta is not None and SUBSCRIPTION_ID_META_KEY in ack.params.meta + + # An edit to a URI the stream did not subscribe to stays silent... + await client.call_tool("edit_note", {"name": "journal", "text": "day two"}) + # ...and the subscribed URI delivers, tagged with the same subscription id. + await client.call_tool("edit_note", {"name": "todo", "text": "water plants"}) + await stream.wait_for(2) + updated = stream.received[1] + assert isinstance(updated, types.ResourceUpdatedNotification) + assert updated.params.uri == "note://todo" + assert updated.params.meta == ack.params.meta + + await client.call_tool("enable_search", {}) + await stream.wait_for(3) + assert isinstance(stream.received[2], types.ToolListChangedNotification) + + # The client ends the stream by closing it - cancel the parked request. + tg.cancel_scope.cancel() + + # The list_changed told us to re-fetch: the new tool is there, and the + # session outlives the closed stream. + tools = await client.list_tools() + assert "search" in {tool.name for tool in tools.tools} + contents = (await client.read_resource("note://todo")).contents[0] + assert isinstance(contents, types.TextResourceContents) + assert contents.text == "water plants" + + +async def test_publish_with_no_subscribers_is_a_no_op() -> None: + """tutorial001: publishing to an idle server does nothing and breaks nothing.""" + async with Client(tutorial001.mcp, mode="2026-07-28") as client: + result = await client.call_tool("edit_note", {"name": "todo", "text": "buy milk"}) + assert result.is_error is not True + + +async def test_lowlevel_composition_serves_the_same_stream() -> None: + """tutorial002: bus + ListenHandler on the lowlevel Server is the same machinery.""" + stream = _Stream() + async with Client(tutorial002.server, mode="2026-07-28", message_handler=stream.handler) as client: + tools = await client.list_tools() + assert [tool.name for tool in tools.tools] == ["edit_note"] + + async with anyio.create_task_group() as tg: + + async def listen() -> None: + await client.session.send_request( + _listen_request(resource_subscriptions=["note://todo"]), + types.SubscriptionsListenResult, + ) + + tg.start_soon(listen) + await stream.wait_for(1) + + await client.call_tool("edit_note", {"name": "todo", "text": "water plants"}) + await stream.wait_for(2) + updated = stream.received[1] + assert isinstance(updated, types.ResourceUpdatedNotification) + assert updated.params.uri == "note://todo" + + # `mcp.subscriptions` / the bus is also the publish surface outside a + # request; an unrequested kind never reaches this stream. + await tutorial002.bus.publish(ToolsListChanged()) + await client.call_tool("edit_note", {"name": "todo", "text": "done"}) + await stream.wait_for(3) + assert isinstance(stream.received[2], types.ResourceUpdatedNotification) + + tg.cancel_scope.cancel() From 12ffef61c0b1144511b11047417ea3601c4e9877 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 30 Jun 2026 20:45:37 +0000 Subject: [PATCH 4/7] Advertise subscription capabilities era-aware and apply review feedback The big one: MCPServer served subscriptions/listen but still advertised listChanged: false / subscribe: false everywhere, so a spec-following client - which capability-gates its listen filter - would never subscribe, making the publish surface unreachable. get_capabilities now takes an optional protocol_version: at 2026-07-28+ the listChanged and resources.subscribe bits derive from whether subscriptions/listen is served (that is what those bits mean on a wire whose only delivery channel is the listen stream); the handshake-era derivation is unchanged and remains the default. server/discover passes the request's version. The everything-server gains test_trigger_tool_change / test_trigger_prompt_change diagnostic tools (backed by a new MCPServer.remove_prompt mirroring remove_tool), so the conformance list-changed SHOULD checks now run and pass: server-stateless is 30/30. Review follow-ups: - The bus is no longer exposed as a public MCPServer property; Context receives it at construction and the notify_* methods publish through it directly. Publishing from outside a request means keeping a reference to the bus you constructed. - json_response=True no longer hangs subscriptions/listen: a listen response is a notification stream, so it takes the SSE path regardless of the JSON-response preference (TypeScript/Go parity). - ListenHandler gains max_subscriptions (rejected pre-ack past the cap) and max_buffered_events (a stream whose client stopped reading is ended at the cap; the client re-listens - no replay means the backlog was already lossy). - Honored resource URIs are matched via a frozenset instead of a list scan on every publish. - InMemorySubscriptionBus isolates raising listeners (logged + skipped) so one bad listener cannot starve the others or fail the publisher. - close() docs now say it initiates closure; each stream flushes from its own handler task. - Story and docs corrections: cancelling the parked listen request only releases the local task over HTTP today (the stream ends with the connection); capability docs/tests updated for the era-aware bits; stories index row promoted; small example hardening. --- docs/advanced/subscriptions.md | 2 +- docs/client/index.md | 2 +- docs/tutorial/first-steps.md | 2 +- .../mcp_everything_server/server.py | 30 ++++++- examples/stories/README.md | 2 +- examples/stories/subscriptions/README.md | 10 ++- examples/stories/subscriptions/client.py | 15 ++-- examples/stories/subscriptions/server.py | 9 +- .../stories/subscriptions/server_lowlevel.py | 2 +- src/mcp/server/_streamable_http_modern.py | 5 +- src/mcp/server/lowlevel/server.py | 29 +++++-- src/mcp/server/mcpserver/context.py | 22 +++-- src/mcp/server/mcpserver/prompts/manager.py | 6 ++ src/mcp/server/mcpserver/server.py | 27 +++--- src/mcp/server/subscriptions.py | 62 ++++++++++---- tests/docs_src/test_client.py | 7 +- tests/docs_src/test_first_steps.py | 6 +- tests/server/lowlevel/test_server_discover.py | 83 +++++++++++++++++-- tests/server/mcpserver/test_server.py | 35 ++++++-- tests/server/test_streamable_http_modern.py | 59 +++++++++++++ tests/server/test_subscriptions.py | 68 +++++++++++++++ 21 files changed, 406 insertions(+), 77 deletions(-) diff --git a/docs/advanced/subscriptions.md b/docs/advanced/subscriptions.md index 8816a1ea3..7364d92e2 100644 --- a/docs/advanced/subscriptions.md +++ b/docs/advanced/subscriptions.md @@ -57,7 +57,7 @@ class RedisSubscriptionBus: mcp = MCPServer("Notebook", subscriptions=RedisSubscriptionBus(...)) ``` -The bus carries typed `ServerEvent` values — four small dataclasses — never JSON-RPC. Stamping, filtering, and stream lifecycles stay in the SDK, so a bus implementation cannot break the protocol; it can only move events between processes. The same instance is reachable as `mcp.subscriptions`, which is also how you publish from outside a request: `await mcp.subscriptions.publish(ToolsListChanged())`. +The bus carries typed `ServerEvent` values — four small dataclasses — never JSON-RPC. Stamping, filtering, and stream lifecycles stay in the SDK, so a bus implementation cannot break the protocol; it can only move events between processes. To publish from outside a request, keep a reference to the bus you constructed and `await bus.publish(ToolsListChanged())` — the server holds the same instance. ## The low-level composition diff --git a/docs/client/index.md b/docs/client/index.md index a8026b5b9..7712e0620 100644 --- a/docs/client/index.md +++ b/docs/client/index.md @@ -145,7 +145,7 @@ The resource verbs come in pairs: two ways to list, one way to read. `read_resource` returns `contents`, a list of `TextResourceContents` or `BlobResourceContents`. Same idea as tool content: narrow with `isinstance`, then read `.text` (or `.blob`). -A client can also **subscribe** to a resource and be told when it changes: `subscribe_resource(uri)` and `unsubscribe_resource(uri)`, same shape as everything else here. `MCPServer` doesn't implement that half. It says so up front (`server_capabilities.resources.subscribe` is `False`) and answers the request with an `MCPError`: `-32601`, *Method not found*. A server that does support subscriptions is built on the low-level `Server` (**[The low-level Server](../advanced/low-level-server.md)**). +A client can also be told when a resource changes. On 2025-era connections that is `subscribe_resource(uri)` / `unsubscribe_resource(uri)` - a method pair `MCPServer` doesn't implement, so on the 2026-07-28 wire (where those verbs no longer exist) the request answers `-32601`, *Method not found*. The 2026 replacement is a `subscriptions/listen` stream, which `MCPServer` *does* serve - `server_capabilities.resources.subscribe` is `True` there, and the server side of the story is **[Subscriptions](../advanced/subscriptions.md)**. ## Prompts diff --git a/docs/tutorial/first-steps.md b/docs/tutorial/first-steps.md index ba59c6487..5328d12be 100644 --- a/docs/tutorial/first-steps.md +++ b/docs/tutorial/first-steps.md @@ -97,7 +97,7 @@ asyncio.run(main()) ``` ```text -{'prompts': {'list_changed': False}, 'resources': {'subscribe': False, 'list_changed': False}, 'tools': {'list_changed': False}} +{'prompts': {'list_changed': True}, 'resources': {'subscribe': True, 'list_changed': True}, 'tools': {'list_changed': True}} ``` That dictionary is the server's half of the handshake: diff --git a/examples/servers/everything-server/mcp_everything_server/server.py b/examples/servers/everything-server/mcp_everything_server/server.py index 218188f50..90dc1f64f 100644 --- a/examples/servers/everything-server/mcp_everything_server/server.py +++ b/examples/servers/everything-server/mcp_everything_server/server.py @@ -13,7 +13,7 @@ import click from mcp.server import ServerRequestContext from mcp.server.mcpserver import Context, MCPServer, RequestStateSecurity -from mcp.server.mcpserver.prompts.base import UserMessage +from mcp.server.mcpserver.prompts.base import Prompt, UserMessage from mcp.server.streamable_http import EventCallback, EventMessage, EventStore from mcp.shared.exceptions import MCPError from mcp_types import ( @@ -585,6 +585,34 @@ async def test_reconnection(ctx: Context) -> str: return "Reconnection test completed" +def _dynamic_tool() -> str: + """A tool registered and removed by test_trigger_tool_change.""" + return "dynamic" + + +def _dynamic_prompt() -> str: + """A prompt registered and removed by test_trigger_prompt_change.""" + return "dynamic" + + +@mcp.tool() +async def test_trigger_tool_change(ctx: Context) -> str: + """Mutates the tool list and announces it to subscriptions/listen streams (SEP-2575)""" + mcp.add_tool(_dynamic_tool, name="test_dynamic_tool") + mcp.remove_tool("test_dynamic_tool") + await ctx.notify_tools_changed() + return "tool list changed" + + +@mcp.tool() +async def test_trigger_prompt_change(ctx: Context) -> str: + """Mutates the prompt list and announces it to subscriptions/listen streams (SEP-2575)""" + mcp.add_prompt(Prompt.from_function(_dynamic_prompt, name="test_dynamic_prompt", description="dynamic")) + mcp.remove_prompt("test_dynamic_prompt") + await ctx.notify_prompts_changed() + return "prompt list changed" + + # Resources @mcp.resource("test://static-text") def static_text_resource() -> str: diff --git a/examples/stories/README.md b/examples/stories/README.md index 79d714311..ed8f1dd9b 100644 --- a/examples/stories/README.md +++ b/examples/stories/README.md @@ -148,6 +148,7 @@ opens with a banner saying what replaces it. | [`starlette_mount`](starlette_mount/) | mounting `streamable_http_app()` under a Starlette/FastAPI sub-path | current | | [`sse_polling`](sse_polling/) | SEP-1699 `closeSSE()` + `Last-Event-ID` resume via `EventStore` | legacy | | [`standalone_get`](standalone_get/) | server-initiated `list_changed` over the sessionful GET stream | legacy | +| [`subscriptions`](subscriptions/) | `subscriptions/listen` streams: `ctx.notify_*`, `SubscriptionBus`, `ListenHandler` | current | | [`reconnect`](reconnect/) | explicit `discover()`, persist `DiscoverResult`, zero-RTT reconnect | current | | [`bearer_auth`](bearer_auth/) | `TokenVerifier` + `AuthSettings` bearer gate, PRM metadata, `get_access_token()` | current | | [`oauth`](oauth/) | full `authorization_code` grant against an in-process AS | current | @@ -155,7 +156,6 @@ opens with a banner saying what replaces it. | [`identity_assertion`](identity_assertion/) | SEP-990 enterprise IdP flow: present an ID-JAG under the `jwt-bearer` grant | current | | **— deferred (README only) —** | | | | [`caching`](caching/) | `CacheableResult` ttl/scope hints; client honouring | not yet implemented | -| [`subscriptions`](subscriptions/) | `subscriptions/listen`, `ServerEventBus`, `Client.listen()` | not yet implemented — [#2901](https://github.com/modelcontextprotocol/python-sdk/issues/2901) | | [`tasks`](tasks/) | `io.modelcontextprotocol/tasks` extension | not yet implemented | | [`apps`](apps/) | MCP Apps: `ui://` resource + `_meta.ui` | not yet implemented — [#2896](https://github.com/modelcontextprotocol/python-sdk/issues/2896) | | [`skills`](skills/) | SEP-2640 skills extension | not yet implemented — [#2896](https://github.com/modelcontextprotocol/python-sdk/issues/2896) | diff --git a/examples/stories/subscriptions/README.md b/examples/stories/subscriptions/README.md index b3f51b4b6..22b947ba3 100644 --- a/examples/stories/subscriptions/README.md +++ b/examples/stories/subscriptions/README.md @@ -10,7 +10,8 @@ per-stream filtering, subscription-id tagging). Replaces the handshake-era The client edits a note it did not subscribe to (silence), edits the one it did (a tagged `notifications/resources/updated`), registers a tool at runtime (`notifications/tools/list_changed`, then re-lists and calls it), and finally -closes the stream by cancelling the parked request. +stops listening - cancelling the parked request releases the local task, and +closing the connection ends the stream server-side. ## Run it @@ -27,9 +28,10 @@ uv run python -m stories.subscriptions.client --http --server server_lowlevel - `client.py` — stream frames arrive as ordinary server notifications via the constructor-only `message_handler=`. There is no client-side listen API yet, so opening the stream drops to the `client.session` escape hatch; the request - parks for the stream's lifetime and the client closes the stream by - cancelling it. Every frame's `_meta["io.modelcontextprotocol/subscriptionId"]` - is the listen request's JSON-RPC id. + parks for the stream's lifetime. Cancelling it releases the local task; over + HTTP the server-side stream ends when the connection closes. Every frame's + `_meta["io.modelcontextprotocol/subscriptionId"]` is the listen request's + JSON-RPC id. - `server.py` — publishing is one `await ctx.notify_*()` line per change; the filter, the tagging, and the ack ordering are the SDK's job. Publishing with no subscribers is a no-op. diff --git a/examples/stories/subscriptions/client.py b/examples/stories/subscriptions/client.py index d47f4a74d..379d69bc6 100644 --- a/examples/stories/subscriptions/client.py +++ b/examples/stories/subscriptions/client.py @@ -38,9 +38,11 @@ async def wait_for(count: int) -> None: async with anyio.create_task_group() as tg: # There is no client-side listen API yet, so the story drops to the - # `client.session` escape hatch: the request parks for the stream's - # lifetime, so it runs as a task and the client closes the stream by - # cancelling it (the spec's client-side close). + # `client.session` escape hatch. The request parks for the stream's + # lifetime, so it runs as a task; cancelling it releases the local + # awaiting scope. In-memory that also ends the server's stream; over + # HTTP today nothing aborts the POST, so the server-side stream ends + # when the connection closes (the `Client` exit right below). async def listen() -> None: request = types.SubscriptionsListenRequest( params=types.SubscriptionsListenRequestParams( @@ -69,7 +71,8 @@ async def listen() -> None: updated = received[1] assert isinstance(updated, types.ResourceUpdatedNotification), updated assert updated.params.uri == "note://todo" - assert updated.params.meta == ack.params.meta + assert updated.params.meta is not None + assert updated.params.meta[SUBSCRIPTION_ID] == ack.params.meta[SUBSCRIPTION_ID] assert len(received) == 2, "the journal edit must not have been delivered" # ── a runtime tool registration announces itself ── @@ -77,8 +80,8 @@ async def listen() -> None: await wait_for(3) assert isinstance(received[2], types.ToolListChangedNotification), received[2] - # The client is done listening: closing the stream is cancelling the - # parked request's scope. + # The client is done listening: cancel the parked request and let + # the connection teardown below end the stream server-side. tg.cancel_scope.cancel() # list_changed told us to re-fetch - the new tool is callable, and the diff --git a/examples/stories/subscriptions/server.py b/examples/stories/subscriptions/server.py index b102aea30..a248bf0ca 100644 --- a/examples/stories/subscriptions/server.py +++ b/examples/stories/subscriptions/server.py @@ -22,11 +22,16 @@ async def edit_note(name: str, text: str, ctx: Context) -> str: def search(query: str) -> list[str]: return [name for name, text in notes.items() if query in text] + enabled = False + @mcp.tool() async def enable_search(ctx: Context) -> str: """Register the `search` tool at runtime and tell subscribers the list changed.""" - mcp.add_tool(search) - await ctx.notify_tools_changed() + nonlocal enabled + if not enabled: + enabled = True + mcp.add_tool(search) + await ctx.notify_tools_changed() return "search is live" return mcp diff --git a/examples/stories/subscriptions/server_lowlevel.py b/examples/stories/subscriptions/server_lowlevel.py index 02365c1af..6d9da182d 100644 --- a/examples/stories/subscriptions/server_lowlevel.py +++ b/examples/stories/subscriptions/server_lowlevel.py @@ -56,7 +56,7 @@ async def call_tool(ctx: ServerRequestContext[Any], params: types.CallToolReques search_enabled = True await bus.publish(ToolsListChanged()) return types.CallToolResult(content=[types.TextContent(text="search is live")]) - assert params.name == "search" + assert params.name == "search" and search_enabled matches = [name for name, text in notes.items() if args["query"] in text] return types.CallToolResult(content=[types.TextContent(text=", ".join(matches))]) diff --git a/src/mcp/server/_streamable_http_modern.py b/src/mcp/server/_streamable_http_modern.py index a047892e6..148643e23 100644 --- a/src/mcp/server/_streamable_http_modern.py +++ b/src/mcp/server/_streamable_http_modern.py @@ -412,7 +412,10 @@ async def handle_modern_request( progress_token=progress_token_from_params(req.params), ) - if json_response: + if json_response and req.method != "subscriptions/listen": + # A listen response IS a notification stream, so it always takes the + # SSE path below regardless of the JSON-response preference (the + # TypeScript and Go SDKs route it the same way). msg = await _to_jsonrpc_response( req.id, serve_one(app, dctx, req.method, req.params, connection=connection, lifespan_state=lifespan_state) ) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 97b5557e2..66c497199 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -561,12 +561,22 @@ def get_capabilities( notification_options: NotificationOptions | None = None, experimental_capabilities: dict[str, dict[str, Any]] | None = None, extensions: dict[str, dict[str, Any]] | None = None, + *, + protocol_version: str | None = None, ) -> types.ServerCapabilities: """Convert existing handlers to a ServerCapabilities object. `extensions` is the SEP-2133 extension map (identifier -> settings) advertised under `ServerCapabilities.extensions`; it defaults to `self.extensions`. + + `protocol_version` makes the subscription-delivered bits era-honest: + at 2026-07-28+ versions, change notifications are delivered only on + `subscriptions/listen` streams, so the `listChanged` flags and + `resources.subscribe` derive from whether that method is served - + `notification_options` and the legacy `resources/subscribe` handler + (which the modern wire cannot dispatch) are ignored. When omitted, the + handshake-era derivation applies unchanged. """ notification_options = notification_options or NotificationOptions() prompts_capability = None @@ -575,20 +585,29 @@ def get_capabilities( logging_capability = None completions_capability = None + if protocol_version in MODERN_PROTOCOL_VERSIONS: + listen_served = "subscriptions/listen" in self._request_handlers + prompts_changed = tools_changed = resources_changed = subscribe = listen_served + else: + prompts_changed = notification_options.prompts_changed + tools_changed = notification_options.tools_changed + resources_changed = notification_options.resources_changed + subscribe = "resources/subscribe" in self._request_handlers + # Set prompt capabilities if handler exists if "prompts/list" in self._request_handlers: - prompts_capability = types.PromptsCapability(list_changed=notification_options.prompts_changed) + prompts_capability = types.PromptsCapability(list_changed=prompts_changed) # Set resource capabilities if handler exists if "resources/list" in self._request_handlers: resources_capability = types.ResourcesCapability( - subscribe="resources/subscribe" in self._request_handlers, - list_changed=notification_options.resources_changed, + subscribe=subscribe, + list_changed=resources_changed, ) # Set tool capabilities if handler exists if "tools/list" in self._request_handlers: - tools_capability = types.ToolsCapability(list_changed=notification_options.tools_changed) + tools_capability = types.ToolsCapability(list_changed=tools_changed) # Set logging capabilities if handler exists if "logging/setLevel" in self._request_handlers: @@ -638,7 +657,7 @@ async def _handle_discover( """ return types.DiscoverResult( supported_versions=list(MODERN_PROTOCOL_VERSIONS), - capabilities=self.get_capabilities(), + capabilities=self.get_capabilities(protocol_version=ctx.protocol_version), server_info=self.server_info, instructions=self.instructions, ) diff --git a/src/mcp/server/mcpserver/context.py b/src/mcp/server/mcpserver/context.py index 9a128be21..28d06761d 100644 --- a/src/mcp/server/mcpserver/context.py +++ b/src/mcp/server/mcpserver/context.py @@ -20,6 +20,7 @@ PromptsListChanged, ResourcesListChanged, ResourceUpdated, + SubscriptionBus, ToolsListChanged, ) from mcp.shared.exceptions import MCPDeprecationWarning @@ -65,6 +66,7 @@ async def my_tool(x: int, ctx: Context) -> str: _request_context: ServerRequestContext[LifespanContextT, RequestT] | None _mcp_server: MCPServer | None _input_params: InputResponseRequestParams | None + _subscriptions: SubscriptionBus | None # TODO(maxisbey): Consider making request_context/mcp_server required, or refactor Context entirely. def __init__( @@ -73,6 +75,7 @@ def __init__( request_context: ServerRequestContext[LifespanContextT, RequestT] | None = None, mcp_server: MCPServer | None = None, input_params: InputResponseRequestParams | None = None, + subscriptions: SubscriptionBus | None = None, # TODO(Marcelo): We should drop this kwargs parameter. **kwargs: Any, ): @@ -80,6 +83,7 @@ def __init__( self._request_context = request_context self._mcp_server = mcp_server self._input_params = input_params + self._subscriptions = subscriptions @property def mcp_server(self) -> MCPServer: @@ -103,7 +107,9 @@ def _nested_invocation(self) -> Context[LifespanContextT, RequestT]: request's own target — their keys are ones that handler minted — so a nested invocation always starts on round one. """ - return Context(request_context=self._request_context, mcp_server=self._mcp_server) + return Context( + request_context=self._request_context, mcp_server=self._mcp_server, subscriptions=self._subscriptions + ) async def report_progress(self, progress: float, total: float | None = None, message: str | None = None) -> None: """Report progress for the current operation. @@ -115,17 +121,23 @@ async def report_progress(self, progress: float, total: float | None = None, mes """ await self.request_context.session.report_progress(progress, total, message) + @property + def _bus(self) -> SubscriptionBus: + if self._subscriptions is None: + raise ValueError("Context is not available outside of a request") + return self._subscriptions + async def notify_tools_changed(self) -> None: """Publish a tools list-changed event to `subscriptions/listen` subscribers.""" - await self.mcp_server.subscriptions.publish(ToolsListChanged()) + await self._bus.publish(ToolsListChanged()) async def notify_prompts_changed(self) -> None: """Publish a prompts list-changed event to `subscriptions/listen` subscribers.""" - await self.mcp_server.subscriptions.publish(PromptsListChanged()) + await self._bus.publish(PromptsListChanged()) async def notify_resources_changed(self) -> None: """Publish a resources list-changed event to `subscriptions/listen` subscribers.""" - await self.mcp_server.subscriptions.publish(ResourcesListChanged()) + await self._bus.publish(ResourcesListChanged()) async def notify_resource_updated(self, uri: str | AnyUrl) -> None: """Publish a resource-updated event for `uri` to `subscriptions/listen` subscribers. @@ -135,7 +147,7 @@ async def notify_resource_updated(self, uri: str | AnyUrl) -> None: protocol versions that used `resources/subscribe` are notified via `ctx.session.send_resource_updated(uri)` instead. """ - await self.mcp_server.subscriptions.publish(ResourceUpdated(uri=str(uri))) + await self._bus.publish(ResourceUpdated(uri=str(uri))) async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]: """Read a resource by URI. diff --git a/src/mcp/server/mcpserver/prompts/manager.py b/src/mcp/server/mcpserver/prompts/manager.py index 7e7f35078..01a0823bf 100644 --- a/src/mcp/server/mcpserver/prompts/manager.py +++ b/src/mcp/server/mcpserver/prompts/manager.py @@ -47,6 +47,12 @@ def add_prompt( self._prompts[prompt.name] = prompt return prompt + def remove_prompt(self, name: str) -> None: + """Remove a prompt by name.""" + if name not in self._prompts: + raise ValueError(f"Unknown prompt: {name}") + del self._prompts[name] + async def render_prompt( self, name: str, diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index 0dcfc4af2..ff36cf593 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -292,16 +292,6 @@ def icons(self) -> list[Icon] | None: def version(self) -> str | None: return self._lowlevel_server.version - @property - def subscriptions(self) -> SubscriptionBus: - """The `subscriptions/listen` event bus. - - Publish a `ServerEvent` here (or via the `Context.notify_*` methods) - to deliver it to subscribed clients. The bus passed to the constructor, - or the in-process default. - """ - return self._subscriptions - @property def session_manager(self) -> StreamableHTTPSessionManager: """Get the StreamableHTTP session manager. @@ -413,7 +403,7 @@ async def _handle_list_tools( async def _handle_call_tool( self, ctx: ServerRequestContext[LifespanResultT], params: CallToolRequestParams ) -> CallToolResult | InputRequiredResult: - context = Context(request_context=ctx, mcp_server=self, input_params=params) + context = Context(request_context=ctx, mcp_server=self, input_params=params, subscriptions=self._subscriptions) try: return await self.call_tool(params.name, params.arguments or {}, context) except MCPError: @@ -429,7 +419,7 @@ async def _handle_list_resources( async def _handle_read_resource( self, ctx: ServerRequestContext[LifespanResultT], params: ReadResourceRequestParams ) -> ReadResourceResult | InputRequiredResult: - context = Context(request_context=ctx, mcp_server=self, input_params=params) + context = Context(request_context=ctx, mcp_server=self, input_params=params, subscriptions=self._subscriptions) try: results = await self.read_resource(params.uri, context) except ResourceNotFoundError as err: @@ -473,7 +463,7 @@ async def _handle_list_prompts( async def _handle_get_prompt( self, ctx: ServerRequestContext[LifespanResultT], params: GetPromptRequestParams ) -> GetPromptResult | InputRequiredResult: - context = Context(request_context=ctx, mcp_server=self, input_params=params) + context = Context(request_context=ctx, mcp_server=self, input_params=params, subscriptions=self._subscriptions) return await self.get_prompt(params.name, params.arguments, context) async def list_tools(self) -> list[MCPTool]: @@ -896,6 +886,17 @@ def add_prompt(self, prompt: Prompt) -> None: """ self._prompt_manager.add_prompt(prompt) + def remove_prompt(self, name: str) -> None: + """Remove a prompt from the server by name. + + Args: + name: The name of the prompt to remove + + Raises: + ValueError: If the prompt does not exist + """ + self._prompt_manager.remove_prompt(name) + def prompt( self, name: str | None = None, diff --git a/src/mcp/server/subscriptions.py b/src/mcp/server/subscriptions.py index 65fe0a14c..da6bab8cf 100644 --- a/src/mcp/server/subscriptions.py +++ b/src/mcp/server/subscriptions.py @@ -22,7 +22,7 @@ from __future__ import annotations -import math +import logging from collections.abc import Callable from dataclasses import dataclass from typing import Any, Protocol @@ -30,6 +30,7 @@ import anyio import anyio.streams.memory from mcp_types import ( + INTERNAL_ERROR, INVALID_REQUEST, NotificationParams, PromptListChangedNotification, @@ -48,6 +49,8 @@ from mcp.server.context import ServerRequestContext from mcp.shared.exceptions import MCPError +logger = logging.getLogger(__name__) + SUBSCRIPTION_ID_META_KEY = "io.modelcontextprotocol/subscriptionId" """The `_meta` key carrying the subscription id on every listen-stream frame. @@ -112,9 +115,16 @@ def __init__(self) -> None: self._listeners: dict[object, Callable[[ServerEvent], None]] = {} async def publish(self, event: ServerEvent) -> None: - """Deliver `event` to every subscribed listener.""" + """Deliver `event` to every subscribed listener. + + A raising listener is logged and skipped: one bad listener must not + starve the others or fail the publishing handler. + """ for listener in list(self._listeners.values()): - listener(event) + try: + listener(event) + except Exception: # fan-out boundary: isolate listeners from each other + logger.exception("subscription listener raised; continuing") def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: """Register `listener` and return an idempotent unsubscribe callable.""" @@ -143,15 +153,19 @@ def _honored_subset(requested: SubscriptionFilter) -> SubscriptionFilter: ) -def _event_matches(honored: SubscriptionFilter, event: ServerEvent) -> bool: - """Whether `event` is within the stream's honored filter.""" +def _event_matches(honored: SubscriptionFilter, uris: frozenset[str], event: ServerEvent) -> bool: + """Whether `event` is within the stream's honored filter. + + `uris` is the honored `resource_subscriptions` as a set: matching runs on + every publish, and the wire filter may name many URIs. + """ if isinstance(event, ToolsListChanged): return honored.tools_list_changed is True if isinstance(event, PromptsListChanged): return honored.prompts_list_changed is True if isinstance(event, ResourcesListChanged): return honored.resources_list_changed is True - return honored.resource_subscriptions is not None and event.uri in honored.resource_subscriptions + return event.uri in uris def _event_to_notification(event: ServerEvent, meta: dict[str, Any]) -> ServerNotification: @@ -177,10 +191,19 @@ class ListenHandler: Requires a transport that can stream a request's response (streamable HTTP's SSE mode). + + `max_subscriptions` bounds concurrent streams (further listen requests are + rejected with `INTERNAL_ERROR`, before the ack). `max_buffered_events` + bounds each stream's event backlog: a stream whose client has stopped + reading is ended at the cap (the client re-listens and refetches - there + is no replay, so ending the stream loses nothing the backlog wasn't + already losing). """ - def __init__(self, bus: SubscriptionBus) -> None: + def __init__(self, bus: SubscriptionBus, *, max_subscriptions: int = 1024, max_buffered_events: int = 1024) -> None: self._bus = bus + self._max_subscriptions = max_subscriptions + self._max_buffered_events = max_buffered_events self._streams: set[anyio.streams.memory.MemoryObjectSendStream[ServerEvent]] = set() async def __call__( @@ -192,20 +215,27 @@ async def __call__( subscription_id = ctx.request_id if subscription_id is None: raise MCPError(INVALID_REQUEST, "subscriptions/listen requires a request id") + if len(self._streams) >= self._max_subscriptions: + raise MCPError(INTERNAL_ERROR, "Subscription limit reached") honored = _honored_subset(params.notifications) + honored_uris = frozenset(honored.resource_subscriptions or ()) meta: dict[str, Any] = {SUBSCRIPTION_ID_META_KEY: subscription_id} - # Unbounded buffer so publishers never block on a slow consumer (the - # transport write happens in this handler task, not the publisher's). - send, recv = anyio.create_memory_object_stream[ServerEvent](math.inf) + # Buffered so publishers don't block on a slow consumer (the transport + # write happens in this handler task, not the publisher's). A stream + # whose backlog hits the cap is ended - see the class docstring. + send, recv = anyio.create_memory_object_stream[ServerEvent](self._max_buffered_events) def deliver(event: ServerEvent) -> None: - if _event_matches(honored, event): + if _event_matches(honored, honored_uris, event): try: send.send_nowait(event) except anyio.ClosedResourceError: # `close` closed this stream; the loop below is unwinding. pass + except anyio.WouldBlock: + logger.warning("listen stream %r backlog full; ending the stream", subscription_id) + send.close() # Subscribe before sending the ack so an event published while the # ack write is suspended is buffered rather than lost. The ack is @@ -232,11 +262,13 @@ def deliver(event: ServerEvent) -> None: return SubscriptionsListenResult(_meta=meta) def close(self) -> None: - """Gracefully end every open listen stream. + """Initiate graceful closure of every open listen stream. - Each stream sends its `SubscriptionsListenResult` (stamped with the - subscription id) as the final frame and closes - the spec's graceful - closure flow, signalling clients not to re-listen. + Each stream then drains its buffered events and sends its + `SubscriptionsListenResult` (stamped with the subscription id) as the + final frame from its own handler task - the spec's graceful closure + flow, signalling clients not to re-listen. This method only initiates + that; it does not wait for the streams to finish flushing. """ for stream in list(self._streams): stream.close() diff --git a/tests/docs_src/test_client.py b/tests/docs_src/test_client.py index 97cc327dc..af5e69249 100644 --- a/tests/docs_src/test_client.py +++ b/tests/docs_src/test_client.py @@ -121,11 +121,12 @@ async def test_read_resource_fills_in_a_template() -> None: assert contents.text == "3 books filed under poetry." -async def test_mcpserver_does_not_implement_resource_subscriptions() -> None: - """The Resources section: MCPServer advertises subscribe=False and rejects subscribe_resource with -32601.""" +async def test_resource_subscriptions_are_listen_based_on_the_modern_wire() -> None: + """The Resources section: at 2026-07-28 `resources.subscribe` is True (served via + subscriptions/listen) while the legacy subscribe_resource verb answers -32601.""" async with Client(tutorial004.mcp) as client: assert client.server_capabilities.resources is not None - assert client.server_capabilities.resources.subscribe is False + assert client.server_capabilities.resources.subscribe is True with pytest.raises(MCPError) as exc_info: await client.subscribe_resource("catalog://genres") assert exc_info.value.error.code == -32601 diff --git a/tests/docs_src/test_first_steps.py b/tests/docs_src/test_first_steps.py index 15d6708ee..2b1674a47 100644 --- a/tests/docs_src/test_first_steps.py +++ b/tests/docs_src/test_first_steps.py @@ -89,9 +89,9 @@ async def test_the_three_primitive_capabilities_are_always_declared() -> None: # The exact dictionary the page prints from `model_dump(exclude_none=True)`. assert declared.model_dump(exclude_none=True) == snapshot( { - "prompts": {"list_changed": False}, - "resources": {"subscribe": False, "list_changed": False}, - "tools": {"list_changed": False}, + "prompts": {"list_changed": True}, + "resources": {"subscribe": True, "list_changed": True}, + "tools": {"list_changed": True}, } ) async with Client(MCPServer("Empty")) as client: diff --git a/tests/server/lowlevel/test_server_discover.py b/tests/server/lowlevel/test_server_discover.py index 1d036c2e7..2886d90c1 100644 --- a/tests/server/lowlevel/test_server_discover.py +++ b/tests/server/lowlevel/test_server_discover.py @@ -14,16 +14,24 @@ from mcp.server import Server, ServerRequestContext -# `Server._handle_discover` ignores its `ctx` argument entirely (it derives the -# result from server state), so a sentinel keeps the call site type-correct -# without dragging session machinery into a unit test. -_UNUSED_CTX = cast("ServerRequestContext[Any]", None) + +# `Server._handle_discover` reads only `ctx.protocol_version` (capabilities are +# era-dependent), so a minimal context keeps the call site honest without +# dragging session machinery into a unit test. +def _ctx(protocol_version: str) -> ServerRequestContext[Any]: + return ServerRequestContext( + session=cast("Any", None), + lifespan_context={}, + protocol_version=protocol_version, + method="server/discover", + request_id=1, + ) -async def _discover(server: Server[Any]) -> types.DiscoverResult: +async def _discover(server: Server[Any], protocol_version: str = MODERN_PROTOCOL_VERSIONS[0]) -> types.DiscoverResult: entry = server.get_request_handler("server/discover") assert entry is not None - result = await entry.handler(_UNUSED_CTX, types.RequestParams()) + result = await entry.handler(_ctx(protocol_version), types.RequestParams()) assert isinstance(result, types.DiscoverResult) return result @@ -149,3 +157,66 @@ async def custom_discover( server.add_request_handler("server/discover", types.RequestParams, custom_discover) result = await _discover(server) assert result is custom + + +async def _listen_stub( + ctx: ServerRequestContext[Any], params: types.SubscriptionsListenRequestParams +) -> types.SubscriptionsListenResult: + raise NotImplementedError + + +@pytest.mark.anyio +async def test_modern_subscription_bits_derive_from_listen_serving() -> None: + """Spec-driven (SEP-2575): at 2026-07-28, change notifications exist only on + `subscriptions/listen` streams, so the `listChanged`/`subscribe` bits mean + "this server serves listen" - they flip together with the handler.""" + + async def list_tools( + ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None + ) -> types.ListToolsResult: + raise NotImplementedError + + async def list_resources( + ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None + ) -> types.ListResourcesResult: + raise NotImplementedError + + server = Server("caps", on_list_tools=list_tools, on_list_resources=list_resources) + + before = await _discover(server) + assert before.capabilities.tools is not None and before.capabilities.tools.list_changed is False + assert before.capabilities.resources is not None + assert before.capabilities.resources.subscribe is False + assert before.capabilities.resources.list_changed is False + + server.add_request_handler("subscriptions/listen", types.SubscriptionsListenRequestParams, _listen_stub) + + after = await _discover(server) + assert after.capabilities.tools is not None and after.capabilities.tools.list_changed is True + assert after.capabilities.resources is not None + assert after.capabilities.resources.subscribe is True + assert after.capabilities.resources.list_changed is True + + +@pytest.mark.anyio +async def test_legacy_capability_derivation_ignores_listen() -> None: + """SDK-defined: without `protocol_version`, `get_capabilities` keeps the + handshake-era derivation - `NotificationOptions` drives `listChanged` and the + `resources/subscribe` handler drives `subscribe`; a registered listen handler + changes nothing on that path.""" + + async def list_tools( + ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None + ) -> types.ListToolsResult: + raise NotImplementedError + + server = Server("caps", on_list_tools=list_tools) + server.add_request_handler("subscriptions/listen", types.SubscriptionsListenRequestParams, _listen_stub) + + legacy = server.get_capabilities() + assert legacy.tools is not None and legacy.tools.list_changed is False + + from mcp.server import NotificationOptions + + opted_in = server.get_capabilities(NotificationOptions(tools_changed=True)) + assert opted_in.tools is not None and opted_in.tools.list_changed is True diff --git a/tests/server/mcpserver/test_server.py b/tests/server/mcpserver/test_server.py index 9d04cfd25..b99321575 100644 --- a/tests/server/mcpserver/test_server.py +++ b/tests/server/mcpserver/test_server.py @@ -2258,16 +2258,11 @@ async def probe(ctx: Context) -> str: assert captured == {"responses": None, "state": None} -def test_subscriptions_bus_defaults_to_in_memory_and_accepts_custom() -> None: - assert isinstance(MCPServer().subscriptions, InMemorySubscriptionBus) +async def test_context_notify_methods_publish_to_the_configured_bus() -> None: bus = InMemorySubscriptionBus() - assert MCPServer(subscriptions=bus).subscriptions is bus - - -async def test_context_notify_methods_publish_to_the_subscriptions_bus() -> None: - mcp = MCPServer() + mcp = MCPServer(subscriptions=bus) seen: list[ServerEvent] = [] - mcp.subscriptions.subscribe(seen.append) + bus.subscribe(seen.append) @mcp.tool() async def touch(ctx: Context) -> str: @@ -2287,3 +2282,27 @@ async def touch(ctx: Context) -> str: def test_context_mcp_server_outside_request_raises() -> None: with pytest.raises(ValueError, match="outside of a request"): _ = Context().mcp_server + + +async def test_context_notify_outside_a_request_raises() -> None: + with pytest.raises(ValueError, match="outside of a request"): + await Context().notify_tools_changed() + + +def test_context_exposes_its_mcp_server() -> None: + mcp = MCPServer() + assert Context(mcp_server=mcp).mcp_server is mcp + + +def test_remove_prompt_removes_and_unknown_name_raises() -> None: + mcp = MCPServer() + + @mcp.prompt() + def greeting() -> str: # pragma: no cover + return "hello" + + assert len(mcp._prompt_manager.list_prompts()) == 1 + mcp.remove_prompt("greeting") + assert mcp._prompt_manager.list_prompts() == [] + with pytest.raises(ValueError, match="Unknown prompt: greeting"): + mcp.remove_prompt("greeting") diff --git a/tests/server/test_streamable_http_modern.py b/tests/server/test_streamable_http_modern.py index 17ea9eb43..766d654a4 100644 --- a/tests/server/test_streamable_http_modern.py +++ b/tests/server/test_streamable_http_modern.py @@ -8,6 +8,7 @@ import json import logging +from collections.abc import Callable from typing import Any import anyio @@ -44,6 +45,7 @@ _to_jsonrpc_response, handle_modern_request, ) +from mcp.server.subscriptions import InMemorySubscriptionBus, ListenHandler, ServerEvent from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.exceptions import MCPError, NoBackChannelError from mcp.shared.inbound import MCP_METHOD_HEADER, MCP_NAME_HEADER, MCP_PROTOCOL_VERSION_HEADER @@ -1008,3 +1010,60 @@ async def test_modern_post_with_deeply_nested_body_is_parse_error_not_a_crash() response = await http.post("/mcp", content=body, headers={"content-type": "application/json"}) assert response.status_code == 400 assert response.json()["error"]["code"] == PARSE_ERROR + + +class _OpenSignalBus(InMemorySubscriptionBus): + """Sets an event when a listen stream subscribes, so tests can sequence close().""" + + def __init__(self) -> None: + super().__init__() + self.opened = anyio.Event() + + def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: + unsubscribe = super().subscribe(listener) + self.opened.set() + return unsubscribe + + +async def test_json_response_mode_still_streams_subscriptions_listen() -> None: + """SDK-defined (TypeScript/Go parity): a listen response IS a notification + stream, so `json_response=True` does not apply to it - the request takes + the SSE path, acks first, and ends with the stamped result on close().""" + bus = _OpenSignalBus() + handler = ListenHandler(bus) + server = Server("test", on_subscriptions_listen=handler) + body = { + "jsonrpc": "2.0", + "id": 9, + "method": "subscriptions/listen", + "params": { + "notifications": {"toolsListChanged": True}, + "_meta": { + PROTOCOL_VERSION_META_KEY: LATEST_MODERN_VERSION, + CLIENT_INFO_META_KEY: {"name": "raw", "version": "0.0.0"}, + CLIENT_CAPABILITIES_META_KEY: {}, + }, + }, + } + + responses: list[httpx.Response] = [] + async with _asgi_client(server, json_response=True) as http: + async with anyio.create_task_group() as tg: + + async def post() -> None: + responses.append( + await http.post("/mcp", json=body, headers={MCP_METHOD_HEADER: "subscriptions/listen"}) + ) + + tg.start_soon(post) + with anyio.fail_after(5): + await bus.opened.wait() + handler.close() + + response = responses[0] + assert response.status_code == 200 + assert response.headers["content-type"].split(";", 1)[0] == "text/event-stream" + events = _sse_payloads(response.text) + assert events[0]["method"] == "notifications/subscriptions/acknowledged" + assert events[1]["id"] == 9 + assert events[1]["result"]["_meta"] == {"io.modelcontextprotocol/subscriptionId": 9} diff --git a/tests/server/test_subscriptions.py b/tests/server/test_subscriptions.py index 3acb298c3..9f987057e 100644 --- a/tests/server/test_subscriptions.py +++ b/tests/server/test_subscriptions.py @@ -295,3 +295,71 @@ async def test_listen_requires_a_request_id() -> None: def test_close_without_open_streams_is_a_no_op() -> None: """SDK-defined: `close()` with nothing open does nothing.""" ListenHandler(InMemorySubscriptionBus()).close() + + +@pytest.mark.anyio +async def test_raising_listener_is_isolated_from_others() -> None: + """SDK-defined: one raising listener is logged and skipped; later listeners + and the publishing handler are unaffected.""" + bus = InMemorySubscriptionBus() + + def bad(event: ServerEvent) -> None: + raise RuntimeError("boom") + + seen: list[ServerEvent] = [] + bus.subscribe(bad) + bus.subscribe(seen.append) + + await bus.publish(ToolsListChanged()) + assert seen == [ToolsListChanged()] + + +@pytest.mark.anyio +async def test_subscription_limit_rejects_further_streams_pre_ack() -> None: + """SDK-defined cap (mirrors the TypeScript SDK): past `max_subscriptions`, + a listen request is rejected before any ack frame.""" + handler = ListenHandler(InMemorySubscriptionBus(), max_subscriptions=1) + session = _RecordingSession() + + async with anyio.create_task_group() as tg: + + async def run() -> None: + await handler(_ctx(session), _params(tools_list_changed=True)) + + tg.start_soon(run) + await session.wait_for(1) + + rejected_session = _RecordingSession() + with pytest.raises(MCPError) as exc_info: + await handler(_ctx(rejected_session, request_id=8), _params()) + assert exc_info.value.error.message == "Subscription limit reached" + assert rejected_session.sent == [] + + handler.close() + + +@pytest.mark.anyio +async def test_backlog_overflow_ends_the_stream() -> None: + """SDK-defined: a stream whose client stopped reading is ended at + `max_buffered_events` rather than buffering forever; the client re-listens.""" + bus = InMemorySubscriptionBus() + handler = ListenHandler(bus, max_buffered_events=1) + session = _RecordingSession() + results: list[SubscriptionsListenResult] = [] + + async with anyio.create_task_group() as tg: + + async def run() -> None: + results.append(await handler(_ctx(session), _params(tools_list_changed=True))) + + tg.start_soon(run) + await session.wait_for(1) + + # Two publishes before the handler task resumes: the first fills the + # one-slot buffer, the second overflows and ends the stream. + await bus.publish(ToolsListChanged()) + await bus.publish(ToolsListChanged()) + + delivered = [notification for notification, _ in session.sent[1:]] + assert len(delivered) == 1 # the buffered event still drained + assert results[0].meta is not None # the stream ended with the stamped result From a6b9f03bb75876905099622d83a4d3d53ec374c8 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 30 Jun 2026 20:51:49 +0000 Subject: [PATCH 5/7] Document that subscription filters carry no per-client authorization Any client may subscribe to any URI, including one it cannot read, and will receive update notifications for it (resource existence and change timing - never content). Multi-tenant servers should not publish sensitive per-user URIs, or should serve the method with their own handler and narrow the filter before acking; a narrowing hook on the built-in handler is a candidate follow-up. --- docs/advanced/subscriptions.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/advanced/subscriptions.md b/docs/advanced/subscriptions.md index 7364d92e2..da68bdc40 100644 --- a/docs/advanced/subscriptions.md +++ b/docs/advanced/subscriptions.md @@ -33,6 +33,14 @@ The SDK serves `subscriptions/listen` for you — `MCPServer` registers the hand The filter is a contract. A stream that requested tool-list changes and one resource URI receives those two kinds and nothing else — publish a prompt change and that stream stays silent. Resource URIs are matched as exact strings: `note://todo` does not cover `note://todo/draft`. +!!! warning + Filters are honored without per-client authorization: any client may name any URI — + including one it cannot read — and will receive update notifications for it (resource + existence and change timing, never content). On a multi-tenant server, don't publish + sensitive per-user URIs through `notify_resource_updated`, or serve the method with + your own handler on the low-level `Server` and narrow the filter there before acking — + the honored subset exists in the protocol precisely so servers can do this. + Two more things the stream is *not*: * **It is not a replay log.** A dropped stream is gone; events published while nobody was connected are not queued. The client's contract is to re-listen and re-fetch what it cares about. From 05fb2b69628fb21d4ff41a4e1c1f6d5182499ef2 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 30 Jun 2026 21:31:11 +0000 Subject: [PATCH 6/7] Apply review fixes to subscriptions/listen serving - Pass the server-scoped subscription bus into the fallback Contexts built by programmatic call_tool/read_resource/get_prompt, so ctx.notify_* works there instead of raising. - Reject subscriptions/listen with 406 when the Accept header lacks text/event-stream: the response is always an SSE stream, so JSON-response mode must not serve a content type the client never accepted. - Release a stream's subscription slot at backlog overflow time; the stream's own cleanup can be wedged in a transport write that closing the buffer cannot wake. - End InMemorySubscriptionBus.publish with a checkpoint so a same-task publish burst lets listen streams drain between events instead of overflowing a healthy stream's buffer. - Describe the graceful-close result as a deliberate end rather than a "don't re-listen" signal: a stream ended at the overflow cap sends the same result and the client should re-listen. - Move a function-body import to the top of the file and fix a stale comment referencing a removed property. --- docs/advanced/subscriptions.md | 2 +- src/mcp/server/_streamable_http_modern.py | 7 ++ src/mcp/server/mcpserver/server.py | 6 +- src/mcp/server/subscriptions.py | 15 +++- tests/docs_src/test_subscriptions.py | 2 +- tests/server/lowlevel/test_server_discover.py | 4 +- tests/server/mcpserver/test_server.py | 31 +++++++ tests/server/test_streamable_http_modern.py | 31 +++++-- tests/server/test_subscriptions.py | 81 ++++++++++++++++--- 9 files changed, 150 insertions(+), 29 deletions(-) diff --git a/docs/advanced/subscriptions.md b/docs/advanced/subscriptions.md index da68bdc40..46014ef77 100644 --- a/docs/advanced/subscriptions.md +++ b/docs/advanced/subscriptions.md @@ -77,7 +77,7 @@ Down on the low-level `Server` there is no pre-wired anything — and the same p * You own the bus, so you publish to it directly: `await bus.publish(ResourceUpdated(uri=...))`. Put it wherever your handlers can reach it — module scope here, the lifespan in a bigger app. * `ListenHandler(bus)` is the same handler `MCPServer` registers; `on_subscriptions_listen=` is an ordinary handler slot. Don't want the SDK's semantics? Write your own handler for the slot — the spec obligations come with it. -* `ListenHandler.close()` gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately and the client shouldn't re-listen. Without it, streams end when the client disconnects. +* `ListenHandler.close()` gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately — a clean end, as opposed to the abrupt drop a client may treat as a cue to reconnect. Without it, streams end when the client disconnects. ## Recap diff --git a/src/mcp/server/_streamable_http_modern.py b/src/mcp/server/_streamable_http_modern.py index 148643e23..52a9a7017 100644 --- a/src/mcp/server/_streamable_http_modern.py +++ b/src/mcp/server/_streamable_http_modern.py @@ -383,6 +383,13 @@ async def handle_modern_request( await _write(rej, scope, receive, send) return + if req.method == "subscriptions/listen" and not has_sse: + # A listen response IS a notification stream, never JSON (the + # json_response carve-out below), so this one method requires the + # SSE accept even in JSON-response mode; SSE mode gated it above. + await Response(status_code=406)(scope, receive, send) + return + duplicated = find_duplicated_routing_header(request.headers.items()) if duplicated is not None: # The raw carrier is the only place duplicates are visible; the classifier sees a folded mapping. diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index ff36cf593..d933e82d5 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -488,7 +488,7 @@ async def call_tool( ) -> CallToolResult | InputRequiredResult: """Call a tool by name with arguments.""" if context is None: - context = Context(mcp_server=self) + context = Context(mcp_server=self, subscriptions=self._subscriptions) return await self._tool_manager.call_tool(name, arguments, context, convert_result=True) async def list_resources(self) -> list[MCPResource]: @@ -540,7 +540,7 @@ async def read_resource( ResourceError: If template creation or resource reading fails. """ if context is None: - context = Context(mcp_server=self) + context = Context(mcp_server=self, subscriptions=self._subscriptions) resource = await self._resource_manager.get_resource(uri, context) if isinstance(resource, InputRequiredResult): return resource @@ -1260,7 +1260,7 @@ async def get_prompt( carrying the echoed opaque state. """ if context is None: - context = Context(mcp_server=self) + context = Context(mcp_server=self, subscriptions=self._subscriptions) try: prompt = self._prompt_manager.get_prompt(name) if not prompt: diff --git a/src/mcp/server/subscriptions.py b/src/mcp/server/subscriptions.py index da6bab8cf..a1c69690f 100644 --- a/src/mcp/server/subscriptions.py +++ b/src/mcp/server/subscriptions.py @@ -28,6 +28,7 @@ from typing import Any, Protocol import anyio +import anyio.lowlevel import anyio.streams.memory from mcp_types import ( INTERNAL_ERROR, @@ -118,13 +119,16 @@ async def publish(self, event: ServerEvent) -> None: """Deliver `event` to every subscribed listener. A raising listener is logged and skipped: one bad listener must not - starve the others or fail the publishing handler. + starve the others or fail the publishing handler. Ends with a + checkpoint so a burst of publishes from one task lets listen streams + drain between events instead of overflowing their buffers unread. """ for listener in list(self._listeners.values()): try: listener(event) except Exception: # fan-out boundary: isolate listeners from each other logger.exception("subscription listener raised; continuing") + await anyio.lowlevel.checkpoint() def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: """Register `listener` and return an idempotent unsubscribe callable.""" @@ -235,6 +239,10 @@ def deliver(event: ServerEvent) -> None: pass except anyio.WouldBlock: logger.warning("listen stream %r backlog full; ending the stream", subscription_id) + # Release the subscription slot now: the handler's own + # cleanup can be wedged in a transport write that closing + # this buffer cannot wake (a client that stopped reading). + self._streams.discard(send) send.close() # Subscribe before sending the ack so an event published while the @@ -267,8 +275,9 @@ def close(self) -> None: Each stream then drains its buffered events and sends its `SubscriptionsListenResult` (stamped with the subscription id) as the final frame from its own handler task - the spec's graceful closure - flow, signalling clients not to re-listen. This method only initiates - that; it does not wait for the streams to finish flushing. + flow, telling clients the stream ended deliberately rather than + dropping. This method only initiates that; it does not wait for the + streams to finish flushing. """ for stream in list(self._streams): stream.close() diff --git a/tests/docs_src/test_subscriptions.py b/tests/docs_src/test_subscriptions.py index 5ab155d0d..cdfe1d935 100644 --- a/tests/docs_src/test_subscriptions.py +++ b/tests/docs_src/test_subscriptions.py @@ -128,7 +128,7 @@ async def listen() -> None: assert isinstance(updated, types.ResourceUpdatedNotification) assert updated.params.uri == "note://todo" - # `mcp.subscriptions` / the bus is also the publish surface outside a + # The bus you constructed is also the publish surface outside a # request; an unrequested kind never reaches this stream. await tutorial002.bus.publish(ToolsListChanged()) await client.call_tool("edit_note", {"name": "todo", "text": "done"}) diff --git a/tests/server/lowlevel/test_server_discover.py b/tests/server/lowlevel/test_server_discover.py index 2886d90c1..05d57d846 100644 --- a/tests/server/lowlevel/test_server_discover.py +++ b/tests/server/lowlevel/test_server_discover.py @@ -12,7 +12,7 @@ import pytest from mcp_types.version import MODERN_PROTOCOL_VERSIONS -from mcp.server import Server, ServerRequestContext +from mcp.server import NotificationOptions, Server, ServerRequestContext # `Server._handle_discover` reads only `ctx.protocol_version` (capabilities are @@ -216,7 +216,5 @@ async def list_tools( legacy = server.get_capabilities() assert legacy.tools is not None and legacy.tools.list_changed is False - from mcp.server import NotificationOptions - opted_in = server.get_capabilities(NotificationOptions(tools_changed=True)) assert opted_in.tools is not None and opted_in.tools.list_changed is True diff --git a/tests/server/mcpserver/test_server.py b/tests/server/mcpserver/test_server.py index b99321575..3103a50f3 100644 --- a/tests/server/mcpserver/test_server.py +++ b/tests/server/mcpserver/test_server.py @@ -2279,6 +2279,37 @@ async def touch(ctx: Context) -> str: assert seen == [ToolsListChanged(), PromptsListChanged(), ResourcesListChanged(), ResourceUpdated(uri="r://x")] +async def test_programmatic_entry_points_carry_the_subscription_bus() -> None: + """`ctx.notify_*` works when tools, resources, and prompts are invoked + programmatically (no wire request): the server-scoped bus rides along in + the fallback Context.""" + bus = InMemorySubscriptionBus() + mcp = MCPServer(subscriptions=bus) + seen: list[ServerEvent] = [] + bus.subscribe(seen.append) + + @mcp.tool() + async def touch_tools(ctx: Context) -> str: + await ctx.notify_tools_changed() + return "ok" + + @mcp.resource("res://{name}") + async def thing(name: str, ctx: Context) -> str: + await ctx.notify_resources_changed() + return "data" + + @mcp.prompt() + async def ask(ctx: Context) -> str: + await ctx.notify_prompts_changed() + return "question" + + await mcp.call_tool("touch_tools", {}) + await mcp.read_resource("res://thing") + await mcp.get_prompt("ask") + + assert seen == [ToolsListChanged(), ResourcesListChanged(), PromptsListChanged()] + + def test_context_mcp_server_outside_request_raises() -> None: with pytest.raises(ValueError, match="outside of a request"): _ = Context().mcp_server diff --git a/tests/server/test_streamable_http_modern.py b/tests/server/test_streamable_http_modern.py index 766d654a4..19ad33f19 100644 --- a/tests/server/test_streamable_http_modern.py +++ b/tests/server/test_streamable_http_modern.py @@ -1025,14 +1025,9 @@ def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], Non return unsubscribe -async def test_json_response_mode_still_streams_subscriptions_listen() -> None: - """SDK-defined (TypeScript/Go parity): a listen response IS a notification - stream, so `json_response=True` does not apply to it - the request takes - the SSE path, acks first, and ends with the stamped result on close().""" - bus = _OpenSignalBus() - handler = ListenHandler(bus) - server = Server("test", on_subscriptions_listen=handler) - body = { +def _listen_body() -> dict[str, Any]: + """A minimal valid 2026-07-28 `subscriptions/listen` request body.""" + return { "jsonrpc": "2.0", "id": 9, "method": "subscriptions/listen", @@ -1046,6 +1041,26 @@ async def test_json_response_mode_still_streams_subscriptions_listen() -> None: }, } + +async def test_subscriptions_listen_requires_the_sse_accept_even_in_json_mode() -> None: + """SDK-defined: a listen response is always SSE, so a request whose Accept + lacks `text/event-stream` is rejected with 406 rather than served a content + type it never accepted - JSON-response mode included.""" + server = Server("test", on_subscriptions_listen=ListenHandler(InMemorySubscriptionBus())) + async with _asgi_client(server, json_response=True, accept="application/json") as http: + response = await http.post("/mcp", json=_listen_body(), headers={MCP_METHOD_HEADER: "subscriptions/listen"}) + assert response.status_code == 406 + + +async def test_json_response_mode_still_streams_subscriptions_listen() -> None: + """SDK-defined (TypeScript/Go parity): a listen response IS a notification + stream, so `json_response=True` does not apply to it - the request takes + the SSE path, acks first, and ends with the stamped result on close().""" + bus = _OpenSignalBus() + handler = ListenHandler(bus) + server = Server("test", on_subscriptions_listen=handler) + body = _listen_body() + responses: list[httpx.Response] = [] async with _asgi_client(server, json_response=True) as http: async with anyio.create_task_group() as tg: diff --git a/tests/server/test_subscriptions.py b/tests/server/test_subscriptions.py index 9f987057e..1d7640d73 100644 --- a/tests/server/test_subscriptions.py +++ b/tests/server/test_subscriptions.py @@ -338,14 +338,37 @@ async def run() -> None: handler.close() +class _GatedSession(_RecordingSession): + """Lets the ack through, then wedges event sends until released - a client + that stopped reading the transport.""" + + def __init__(self) -> None: + super().__init__() + self.wedged = anyio.Event() + self.release = anyio.Event() + + async def send_notification( + self, notification: ServerNotification, related_request_id: RequestId | None = None + ) -> None: + if self.sent: # the ack is the first frame; only event sends wedge + self.wedged.set() + await self.release.wait() + await super().send_notification(notification, related_request_id) + + @pytest.mark.anyio -async def test_backlog_overflow_ends_the_stream() -> None: +async def test_backlog_overflow_ends_the_stream_and_frees_its_slot() -> None: """SDK-defined: a stream whose client stopped reading is ended at - `max_buffered_events` rather than buffering forever; the client re-listens.""" + `max_buffered_events` rather than buffering forever. The subscription slot + frees at overflow time - the stream's own cleanup may be wedged in a + transport write nothing can wake - and the backlog still drains into the + stamped graceful result once that write completes.""" bus = InMemorySubscriptionBus() - handler = ListenHandler(bus, max_buffered_events=1) - session = _RecordingSession() + handler = ListenHandler(bus, max_subscriptions=1, max_buffered_events=1) + session = _GatedSession() results: list[SubscriptionsListenResult] = [] + late_session = _RecordingSession() + late_results: list[SubscriptionsListenResult] = [] async with anyio.create_task_group() as tg: @@ -355,11 +378,49 @@ async def run() -> None: tg.start_soon(run) await session.wait_for(1) - # Two publishes before the handler task resumes: the first fills the - # one-slot buffer, the second overflows and ends the stream. - await bus.publish(ToolsListChanged()) - await bus.publish(ToolsListChanged()) + await bus.publish(ToolsListChanged()) # consumed, then wedged mid-send + with anyio.fail_after(5): + await session.wedged.wait() + await bus.publish(ToolsListChanged()) # fills the one-slot buffer + await bus.publish(ToolsListChanged()) # overflows: the stream is ended + + async def run_late() -> None: + late_results.append(await handler(_ctx(late_session, request_id=8), _params(tools_list_changed=True))) + + # The ended stream's slot is free immediately - a new listen does not + # wait for the wedged write to die with its connection. + tg.start_soon(run_late) + await late_session.wait_for(1) + + session.release.set() + handler.close() delivered = [notification for notification, _ in session.sent[1:]] - assert len(delivered) == 1 # the buffered event still drained - assert results[0].meta is not None # the stream ended with the stamped result + assert len(delivered) == 2 # the wedged event and the buffered one still drained + assert results[0].meta == {SUBSCRIPTION_ID_META_KEY: 7} + assert late_results[0].meta == {SUBSCRIPTION_ID_META_KEY: 8} + + +@pytest.mark.anyio +async def test_same_task_publish_burst_does_not_overflow_a_healthy_stream() -> None: + """SDK-defined: `publish` ends with a checkpoint, so a burst of events from + one task (no yields of its own) lets a reading stream drain between + publishes instead of deterministically overflowing the buffer.""" + bus = InMemorySubscriptionBus() + handler = ListenHandler(bus, max_buffered_events=99) + session = _RecordingSession() + + async with anyio.create_task_group() as tg: + + async def run() -> None: + await handler(_ctx(session), _params(tools_list_changed=True)) + + tg.start_soon(run) + await session.wait_for(1) + + for _ in range(100): + await bus.publish(ToolsListChanged()) + await session.wait_for(101) + handler.close() + + assert len(session.sent) == 101 # the ack plus every event in the burst From 6eddb3427796f17e35975ee02938c8d6a5265a80 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 30 Jun 2026 21:57:11 +0000 Subject: [PATCH 7/7] Isolate a raising bus unsubscribe from listen stream cleanup The unsubscribe callable returned by a custom SubscriptionBus runs first in the stream's finally block; if it raised, the stream's slot release and buffer closes were skipped, permanently consuming a max_subscriptions slot. Run it through a logged isolation boundary, mirroring the listener isolation publish already does. --- src/mcp/server/subscriptions.py | 15 +++++++++- tests/server/test_subscriptions.py | 47 ++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/src/mcp/server/subscriptions.py b/src/mcp/server/subscriptions.py index a1c69690f..d071cfdbf 100644 --- a/src/mcp/server/subscriptions.py +++ b/src/mcp/server/subscriptions.py @@ -141,6 +141,19 @@ def unsubscribe() -> None: return unsubscribe +def _safe_unsubscribe(unsubscribe: Callable[[], None]) -> None: + """Run a bus's unsubscribe callable, isolating the stream from it raising. + + The callable comes from a custom `SubscriptionBus`; a raising one is + logged and skipped so it cannot stop the stream's own cleanup from + releasing its subscription slot. + """ + try: + unsubscribe() + except Exception: # fan-out boundary: a raising bus must not skip stream cleanup + logger.exception("bus unsubscribe raised; continuing stream cleanup") + + def _honored_subset(requested: SubscriptionFilter) -> SubscriptionFilter: """The subset of `requested` the server will deliver, for the ack. @@ -263,7 +276,7 @@ def deliver(event: ServerEvent) -> None: _event_to_notification(event, meta), related_request_id=subscription_id ) finally: - unsubscribe() + _safe_unsubscribe(unsubscribe) self._streams.discard(send) send.close() recv.close() diff --git a/tests/server/test_subscriptions.py b/tests/server/test_subscriptions.py index 1d7640d73..579e3522f 100644 --- a/tests/server/test_subscriptions.py +++ b/tests/server/test_subscriptions.py @@ -314,6 +314,53 @@ def bad(event: ServerEvent) -> None: assert seen == [ToolsListChanged()] +@pytest.mark.anyio +async def test_raising_unsubscribe_does_not_skip_stream_cleanup() -> None: + """SDK-defined: a custom bus whose unsubscribe callable raises is logged + and isolated - the stream still releases its subscription slot, closes its + buffers, and returns the graceful result.""" + + class _RaisingUnsubscribeBus(InMemorySubscriptionBus): + def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]: + super().subscribe(listener) + + def unsubscribe() -> None: + raise RuntimeError("boom") + + return unsubscribe + + handler = ListenHandler(_RaisingUnsubscribeBus(), max_subscriptions=1) + session = _RecordingSession() + results: list[SubscriptionsListenResult] = [] + + async with anyio.create_task_group() as tg: + + async def run() -> None: + results.append(await handler(_ctx(session), _params(tools_list_changed=True))) + + tg.start_soon(run) + await session.wait_for(1) + handler.close() + + assert results[0].meta == {SUBSCRIPTION_ID_META_KEY: 7} # the graceful result still returned + + # The slot was released despite the raising unsubscribe: a second listen + # is accepted at the cap of one. + late_session = _RecordingSession() + late_results: list[SubscriptionsListenResult] = [] + + async with anyio.create_task_group() as tg: + + async def run_late() -> None: + late_results.append(await handler(_ctx(late_session, request_id=8), _params(tools_list_changed=True))) + + tg.start_soon(run_late) + await late_session.wait_for(1) + handler.close() + + assert late_results[0].meta == {SUBSCRIPTION_ID_META_KEY: 8} + + @pytest.mark.anyio async def test_subscription_limit_rejects_further_streams_pre_ack() -> None: """SDK-defined cap (mirrors the TypeScript SDK): past `max_subscriptions`,