Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {

const encoder = new TextEncoder();
let streamController: ReadableStreamDefaultController<Uint8Array>;
let keepAliveInterval: ReturnType<typeof setInterval> | undefined;

const clearKeepAlive = () => {
if (keepAliveInterval !== undefined) {
clearInterval(keepAliveInterval);
keepAliveInterval = undefined;
}
};

// Create a ReadableStream with a controller we can use to push SSE events
const readable = new ReadableStream<Uint8Array>({
Expand All @@ -446,6 +454,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
},
cancel: () => {
// Stream was cancelled by client
clearKeepAlive();
this._streamMapping.delete(this._standaloneSseStreamId);
}
});
Expand All @@ -466,6 +475,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
controller: streamController!,
encoder,
cleanup: () => {
clearKeepAlive();
this._streamMapping.delete(this._standaloneSseStreamId);
try {
streamController!.close();
Expand All @@ -475,6 +485,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
}
});

keepAliveInterval = setInterval(() => {
try {
streamController!.enqueue(encoder.encode(': keep-alive\n\n'));
} catch {
clearKeepAlive();
}
}, 15_000);

return new Response(readable, { headers });
}

Expand Down
22 changes: 22 additions & 0 deletions packages/server/test/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,28 @@ describe('Zod v4', () => {
expect(response.headers.get('mcp-session-id')).toBe(sessionId);
});

it('should send keep-alive comments on idle standalone SSE streams', async () => {
vi.useFakeTimers();
try {
sessionId = await initializeServer();

const request = createRequest('GET', undefined, { sessionId });
const response = await transport.handleRequest(request);
const reader = response.body!.getReader();
const read = reader.read();

await vi.advanceTimersByTimeAsync(15_000);

const { value, done } = await read;
expect(done).toBe(false);
expect(new TextDecoder().decode(value)).toBe(': keep-alive\n\n');

await reader.cancel();
} finally {
vi.useRealTimers();
}
});

it('should reject GET without Accept: text/event-stream', async () => {
sessionId = await initializeServer();

Expand Down
Loading