diff --git a/packages/middleware/node/test/streamableHttp.test.ts b/packages/middleware/node/test/streamableHttp.test.ts index c427aa2eea..f6fcb35dac 100644 --- a/packages/middleware/node/test/streamableHttp.test.ts +++ b/packages/middleware/node/test/streamableHttp.test.ts @@ -1711,18 +1711,23 @@ describe('Zod v4', () => { ): Promise { const event = storedEvents.get(lastEventId); const streamId = event?.streamId || lastEventId.split('::')[0]!; - const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = []; + let foundLast = false; + for (const [eventId, data] of storedEvents.entries()) { - if (data.streamId === streamId && eventId > lastEventId) { - eventsToReplay.push([eventId, data]); + if (eventId === lastEventId) { + foundLast = true; + continue; } - } - eventsToReplay.sort(([a], [b]) => a.localeCompare(b)); - for (const [eventId, { message }] of eventsToReplay) { - if (Object.keys(message).length > 0) { - await send(eventId, message); + + if (!foundLast || data.streamId !== streamId) { + continue; + } + + if (Object.keys(data.message).length > 0) { + await send(eventId, data.message); } } + return streamId; } }; @@ -1954,6 +1959,90 @@ describe('Zod v4', () => { toolResolve!(); }); + it('should replay the terminal POST SSE response after ctx.http?.closeSSE closes the request stream', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + mcpServer.registerTool('close-and-complete', { description: 'Closes request stream and completes later' }, async ctx => { + ctx.http?.closeSSE?.(); + return { + content: [{ type: 'text', text: 'Done after reconnect' }] + }; + }); + + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 101, + method: 'tools/call', + params: { name: 'close-and-complete', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-11-25' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + const reader = postResponse.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + const idMatch = text.match(/id: ([^\n]+)/); + expect(idMatch).toBeTruthy(); + const lastEventId = idMatch![1]!; + + const closedRead = reader!.read(); + const closedTimeout = new Promise<{ done: boolean; value: undefined }>((_, reject) => + setTimeout(() => reject(new Error('POST SSE stream did not close in time')), 1000) + ); + const { done } = await Promise.race([closedRead, closedTimeout]); + expect(done).toBe(true); + + const reconnectResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-11-25', + 'last-event-id': lastEventId + } + }); + expect(reconnectResponse.status).toBe(200); + + const reconnectReader = reconnectResponse.body?.getReader(); + let replayedText = ''; + const replayTimeout = setTimeout(() => reconnectReader!.cancel(), 5000); + try { + while (!replayedText.includes('Done after reconnect')) { + const { value, done } = await reconnectReader!.read(); + if (done) break; + replayedText += new TextDecoder().decode(value); + } + } finally { + clearTimeout(replayTimeout); + } + + expect(replayedText).toContain('Done after reconnect'); + expect(replayedText).toContain('"id":101'); + }); + it('should provide closeSSEStream callback in ctx when eventStore is configured', async () => { const result = await createTestServer({ sessionIdGenerator: () => randomUUID(), diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index fd3563a077..3fee90d987 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -985,14 +985,15 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const stream = this._streamMapping.get(streamId); - if (!this._enableJsonResponse && stream?.controller && stream?.encoder) { - // For SSE responses, generate event ID if event store is provided - let eventId: string | undefined; + let eventId: string | undefined; + if (!this._enableJsonResponse && this._eventStore) { + // Persist request-scoped SSE events even while the controller is + // temporarily disconnected so they can be replayed on reconnect. + eventId = await this._eventStore.storeEvent(streamId, message); + } - if (this._eventStore) { - eventId = await this._eventStore.storeEvent(streamId, message); - } - // Write the event to the response stream + if (!this._enableJsonResponse && stream?.controller && stream?.encoder) { + // Write the event to the active response stream when available. this.writeSSEEvent(stream.controller, stream.encoder, message, eventId); } @@ -1004,10 +1005,10 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id)); if (allResponsesReady) { - if (!stream) { - throw new Error(`No connection established for request ID: ${String(requestId)}`); - } - if (this._enableJsonResponse && stream.resolveJson) { + if (this._enableJsonResponse) { + if (!stream?.resolveJson) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); + } // All responses ready, send as JSON const headers: Record = { 'Content-Type': 'application/json' @@ -1023,9 +1024,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } else { stream.resolveJson(Response.json(responses, { status: 200, headers })); } - } else { + } else if (stream) { // End the SSE stream stream.cleanup(); + } else if (!this._eventStore) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); } // Clean up for (const id of relatedIds) { diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index 7a23dd56bb..60ee6f9cb4 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -705,6 +705,74 @@ describe('Zod v4', () => { // Should have id: field in the SSE event expect(text).toContain('id:'); }); + + it('should replay a terminal POST SSE response after closeSSEStream disconnects the request stream', async () => { + sessionId = await initializeServer(); + + mcpServer.registerTool('close-and-complete', { description: 'Disconnects request SSE and completes later' }, async ctx => { + ctx.http?.closeSSE?.(); + return { + content: [{ type: 'text', text: 'Done after reconnect' }] + }; + }); + + const request = createRequest( + 'POST', + { + jsonrpc: '2.0', + id: 'tool-close-1', + method: 'tools/call', + params: { name: 'close-and-complete', arguments: {} } + } as JSONRPCMessage, + { sessionId } + ); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + const reader = response.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + const idMatch = text.match(/id: ([^\n]+)/); + expect(idMatch).toBeTruthy(); + const lastEventId = idMatch![1]!; + + const closedRead = reader!.read(); + const closedTimeout = new Promise<{ done: boolean; value: undefined }>((_, reject) => + setTimeout(() => reject(new Error('POST SSE stream did not close in time')), 1000) + ); + const { done } = await Promise.race([closedRead, closedTimeout]); + expect(done).toBe(true); + + const reconnectResponse = await transport.handleRequest( + createRequest('GET', undefined, { + sessionId, + extraHeaders: { + 'last-event-id': lastEventId + } + }) + ); + expect(reconnectResponse.status).toBe(200); + + const reconnectReader = reconnectResponse.body?.getReader(); + let replayedText = ''; + const replayTimeout = setTimeout(() => reconnectReader!.cancel(), 2000); + try { + while (!replayedText.includes('Done after reconnect')) { + const { value, done } = await reconnectReader!.read(); + if (done) break; + replayedText += new TextDecoder().decode(value); + } + } finally { + clearTimeout(replayTimeout); + } + + expect(replayedText).toContain('Done after reconnect'); + expect(parseSSEData(replayedText)).toMatchObject({ + jsonrpc: '2.0', + id: 'tool-close-1' + }); + }); }); describe('HTTPServerTransport - Protocol Version Validation', () => {