-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[SEP-2575][SEP-2567] 2026-06 stateless support: stdio + InMemory transports #2132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: fweinberger/v2-http-stateless
Are you sure you want to change the base?
Changes from all commits
07d8595
20488dd
92f410a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| '@modelcontextprotocol/core': minor | ||
| '@modelcontextprotocol/server': minor | ||
| '@modelcontextprotocol/client': minor | ||
| --- | ||
| 2026-06 stateless support over stdio + InMemory transports (StreamDriver, serverStatelessRouter). |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| import { ZodError } from 'zod/v4'; | ||
| import type { JSONRPCMessage, JSONRPCRequest, RequestId } from '../types/index.js'; | ||
| import { isJSONRPCNotification, ProtocolError, ProtocolErrorCode } from '../types/index.js'; | ||
| import { errorResponse } from './dispatcher.js'; | ||
| import type { ListenContext, StatelessHandlers } from './stateless.js'; | ||
| import { isStatelessRequest } from './stateless.js'; | ||
|
|
||
| /** | ||
| * Per-message router for pipe-shaped server transports (stdio, in-memory). | ||
| * Call once per inbound message. Returns `true` if the message was claimed by | ||
| * the stateless path (so the caller should NOT pass it to legacy `onmessage`). | ||
| * | ||
| * Stateless requests are dispatched via {@linkcode StatelessHandlers}; | ||
| * `notifications/cancelled` aborts a tracked in-flight request. | ||
| */ | ||
| export function routeServerStateless( | ||
| message: JSONRPCMessage, | ||
| handlers: StatelessHandlers, | ||
| inflight: Map<RequestId, AbortController>, | ||
| write: (m: JSONRPCMessage) => void, | ||
| ctx: ListenContext, | ||
| onerror?: (e: Error) => void | ||
| ): boolean { | ||
| if (isStatelessRequest(message)) { | ||
| const ac = new AbortController(); | ||
| inflight.set(message.id, ac); | ||
| void handleOne(handlers, message, ac, write, ctx) | ||
| .catch(error => onerror?.(error instanceof Error ? error : new Error(String(error)))) | ||
| .finally(() => inflight.delete(message.id)); | ||
| return true; | ||
| } | ||
| if (isJSONRPCNotification(message) && message.method === 'notifications/cancelled') { | ||
| const requestId = (message.params as { requestId?: RequestId } | undefined)?.requestId; | ||
| const ac = requestId === undefined ? undefined : inflight.get(requestId); | ||
| if (ac) { | ||
| ac.abort(); | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| async function handleOne( | ||
| handlers: StatelessHandlers, | ||
| req: JSONRPCRequest, | ||
| ac: AbortController, | ||
| write: (m: JSONRPCMessage) => void, | ||
| ctx: ListenContext | ||
| ): Promise<void> { | ||
| if (req.method === 'subscriptions/listen') { | ||
| let listenStream; | ||
| try { | ||
| listenStream = handlers.listen(req, ctx); | ||
| } catch (error) { | ||
| write(listenErrorResponse(req.id, error)); | ||
| return; | ||
| } | ||
| const { stream, close } = listenStream; | ||
| ac.signal.addEventListener('abort', close, { once: true }); | ||
| try { | ||
| for await (const m of stream) { | ||
| if (ac.signal.aborted) break; | ||
| write(m); | ||
| } | ||
| // Stream ended without a client-side abort (backend eviction or | ||
| // natural end). Write a terminal error so StreamDriver's listen | ||
| // queue closes instead of hanging indefinitely. | ||
| if (!ac.signal.aborted) { | ||
| write(errorResponse(req.id, ProtocolErrorCode.InternalError, 'Subscription stream ended')); | ||
| } | ||
| } catch (error) { | ||
| if (!ac.signal.aborted) { | ||
| write(listenErrorResponse(req.id, error)); | ||
| } | ||
| throw error; | ||
| } finally { | ||
| ac.signal.removeEventListener('abort', close); | ||
| close(); | ||
| } | ||
| } else { | ||
| const response = await handlers.dispatch(req, { signal: ac.signal, authInfo: ctx.authInfo, notify: write }); | ||
| write(response); | ||
| } | ||
| } | ||
|
Check warning on line 84 in packages/core/src/shared/serverStatelessRouter.ts
|
||
|
Comment on lines
+81
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 The non-listen Extended reasoning...The gap
} else {
const response = await handlers.dispatch(req, { signal: ac.signal, authInfo: ctx.authInfo, notify: write });
write(response);
}There is no try/catch. If Why the sibling paths don't have this gap
Trigger likelihood (why this is a nit, not a blocker) For the bundled handlers ( Step-by-step reproduction
Fix Mirror the listen-branch fix: } else {
try {
const response = await handlers.dispatch(req, { signal: ac.signal, authInfo: ctx.authInfo, notify: write });
write(response);
} catch (error) {
if (!ac.signal.aborted) {
write(errorResponse(req.id, ProtocolErrorCode.InternalError, error instanceof Error ? error.message : 'Dispatch failed'));
}
throw error;
}
}This keeps the rejection propagating to |
||
|
|
||
| function listenErrorResponse(id: RequestId, error: unknown) { | ||
| if (error instanceof ProtocolError) { | ||
| return errorResponse(id, error.code, error.message); | ||
| } | ||
| if (error instanceof ZodError) { | ||
| return errorResponse(id, ProtocolErrorCode.InvalidParams, error.message); | ||
| } | ||
| return errorResponse(id, ProtocolErrorCode.InternalError, error instanceof Error ? error.message : 'Subscription failed'); | ||
| } | ||
|
Check warning on line 94 in packages/core/src/shared/serverStatelessRouter.ts
|
||
|
Comment on lines
+86
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 listenErrorResponse() drops the public Extended reasoning...1.
if (error instanceof ProtocolError) {
return errorResponse(id, error.code, error.message); // ← drops error.data
}Concrete walkthrough. A custom throw new ProtocolError(
ProtocolErrorCode.MissingRequiredClientCapability,
'Client must declare resources.subscribe',
{ requiredCapabilities: { resources: { subscribe: true } } }
);
This is an asymmetry purely between transports for the same throw, which is exactly what Fix: if (error instanceof ProtocolError) {
return errorResponse(id, error.code, error.message, error.data);
}Practical impact is small (only custom backends that throw 2. The sibling HTTP listen catch was left unfixed The earlier review on this PR flagged that the listen catch hard-coded } catch (error) {
return jsonError(
400,
ProtocolErrorCode.InvalidParams,
error instanceof Error ? error.message : 'Invalid listen request',
listen.r.id
);
}Server-internal failures from a custom |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| import { isJSONRPCNotification, isJSONRPCResponse } from '../types/guards.js'; | ||
| import type { JSONRPCMessage, JSONRPCRequest, RequestId } from '../types/index.js'; | ||
| import { JSONRPC_VERSION, ProtocolErrorCode } from '../types/index.js'; | ||
| import { AsyncQueue } from '../util/asyncQueue.js'; | ||
| import { META_KEYS } from './stateless.js'; | ||
|
|
||
| /** | ||
| * Minimal request→response correlator for pipe-shaped client transports | ||
| * (stdio, in-memory) under the 2026-06 stateless model. Provides | ||
| * `sendAndReceive(request) → AsyncIterable<JSONRPCMessage>` so the Client can | ||
| * make stateless calls without going through `Protocol.request()` and its | ||
| * `_responseHandlers` map. | ||
| * | ||
| * The transport feeds every inbound message to {@linkcode onMessage}; the | ||
| * driver routes responses by `id` and notifications by `_meta.subscriptionId` | ||
| * (which is the originating request's id, per SEP-2575) to the matching | ||
| * iterator. Closing/breaking the iterator sends `notifications/cancelled`. | ||
| */ | ||
| export class StreamDriver { | ||
| // Seed in a range Protocol's `_requestMessageId` (which starts at 0) will | ||
| // not reach, so a stdio fallback that mixes a discover-probe (StreamDriver) | ||
| // with a legacy initialize (Protocol.request) on the same pipe cannot | ||
| // collide on id 0. | ||
| private _nextId = 0x40_00_00_00; | ||
| private readonly _pending = new Map<RequestId, AsyncQueue<JSONRPCMessage>>(); | ||
|
|
||
| constructor(private readonly _send: (m: JSONRPCMessage) => Promise<void>) {} | ||
|
|
||
| /** | ||
| * Sends one request and returns an async-iterable of the messages the server | ||
| * emits for it: zero or more notifications, then exactly one response (which | ||
| * ends the iteration). For `subscriptions/listen`, the iteration continues | ||
| * until `break`/`return()` (which sends `notifications/cancelled`). | ||
| * | ||
| * The request is dispatched and registered in `_pending` immediately, before | ||
| * the first `next()`. Callers MUST consume the iterable (`for await` is | ||
| * sufficient: it calls `return()` on break/throw); obtaining it and never | ||
| * iterating leaks the `_pending` entry until {@linkcode close}. | ||
| */ | ||
| sendAndReceive(request: Omit<JSONRPCRequest, 'jsonrpc' | 'id'>, opts?: { signal?: AbortSignal }): AsyncIterable<JSONRPCMessage> { | ||
| const id = this._nextId++; | ||
| const isListen = request.method === 'subscriptions/listen'; | ||
| const queue = new AsyncQueue<JSONRPCMessage>(256); | ||
|
|
||
| const cleanup = () => { | ||
| this._pending.delete(id); | ||
| this._pending.delete(String(id)); | ||
| opts?.signal?.removeEventListener('abort', onAbort); | ||
| }; | ||
| const cancel = () => { | ||
| if (queue.closed) return; | ||
| this._send({ jsonrpc: JSONRPC_VERSION, method: 'notifications/cancelled', params: { requestId: id } }).catch(() => {}); | ||
| queue.close(); | ||
| cleanup(); | ||
| }; | ||
| const onAbort = () => cancel(); | ||
| opts?.signal?.addEventListener('abort', onAbort, { once: true }); | ||
|
|
||
| this._pending.set(id, queue); | ||
| // `_meta.subscriptionId` on inbound notifications equals the string form | ||
| // of the request id (SEP-2575). Register the same queue under that key | ||
| // so {@linkcode onMessage} can route notifications without a second map. | ||
| this._pending.set(String(id), queue); | ||
|
|
||
| this._send({ jsonrpc: JSONRPC_VERSION, id, ...request }).catch(error => { | ||
| // Surface send failure to the iterator instead of hanging forever. | ||
| queue.push({ | ||
| jsonrpc: JSONRPC_VERSION, | ||
| id, | ||
| error: { | ||
| code: ProtocolErrorCode.InternalError, | ||
| message: `Transport send failed: ${error instanceof Error ? error.message : String(error)}` | ||
| } | ||
| }); | ||
| queue.close(); | ||
| cleanup(); | ||
| }); | ||
|
|
||
| const inner = queue.iterate(); | ||
| return { | ||
| [Symbol.asyncIterator]: () => ({ | ||
| async next(): Promise<IteratorResult<JSONRPCMessage>> { | ||
| const r = await inner.next(); | ||
| if (r.done) { | ||
| cleanup(); | ||
| } else if (!isListen && isJSONRPCResponse(r.value)) { | ||
| // Non-listen: end after the response. | ||
| queue.close(); | ||
| cleanup(); | ||
| } | ||
| return r; | ||
| }, | ||
| async return(): Promise<IteratorResult<JSONRPCMessage>> { | ||
| cancel(); | ||
| return { value: undefined, done: true }; | ||
| } | ||
| }) | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Feeds one inbound message to the driver. The transport calls this for | ||
| * every message received while in stateless mode. Returns `true` if the | ||
| * message was claimed (routed to a pending iterator). | ||
| */ | ||
| onMessage(m: JSONRPCMessage): boolean { | ||
| if ('id' in m && m.id !== null && m.id !== undefined) { | ||
| const q = this._pending.get(m.id); | ||
| if (q) { | ||
| q.push(m); | ||
| return true; | ||
| } | ||
| } | ||
| if (isJSONRPCNotification(m)) { | ||
| const sid = (m.params?._meta as Record<string, unknown> | undefined)?.[META_KEYS.subscriptionId]; | ||
| if (typeof sid === 'string') { | ||
| const q = this._pending.get(sid); | ||
| if (q) { | ||
| q.push(m); | ||
| return true; | ||
| } | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| /** Ends every pending iterator (e.g., on transport close). */ | ||
| close(): void { | ||
| for (const q of this._pending.values()) q.close(); | ||
| this._pending.clear(); | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.