Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/stateless-pipe-transports.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@modelcontextprotocol/core': minor
'@modelcontextprotocol/server': minor
'@modelcontextprotocol/client': minor
---
2026-06 stateless support over stdio + InMemory transports (StreamDriver, serverStatelessRouter).
30 changes: 28 additions & 2 deletions packages/client/src/client/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import process from 'node:process';
import type { Stream } from 'node:stream';
import { PassThrough } from 'node:stream';

import type { JSONRPCMessage, Transport } from '@modelcontextprotocol/core';
import { ReadBuffer, SdkError, SdkErrorCode, serializeMessage } from '@modelcontextprotocol/core';
import type { JSONRPCMessage, JSONRPCRequest, Transport } from '@modelcontextprotocol/core';
import { isStatelessProtocolVersion, ReadBuffer, SdkError, SdkErrorCode, serializeMessage, StreamDriver } from '@modelcontextprotocol/core';
import spawn from 'cross-spawn';

export type StdioServerParameters = {
Expand Down Expand Up @@ -95,11 +95,26 @@ export class StdioClientTransport implements Transport {
private _readBuffer: ReadBuffer = new ReadBuffer();
private _serverParams: StdioServerParameters;
private _stderrStream: PassThrough | null = null;
private _protocolVersion?: string;
/* eslint-disable-next-line unicorn/consistent-function-scoping */
private readonly _driver = new StreamDriver(m => this.send(m));

onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

/**
* Sends one request and returns the server's messages for it. Backed by
* `StreamDriver`; bypasses `Protocol.request()`.
*/
sendAndReceive(request: Omit<JSONRPCRequest, 'jsonrpc' | 'id'>, opts?: { signal?: AbortSignal }): AsyncIterable<JSONRPCMessage> {
return this._driver.sendAndReceive(request, opts);
}

setProtocolVersion(version: string): void {
this._protocolVersion = version;
}

constructor(server: StdioServerParameters) {
this._serverParams = server;
if (server.stderr === 'pipe' || server.stderr === 'overlapped') {
Expand Down Expand Up @@ -195,6 +210,16 @@ export class StdioClientTransport implements Transport {
break;
}

// Default to StreamDriver until setProtocolVersion is called with
// a pre-2026 version. The discover/initialize probe goes via
// sendAndReceive, so the driver claims those.
if (
(this._protocolVersion === undefined || isStatelessProtocolVersion(this._protocolVersion)) &&
this._driver.onMessage(message)
) {
continue;
}

this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
Expand All @@ -203,6 +228,7 @@ export class StdioClientTransport implements Transport {
}

async close(): Promise<void> {
this._driver.close();
if (this._process) {
const processToClose = this._process;
this._process = undefined;
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ export * from './shared/authUtils.js';
export * from './shared/dispatcher.js';
export * from './shared/metadataUtils.js';
export * from './shared/protocol.js';
export * from './shared/serverStatelessRouter.js';
export * from './shared/stateless.js';
export * from './shared/stdio.js';
export * from './shared/streamDriver.js';
export * from './shared/toolNameValidation.js';
export * from './shared/transport.js';
export * from './shared/uriTemplate.js';
Expand Down
94 changes: 94 additions & 0 deletions packages/core/src/shared/serverStatelessRouter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { ZodError } from 'zod/v4';
import type { JSONRPCMessage, JSONRPCRequest, RequestId } from '../types/index.js';
import { isJSONRPCNotification, ProtocolError, ProtocolErrorCode } from '../types/index.js';
import { errorResponse } from './dispatcher.js';
import type { ListenContext, StatelessHandlers } from './stateless.js';
import { isStatelessRequest } from './stateless.js';

/**
* Per-message router for pipe-shaped server transports (stdio, in-memory).
* Call once per inbound message. Returns `true` if the message was claimed by
* the stateless path (so the caller should NOT pass it to legacy `onmessage`).
*
* Stateless requests are dispatched via {@linkcode StatelessHandlers};
* `notifications/cancelled` aborts a tracked in-flight request.
*/
export function routeServerStateless(
message: JSONRPCMessage,
handlers: StatelessHandlers,
inflight: Map<RequestId, AbortController>,
write: (m: JSONRPCMessage) => void,
ctx: ListenContext,
onerror?: (e: Error) => void
): boolean {
if (isStatelessRequest(message)) {
const ac = new AbortController();
inflight.set(message.id, ac);
void handleOne(handlers, message, ac, write, ctx)
.catch(error => onerror?.(error instanceof Error ? error : new Error(String(error))))
.finally(() => inflight.delete(message.id));
return true;
}
if (isJSONRPCNotification(message) && message.method === 'notifications/cancelled') {
const requestId = (message.params as { requestId?: RequestId } | undefined)?.requestId;
const ac = requestId === undefined ? undefined : inflight.get(requestId);
if (ac) {
ac.abort();
return true;
}
}
return false;
}

async function handleOne(
handlers: StatelessHandlers,
req: JSONRPCRequest,
ac: AbortController,
write: (m: JSONRPCMessage) => void,
ctx: ListenContext
): Promise<void> {
if (req.method === 'subscriptions/listen') {
let listenStream;
try {
listenStream = handlers.listen(req, ctx);
} catch (error) {
write(listenErrorResponse(req.id, error));
return;
}
const { stream, close } = listenStream;
ac.signal.addEventListener('abort', close, { once: true });
try {
for await (const m of stream) {
if (ac.signal.aborted) break;
write(m);
}
// Stream ended without a client-side abort (backend eviction or
// natural end). Write a terminal error so StreamDriver's listen
// queue closes instead of hanging indefinitely.
if (!ac.signal.aborted) {
write(errorResponse(req.id, ProtocolErrorCode.InternalError, 'Subscription stream ended'));
}
} catch (error) {
if (!ac.signal.aborted) {
write(listenErrorResponse(req.id, error));
}
throw error;
} finally {
ac.signal.removeEventListener('abort', close);
close();
}
} else {
Comment thread
claude[bot] marked this conversation as resolved.
const response = await handlers.dispatch(req, { signal: ac.signal, authInfo: ctx.authInfo, notify: write });
write(response);
}
}

Check warning on line 84 in packages/core/src/shared/serverStatelessRouter.ts

View check run for this annotation

Claude / Claude Code Review

dispatch branch in handleOne writes nothing to the pipe on rejection

The non-listen `else` branch of `handleOne()` awaits `handlers.dispatch()` with no try/catch — if it rejects, the rejection only reaches `routeServerStateless()`'s `.catch(onerror)` and nothing is written to the pipe, so the client's `StreamDriver` iterator hangs until the 60s default timeout. This is the same shape just fixed for the listen branch in 20488ddb (and defensively guarded in the SSE path via `controller.error()` in `statelessHttp.ts`); wrap the await in try/catch and write `errorRes
Comment on lines +81 to +84
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The non-listen else branch of handleOne() awaits handlers.dispatch() with no try/catch — if it rejects, the rejection only reaches routeServerStateless()'s .catch(onerror) and nothing is written to the pipe, so the client's StreamDriver iterator hangs until the 60s default timeout. This is the same shape just fixed for the listen branch in 20488dd (and defensively guarded in the SSE path via controller.error() in statelessHttp.ts); wrap the await in try/catch and write errorResponse(req.id, ProtocolErrorCode.InternalError, ...) on rejection.

Extended reasoning...

The gap

handleOne()'s non-listen branch (serverStatelessRouter.ts:81-83) is:

} else {
    const response = await handlers.dispatch(req, { signal: ac.signal, authInfo: ctx.authInfo, notify: write });
    write(response);
}

There is no try/catch. If handlers.dispatch() rejects, the rejection propagates to routeServerStateless()'s .catch(error => onerror?.(...)) (line 28) which only logs. Nothing is written to the pipe. The client's StreamDriver registered a queue under req.id; for a non-listen request the wrapper in streamDriver.ts only ends after a JSONRPCResponse for that id arrives, or on return()/close()/onAbort. With no message ever keyed to req.id, the client's for await blocks until the request-level AbortSignal fires — DEFAULT_REQUEST_TIMEOUT_MSEC (60 s) by default.

Why the sibling paths don't have this gap

  • The listen branch in this same function was hardened in 20488dd to write a terminal errorResponse(req.id, …) whenever the for-await throws or ends without a client abort, specifically so StreamDriver's queue closes instead of hanging. The else branch is the same failure shape, unguarded.
  • The HTTP path (statelessHttp.ts): handleHttp's outer try/catch turns a rejected dispatch into a 500 JSON-RPC error response, and the SSE branch attaches .catch(error => controller.error(error)) to the dispatch promise so the response stream errors and the client's reader observes EOF. Over stdio/InMemory there is no per-request connection — the pipe stays open and the only signal the client can observe is a JSON-RPC message keyed to req.id.

Trigger likelihood (why this is a nit, not a blocker)

For the bundled handlers (Server.statelessHandlers().dispatch_dispatchStateless), a rejection is hard to reach: dispatcher.dispatch() has its own internal try/catch and always returns an errorResponse on throw, and parseClientMeta/_buildDispatchServerContext are sync, defensive type checks. The realistic triggers are (a) the post-processing in _dispatchStateless (e.g., the resultType spread on response.result) throwing if a handler returns a non-object value that bypasses TypeScript, or (b) a user-supplied StatelessHandlers installed via the public setStatelessHandlers() whose dispatch rejects. Neither is common — but StatelessHandlers is an exported, public extension point, and the failure mode when it does happen (60 s of silence followed by an opaque timeout error) is much harder to debug than an immediate InternalError response.

Step-by-step reproduction

  1. A user installs a custom StatelessHandlers on a StdioServerTransport via transport.setStatelessHandlers({ dispatch: async () => { throw new Error('db down') }, listen: ... }).
  2. A 2026-06 client calls client.callTool({ name: 'echo' }) over stdio. StreamDriver.sendAndReceive() registers a queue under id 0x40000000 and writes the request.
  3. The server's processReadBuffer calls routeServerStatelesshandleOnehandlers.dispatch(req, …), which rejects.
  4. The rejection propagates out of handleOne, is caught by .catch(error => onerror?.(...)), and is logged. inflight is cleaned up by .finally(). No write to the pipe.
  5. The client's StreamDriver queue for id 0x40000000 never receives a message. callTool() blocks on inner.next() for 60 seconds, then the request AbortSignal fires and the iterator is cancelled, surfacing a timeout error to the application — not the actual 'db down' failure.

Fix

Mirror the listen-branch fix:

} else {
    try {
        const response = await handlers.dispatch(req, { signal: ac.signal, authInfo: ctx.authInfo, notify: write });
        write(response);
    } catch (error) {
        if (!ac.signal.aborted) {
            write(errorResponse(req.id, ProtocolErrorCode.InternalError, error instanceof Error ? error.message : 'Dispatch failed'));
        }
        throw error;
    }
}

This keeps the rejection propagating to onerror for logging while ensuring the client always observes a terminal JSON-RPC message keyed to req.id, matching the SSE path's controller.error() and the listen branch's terminal write.


function listenErrorResponse(id: RequestId, error: unknown) {
if (error instanceof ProtocolError) {
return errorResponse(id, error.code, error.message);
}
if (error instanceof ZodError) {
return errorResponse(id, ProtocolErrorCode.InvalidParams, error.message);
}
return errorResponse(id, ProtocolErrorCode.InternalError, error instanceof Error ? error.message : 'Subscription failed');
}

Check warning on line 94 in packages/core/src/shared/serverStatelessRouter.ts

View check run for this annotation

Claude / Claude Code Review

listenErrorResponse drops ProtocolError.data; sibling HTTP listen catch not updated

listenErrorResponse() drops the public `error.data` field when handling a `ProtocolError`, so structured data (e.g. `requiredCapabilities` on a `MissingRequiredClientCapability` error from a custom `SubscriptionBackend`) is lost over pipe transports while the dispatcher's own catch (dispatcher.ts:184-189) preserves it. Fix is one line: pass `error.data` as the 4th `errorResponse()` arg. Also note: the sibling HTTP listen catch in `statelessHttp.ts:153-160` (flagged in the prior review and replie
Comment on lines +86 to +94
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 listenErrorResponse() drops the public error.data field when handling a ProtocolError, so structured data (e.g. requiredCapabilities on a MissingRequiredClientCapability error from a custom SubscriptionBackend) is lost over pipe transports while the dispatcher's own catch (dispatcher.ts:184-189) preserves it. Fix is one line: pass error.data as the 4th errorResponse() arg. Also note: the sibling HTTP listen catch in statelessHttp.ts:153-160 (flagged in the prior review and replied to with "Fixed in 20488dd") was not actually updated — it still maps every throw to InvalidParams/400 and should mirror this discrimination.

Extended reasoning...

1. listenErrorResponse() drops ProtocolError.data

ProtocolError carries a public readonly data?: unknown field (packages/core/src/types/errors.ts:12), and errorResponse() (packages/core/src/shared/dispatcher.ts:200) accepts a 4th data?: unknown parameter that it spreads into the wire error. The dispatcher's own catch block (dispatcher.ts:184-189) propagates e.data into the JSON-RPC error. But the new listenErrorResponse() here calls:

if (error instanceof ProtocolError) {
    return errorResponse(id, error.code, error.message);   // ← drops error.data
}

Concrete walkthrough. A custom SubscriptionBackend.handle() (the docs explicitly say to supply a distributed implementation for horizontal scale) throws:

throw new ProtocolError(
    ProtocolErrorCode.MissingRequiredClientCapability,
    'Client must declare resources.subscribe',
    { requiredCapabilities: { resources: { subscribe: true } } }
);
  • Over HTTP (the dispatch path), the dispatcher catch forwards e.data and the client sees error.data.requiredCapabilities.
  • Over stdio/InMemory (this PR), handleOne()listenErrorResponse() builds errorResponse(id, code, message) and the client receives error.data === undefined. The structured data is silently gone, and a client that branches on requiredCapabilities to fix up its capabilities and retry has nothing to go on.

This is an asymmetry purely between transports for the same throw, which is exactly what serverStatelessRouter is supposed to avoid.

Fix:

if (error instanceof ProtocolError) {
    return errorResponse(id, error.code, error.message, error.data);
}

Practical impact is small (only custom backends that throw ProtocolError with structured data from handle() are affected; the bundled InMemorySubscriptions doesn't), so this alone is a nit.

2. The sibling HTTP listen catch was left unfixed

The earlier review on this PR flagged that the listen catch hard-coded InvalidParams for every throw and noted: "The same shape exists in statelessHttp.ts:153-159 from #2131, so both sites are worth fixing together." The reply was "Fixed in 20488dd. The listen catch now discriminates ZodError/ProtocolError from other throws…" — but 20488ddb only touched serverStatelessRouter.ts. packages/server/src/server/statelessHttp.ts:153-160 (current HEAD) still reads:

} catch (error) {
    return jsonError(
        400,
        ProtocolErrorCode.InvalidParams,
        error instanceof Error ? error.message : 'Invalid listen request',
        listen.r.id
    );
}

Server-internal failures from a custom SubscriptionBackend.handle() — broker connection loss, DB lookup failure — still surface to HTTP clients as -32602 InvalidParams / HTTP 400, signalling a client fault that isn't one. statelessHttp.ts is not in this PR's diff so on a strict reading the remaining bug is pre-existing, but since the PR establishes the discrimination pattern (listenErrorResponse) and the author indicated both sites were addressed, it's worth noting the migration is partial. The HTTP path should mirror listenErrorResponse(): ZodError/ProtocolError(InvalidParams) → 400, everything else → 500 InternalError.

132 changes: 132 additions & 0 deletions packages/core/src/shared/streamDriver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { isJSONRPCNotification, isJSONRPCResponse } from '../types/guards.js';
import type { JSONRPCMessage, JSONRPCRequest, RequestId } from '../types/index.js';
import { JSONRPC_VERSION, ProtocolErrorCode } from '../types/index.js';
import { AsyncQueue } from '../util/asyncQueue.js';
import { META_KEYS } from './stateless.js';

/**
* Minimal request→response correlator for pipe-shaped client transports
* (stdio, in-memory) under the 2026-06 stateless model. Provides
* `sendAndReceive(request) → AsyncIterable<JSONRPCMessage>` so the Client can
* make stateless calls without going through `Protocol.request()` and its
* `_responseHandlers` map.
*
* The transport feeds every inbound message to {@linkcode onMessage}; the
* driver routes responses by `id` and notifications by `_meta.subscriptionId`
* (which is the originating request's id, per SEP-2575) to the matching
* iterator. Closing/breaking the iterator sends `notifications/cancelled`.
*/
export class StreamDriver {
// Seed in a range Protocol's `_requestMessageId` (which starts at 0) will
// not reach, so a stdio fallback that mixes a discover-probe (StreamDriver)
// with a legacy initialize (Protocol.request) on the same pipe cannot
// collide on id 0.
private _nextId = 0x40_00_00_00;
private readonly _pending = new Map<RequestId, AsyncQueue<JSONRPCMessage>>();

constructor(private readonly _send: (m: JSONRPCMessage) => Promise<void>) {}

/**
* Sends one request and returns an async-iterable of the messages the server
* emits for it: zero or more notifications, then exactly one response (which
* ends the iteration). For `subscriptions/listen`, the iteration continues
* until `break`/`return()` (which sends `notifications/cancelled`).
*
* The request is dispatched and registered in `_pending` immediately, before
* the first `next()`. Callers MUST consume the iterable (`for await` is
* sufficient: it calls `return()` on break/throw); obtaining it and never
* iterating leaks the `_pending` entry until {@linkcode close}.
*/
sendAndReceive(request: Omit<JSONRPCRequest, 'jsonrpc' | 'id'>, opts?: { signal?: AbortSignal }): AsyncIterable<JSONRPCMessage> {
const id = this._nextId++;
const isListen = request.method === 'subscriptions/listen';
const queue = new AsyncQueue<JSONRPCMessage>(256);

const cleanup = () => {
this._pending.delete(id);
this._pending.delete(String(id));
opts?.signal?.removeEventListener('abort', onAbort);
};
const cancel = () => {
if (queue.closed) return;
this._send({ jsonrpc: JSONRPC_VERSION, method: 'notifications/cancelled', params: { requestId: id } }).catch(() => {});
queue.close();
cleanup();
};
const onAbort = () => cancel();
opts?.signal?.addEventListener('abort', onAbort, { once: true });

this._pending.set(id, queue);
// `_meta.subscriptionId` on inbound notifications equals the string form
// of the request id (SEP-2575). Register the same queue under that key
// so {@linkcode onMessage} can route notifications without a second map.
this._pending.set(String(id), queue);

this._send({ jsonrpc: JSONRPC_VERSION, id, ...request }).catch(error => {
// Surface send failure to the iterator instead of hanging forever.
queue.push({
jsonrpc: JSONRPC_VERSION,
id,
error: {
code: ProtocolErrorCode.InternalError,
message: `Transport send failed: ${error instanceof Error ? error.message : String(error)}`
}
});
queue.close();
cleanup();
});

const inner = queue.iterate();
return {
[Symbol.asyncIterator]: () => ({
async next(): Promise<IteratorResult<JSONRPCMessage>> {
const r = await inner.next();
if (r.done) {
cleanup();
} else if (!isListen && isJSONRPCResponse(r.value)) {
// Non-listen: end after the response.
queue.close();
cleanup();
}
return r;
},
async return(): Promise<IteratorResult<JSONRPCMessage>> {
cancel();
return { value: undefined, done: true };
}
})
};
}

/**
* Feeds one inbound message to the driver. The transport calls this for
* every message received while in stateless mode. Returns `true` if the
* message was claimed (routed to a pending iterator).
*/
onMessage(m: JSONRPCMessage): boolean {
if ('id' in m && m.id !== null && m.id !== undefined) {
const q = this._pending.get(m.id);
if (q) {
q.push(m);
return true;
}
}
if (isJSONRPCNotification(m)) {
const sid = (m.params?._meta as Record<string, unknown> | undefined)?.[META_KEYS.subscriptionId];
if (typeof sid === 'string') {
const q = this._pending.get(sid);
if (q) {
q.push(m);
return true;
}
}
}
return false;
}

/** Ends every pending iterator (e.g., on transport close). */
close(): void {
for (const q of this._pending.values()) q.close();
this._pending.clear();
}
}
Loading
Loading