Skip to content

Commit f3507eb

Browse files
committed
Add the client subscriptions/listen surface
Client.listen(filter) opens a subscription as an async context manager: it registers demux state before sending, resolves only on the server's acknowledgment (raising on error responses, transport loss, or timeout), surfaces the honored and narrowed filter subsets, and yields typed change notifications via async iteration. Graceful server closure and local close end iteration cleanly; an abrupt transport drop raises SubscriptionLost so a dead stream cannot be missed silently. strict=True turns acknowledgment narrowing into ListenNarrowedError. Dispatchers accept a caller-supplied request id (CallOptions request_id) so the subscription id is known before the request is sent; client- minted string ids stay collision-free against the numeric counter by type. ClientSession routes the acknowledgment and tagged stream notifications to the live subscription by the verbatim-typed subscription id and tees matched notifications onward, so message handlers and response-cache eviction keep working unchanged. On 2026 streamable HTTP, closing a subscription aborts the listen request's response stream: the transport translates the outbound cancellation instead of posting it, since stream closure is the protocol's cancellation signal. The listen request is exempt from read timeouts; only the acknowledgment wait is bounded. subscribe_resource and unsubscribe_resource are deprecated with pointers to Client.listen, and the migration guide documents the replacement.
1 parent 24297a1 commit f3507eb

21 files changed

Lines changed: 1764 additions & 82 deletions

docs/migration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,6 +1517,10 @@ The 2026-07-28 spec restricts `notifications/progress` to the server-to-client d
15171517

15181518
On the server side, prefer the new dispatcher-agnostic `ServerSession.report_progress(progress, total, message)` (and `Context.report_progress()` on `MCPServer`) over the raw `ServerSession.send_progress_notification(progress_token, …)`. `report_progress` encapsulates the "no-op when the caller did not request progress" rule and works on every dispatcher; the raw token-taking form remains for handlers that read `_meta.progressToken` directly.
15191519

1520+
### Client resource-subscription methods deprecated (2026-07-28)
1521+
1522+
The 2026-07-28 wire removes `resources/subscribe` and `resources/unsubscribe`: resource subscriptions are delivered over the `subscriptions/listen` stream instead (see [`MCPServer` serves `subscriptions/listen`](#mcpserver-serves-subscriptionslisten-json-response-mode-requires-subscriptionsfalse-sep-2575)). `Client.subscribe_resource()` / `Client.unsubscribe_resource()` and the `ClientSession` pair now carry `typing_extensions.deprecated` and emit `mcp.MCPDeprecationWarning` at runtime. They keep working against servers negotiating 2025-11-25 or earlier; on a 2026-07-28 connection the server answers them with `-32601` (Method not found). New code should open a subscription stream with `client.listen(...)`.
1523+
15201524
## Bug Fixes
15211525

15221526
### OAuth metadata URLs no longer gain a trailing slash

src/mcp/client/client.py

Lines changed: 136 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,18 @@
55
import hashlib
66
import logging
77
import uuid
8-
from collections.abc import Awaitable, Callable, Mapping
9-
from contextlib import AsyncExitStack
8+
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator, Mapping
9+
from contextlib import AsyncExitStack, asynccontextmanager
1010
from dataclasses import KW_ONLY, dataclass, field
11+
from itertools import count
1112
from typing import Any, Literal, TypeVar, cast
1213

1314
import anyio
1415
import anyio.lowlevel
1516
import mcp_types as types
1617
from mcp_types import (
1718
INVALID_PARAMS,
19+
SUBSCRIPTION_ID_META_KEY,
1820
CacheableResult,
1921
CallToolResult,
2022
CompleteResult,
@@ -37,6 +39,7 @@
3739
RequestParamsMeta,
3840
ResourceTemplateReference,
3941
ServerCapabilities,
42+
SubscriptionFilter,
4043
)
4144
from mcp_types.version import HANDSHAKE_PROTOCOL_VERSIONS, MODERN_PROTOCOL_VERSIONS
4245
from typing_extensions import deprecated
@@ -56,11 +59,12 @@
5659
SamplingFnT,
5760
)
5861
from mcp.client.streamable_http import streamable_http_client
62+
from mcp.client.subscriptions import ListenNarrowedError, ListenNotSupportedError, Subscription
5963
from mcp.server import Server
6064
from mcp.server.mcpserver import MCPServer
6165
from mcp.server.runner import modern_on_request
6266
from mcp.shared.direct_dispatcher import create_direct_dispatcher_pair
63-
from mcp.shared.dispatcher import Dispatcher, ProgressFnT
67+
from mcp.shared.dispatcher import CallOptions, Dispatcher, ProgressFnT
6468
from mcp.shared.exceptions import MCPDeprecationWarning, MCPError
6569
from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher
6670
from mcp.shared.session import RequestResponder
@@ -282,6 +286,7 @@ async def main():
282286
`target_id` when the server is not a URL (no identity can be derived)."""
283287

284288
_entered: bool = field(init=False, default=False)
289+
_listen_ids: Iterator[int] = field(init=False, default_factory=lambda: count(1), repr=False, compare=False)
285290
_session: ClientSession | None = field(init=False, default=None)
286291
_exit_stack: AsyncExitStack | None = field(init=False, default=None)
287292
_connect: _Connector = field(init=False, repr=False, compare=False)
@@ -583,13 +588,139 @@ async def retry(r: InputResponses | None, s: str | None) -> ReadResourceResult |
583588
# Driver rounds carry inputResponses, so a terminal result reached through them is never cached (spec MUST).
584589
return await self._drive_input_required(first, retry)
585590

591+
@deprecated(
592+
"resources/subscribe is removed as of 2026-07-28; use Client.listen() instead.",
593+
category=MCPDeprecationWarning,
594+
)
586595
async def subscribe_resource(self, uri: str, *, meta: RequestParamsMeta | None = None) -> EmptyResult:
587596
"""Subscribe to resource updates."""
588-
return await self.session.subscribe_resource(uri, meta=meta)
597+
return await self.session.subscribe_resource(uri, meta=meta) # pyright: ignore[reportDeprecated]
589598

599+
@deprecated(
600+
"resources/unsubscribe is removed as of 2026-07-28; use Client.listen() instead.",
601+
category=MCPDeprecationWarning,
602+
)
590603
async def unsubscribe_resource(self, uri: str, *, meta: RequestParamsMeta | None = None) -> EmptyResult:
591604
"""Unsubscribe from resource updates."""
592-
return await self.session.unsubscribe_resource(uri, meta=meta)
605+
return await self.session.unsubscribe_resource(uri, meta=meta) # pyright: ignore[reportDeprecated]
606+
607+
@asynccontextmanager
608+
async def listen(
609+
self,
610+
notifications: SubscriptionFilter,
611+
*,
612+
strict: bool = False,
613+
ack_timeout: float = 30.0,
614+
) -> AsyncIterator[Subscription]:
615+
"""Open a `subscriptions/listen` stream (2026-07-28) and yield its `Subscription`.
616+
617+
Entering the context sends the listen request and returns once the
618+
server's acknowledgement arrives; iterate the yielded `Subscription`
619+
for the stream's typed change notifications. Exiting the context
620+
closes the stream. The listen request itself is exempt from
621+
`read_timeout_seconds` — only the acknowledgement phase is bounded
622+
(`ack_timeout`). A server result before any acknowledgement is
623+
tolerated as a degenerate immediately-graceful open: nothing honored,
624+
iteration ends at once.
625+
626+
Overlapping streams are independent handles: a notification matching
627+
several open filters is delivered to each of them.
628+
629+
Args:
630+
notifications: The notification types to opt in to.
631+
strict: Raise `ListenNarrowedError` (closing the stream) when the
632+
server honors less than requested, instead of returning the
633+
narrowed subscription.
634+
ack_timeout: Seconds to wait for the server's acknowledgement.
635+
636+
Raises:
637+
ListenNotSupportedError: The negotiated protocol version predates
638+
`subscriptions/listen`.
639+
ListenNarrowedError: `strict=True` and the server narrowed the filter.
640+
MCPError: The server rejected the request, or the connection
641+
closed before the acknowledgement.
642+
TimeoutError: No acknowledgement within `ack_timeout`.
643+
"""
644+
session = self.session
645+
if session.protocol_version not in MODERN_PROTOCOL_VERSIONS:
646+
raise ListenNotSupportedError(session.protocol_version)
647+
subscription_id = f"listen-{next(self._listen_ids)}"
648+
request = types.SubscriptionsListenRequest(
649+
params=types.SubscriptionsListenRequestParams(notifications=notifications)
650+
)
651+
data = request.model_dump(by_alias=True, mode="json", exclude_none=True)
652+
# The session's stamp (modern `_meta` envelope + transport headers) but NOT
653+
# its read-timeout fallback: a listen stream must outlive any session
654+
# default, so the dispatcher is driven directly, with no timeout at all.
655+
opts: CallOptions = {"request_id": subscription_id}
656+
session._stamp(data, opts) # pyright: ignore[reportPrivateUsage]
657+
# The modern stamp suppresses the courtesy cancel, but `close()` relies on
658+
# it: at a modern version the frame never crosses the wire - the transport
659+
# translates it into aborting the listen POST's response stream (LR-15).
660+
opts["cancel_on_abandon"] = True
661+
open_error: MCPError | None = None
662+
driver_scope = anyio.CancelScope()
663+
# Registered BEFORE the send, so the acknowledgement cannot race the route.
664+
route = session._register_listen_route(subscription_id) # pyright: ignore[reportPrivateUsage]
665+
666+
async def drive() -> None:
667+
nonlocal open_error
668+
with driver_scope:
669+
try:
670+
raw = await session._dispatcher.send_raw_request( # pyright: ignore[reportPrivateUsage]
671+
data["method"], data.get("params"), opts
672+
)
673+
except MCPError as exc:
674+
# Error response or connection close: pre-ack this wakes
675+
# `__aenter__` immediately (the settle sets the ack event);
676+
# post-ack, iteration raises `SubscriptionLost`.
677+
open_error = exc
678+
route.settle("remote")
679+
return
680+
# Receipt of the result IS the graceful close (validation is
681+
# tolerant: a foreign shape is logged, never an error).
682+
raw_meta = raw.get("_meta")
683+
meta = cast("dict[str, Any]", raw_meta) if isinstance(raw_meta, dict) else {}
684+
if meta.get(SUBSCRIPTION_ID_META_KEY) != subscription_id:
685+
logger.debug("subscriptions/listen result for %r missing its subscription-id meta", subscription_id)
686+
# A result before any ack is the degenerate immediately-graceful
687+
# open: nothing honored. After a real ack this is a no-op.
688+
route.set_acked(SubscriptionFilter())
689+
route.settle("graceful")
690+
691+
# The driver rides the session's own task group: a local task group here
692+
# would wrap every exception crossing the `yield` — the typed open
693+
# failures below AND the caller's own errors — in an ExceptionGroup.
694+
task_group = session._task_group # pyright: ignore[reportPrivateUsage]
695+
assert task_group is not None # an entered session (the `.session` gate) always has one
696+
try:
697+
task_group.start_soon(drive)
698+
try:
699+
with anyio.fail_after(ack_timeout):
700+
await route.acked.wait()
701+
except TimeoutError:
702+
raise TimeoutError(f"server did not acknowledge subscriptions/listen within {ack_timeout}s") from None
703+
if open_error is not None and route.honored is None:
704+
raise open_error
705+
subscription = Subscription(
706+
subscription_id=subscription_id,
707+
requested=notifications,
708+
# None only when the route settled remotely before any ack:
709+
# the handle then surfaces it as `SubscriptionLost` on iteration.
710+
honored=route.honored if route.honored is not None else SubscriptionFilter(),
711+
route=route,
712+
cancel_driver=driver_scope.cancel,
713+
)
714+
if strict and subscription.narrowed != SubscriptionFilter():
715+
raise ListenNarrowedError(subscription.narrowed, subscription.honored)
716+
yield subscription
717+
finally:
718+
# One teardown for every path (failed open, close(), plain exit):
719+
# the same settle-then-cancel `Subscription.close()` performs, all
720+
# idempotent against whatever reason already won.
721+
route.settle("local")
722+
driver_scope.cancel()
723+
session._unregister_listen_route(subscription_id) # pyright: ignore[reportPrivateUsage]
593724

594725
async def call_tool(
595726
self,

0 commit comments

Comments
 (0)