Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 97 additions & 8 deletions packages/middleware/node/test/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1711,18 +1711,23 @@ describe('Zod v4', () => {
): Promise<StreamId> {
const event = storedEvents.get(lastEventId);
const streamId = event?.streamId || lastEventId.split('::')[0]!;
const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = [];
let foundLast = false;

for (const [eventId, data] of storedEvents.entries()) {
if (data.streamId === streamId && eventId > lastEventId) {
eventsToReplay.push([eventId, data]);
if (eventId === lastEventId) {
foundLast = true;
continue;
}
}
eventsToReplay.sort(([a], [b]) => a.localeCompare(b));
for (const [eventId, { message }] of eventsToReplay) {
if (Object.keys(message).length > 0) {
await send(eventId, message);

if (!foundLast || data.streamId !== streamId) {
continue;
}

if (Object.keys(data.message).length > 0) {
await send(eventId, data.message);
}
}

return streamId;
}
};
Expand Down Expand Up @@ -1954,6 +1959,90 @@ describe('Zod v4', () => {
toolResolve!();
});

it('should replay the terminal POST SSE response after ctx.http?.closeSSE closes the request stream', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

mcpServer.registerTool('close-and-complete', { description: 'Closes request stream and completes later' }, async ctx => {
ctx.http?.closeSSE?.();
return {
content: [{ type: 'text', text: 'Done after reconnect' }]
};
});

const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 101,
method: 'tools/call',
params: { name: 'close-and-complete', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

const reader = postResponse.body?.getReader();
const { value } = await reader!.read();
const text = new TextDecoder().decode(value);
const idMatch = text.match(/id: ([^\n]+)/);
expect(idMatch).toBeTruthy();
const lastEventId = idMatch![1]!;

const closedRead = reader!.read();
const closedTimeout = new Promise<{ done: boolean; value: undefined }>((_, reject) =>
setTimeout(() => reject(new Error('POST SSE stream did not close in time')), 1000)
);
const { done } = await Promise.race([closedRead, closedTimeout]);
expect(done).toBe(true);

const reconnectResponse = await fetch(baseUrl, {
method: 'GET',
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-11-25',
'last-event-id': lastEventId
}
});
expect(reconnectResponse.status).toBe(200);

const reconnectReader = reconnectResponse.body?.getReader();
let replayedText = '';
const replayTimeout = setTimeout(() => reconnectReader!.cancel(), 5000);
try {
while (!replayedText.includes('Done after reconnect')) {
const { value, done } = await reconnectReader!.read();
if (done) break;
replayedText += new TextDecoder().decode(value);
}
} finally {
clearTimeout(replayTimeout);
}

expect(replayedText).toContain('Done after reconnect');
expect(replayedText).toContain('"id":101');
});

it('should provide closeSSEStream callback in ctx when eventStore is configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
Expand Down
27 changes: 15 additions & 12 deletions packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -985,14 +985,15 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {

const stream = this._streamMapping.get(streamId);

if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
// For SSE responses, generate event ID if event store is provided
let eventId: string | undefined;
let eventId: string | undefined;
if (!this._enableJsonResponse && this._eventStore) {
// Persist request-scoped SSE events even while the controller is
// temporarily disconnected so they can be replayed on reconnect.
eventId = await this._eventStore.storeEvent(streamId, message);
}

if (this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
}
// Write the event to the response stream
if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
// Write the event to the active response stream when available.
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
}

Expand All @@ -1004,10 +1005,10 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id));

if (allResponsesReady) {
if (!stream) {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}
if (this._enableJsonResponse && stream.resolveJson) {
if (this._enableJsonResponse) {
if (!stream?.resolveJson) {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}
// All responses ready, send as JSON
const headers: Record<string, string> = {
'Content-Type': 'application/json'
Expand All @@ -1023,9 +1024,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
} else {
stream.resolveJson(Response.json(responses, { status: 200, headers }));
}
} else {
} else if (stream) {
// End the SSE stream
stream.cleanup();
} else if (!this._eventStore) {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}
// Clean up
for (const id of relatedIds) {
Expand Down
68 changes: 68 additions & 0 deletions packages/server/test/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,74 @@ describe('Zod v4', () => {
// Should have id: field in the SSE event
expect(text).toContain('id:');
});

it('should replay a terminal POST SSE response after closeSSEStream disconnects the request stream', async () => {
sessionId = await initializeServer();

mcpServer.registerTool('close-and-complete', { description: 'Disconnects request SSE and completes later' }, async ctx => {
ctx.http?.closeSSE?.();
return {
content: [{ type: 'text', text: 'Done after reconnect' }]
};
});

const request = createRequest(
'POST',
{
jsonrpc: '2.0',
id: 'tool-close-1',
method: 'tools/call',
params: { name: 'close-and-complete', arguments: {} }
} as JSONRPCMessage,
{ sessionId }
);
const response = await transport.handleRequest(request);

expect(response.status).toBe(200);

const reader = response.body?.getReader();
const { value } = await reader!.read();
const text = new TextDecoder().decode(value);
const idMatch = text.match(/id: ([^\n]+)/);
expect(idMatch).toBeTruthy();
const lastEventId = idMatch![1]!;

const closedRead = reader!.read();
const closedTimeout = new Promise<{ done: boolean; value: undefined }>((_, reject) =>
setTimeout(() => reject(new Error('POST SSE stream did not close in time')), 1000)
);
const { done } = await Promise.race([closedRead, closedTimeout]);
expect(done).toBe(true);

const reconnectResponse = await transport.handleRequest(
createRequest('GET', undefined, {
sessionId,
extraHeaders: {
'last-event-id': lastEventId
}
})
);
expect(reconnectResponse.status).toBe(200);

const reconnectReader = reconnectResponse.body?.getReader();
let replayedText = '';
const replayTimeout = setTimeout(() => reconnectReader!.cancel(), 2000);
try {
while (!replayedText.includes('Done after reconnect')) {
const { value, done } = await reconnectReader!.read();
if (done) break;
replayedText += new TextDecoder().decode(value);
}
} finally {
clearTimeout(replayTimeout);
}

expect(replayedText).toContain('Done after reconnect');
expect(parseSSEData(replayedText)).toMatchObject({
jsonrpc: '2.0',
id: 'tool-close-1'
});
});
});

describe('HTTPServerTransport - Protocol Version Validation', () => {
Expand Down
Loading