Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,21 @@ _See [examples/server/distributed](../examples/server/distributed/main.go) for
an example using statless mode to implement a server distributed across
multiple processes._

#### Serverless Deployments

For serverless or short-lived processes, configure
[`StreamableHTTPOptions.SessionStateStore`](https://pkg.go.dev/github.com/modelcontextprotocol/go-sdk/mcp#StreamableHTTPOptions.SessionStateStore)
with an implementation of
[`ServerSessionStateStore`](https://pkg.go.dev/github.com/modelcontextprotocol/go-sdk/mcp#ServerSessionStateStore).
The handler will persist [`ServerSessionState`](https://pkg.go.dev/github.com/modelcontextprotocol/go-sdk/mcp#ServerSessionState)
whenever it changes, and will automatically restore prior state when a request
arrives carrying an existing `Mcp-Session-Id`. This allows one invocation to
handle initialization while subsequent invocations resume the conversation
without re-running a long-lived server. The SDK provides an in-memory
[`MemoryServerSessionStateStore`](https://pkg.go.dev/github.com/modelcontextprotocol/go-sdk/mcp#MemoryServerSessionStateStore)
for testing; production deployments should supply a durable store (for example,
backed by a database or object storage).

### Custom transports

The SDK supports [custom
Expand Down
15 changes: 15 additions & 0 deletions internal/docs/protocol.src.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ _See [examples/server/distributed](../examples/server/distributed/main.go) for
an example using statless mode to implement a server distributed across
multiple processes._

#### Serverless Deployments

For serverless or short-lived processes, configure
[`StreamableHTTPOptions.SessionStateStore`](https://pkg.go.dev/github.com/modelcontextprotocol/go-sdk/mcp#StreamableHTTPOptions.SessionStateStore)
with an implementation of
[`ServerSessionStateStore`](https://pkg.go.dev/github.com/modelcontextprotocol/go-sdk/mcp#ServerSessionStateStore).
The handler will persist [`ServerSessionState`](https://pkg.go.dev/github.com/modelcontextprotocol/go-sdk/mcp#ServerSessionState)
whenever it changes, and will automatically restore prior state when a request
arrives carrying an existing `Mcp-Session-Id`. This allows one invocation to
handle initialization while subsequent invocations resume the conversation
without re-running a long-lived server. The SDK provides an in-memory
[`MemoryServerSessionStateStore`](https://pkg.go.dev/github.com/modelcontextprotocol/go-sdk/mcp#MemoryServerSessionStateStore)
for testing; production deployments should supply a durable store (for example,
backed by a database or object storage).

### Custom transports

The SDK supports [custom
Expand Down
91 changes: 91 additions & 0 deletions mcp/session_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2025 The Go MCP SDK Authors. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package mcp

import (
"context"
"encoding/json"
"fmt"
"sync"
)

// ServerSessionStateStore persists server session state across process
// restarts.
//
// Implementations must be safe for concurrent use.
type ServerSessionStateStore interface {
// Load returns the previously saved state for sessionID. A nil result
// indicates that no state is available.
Load(ctx context.Context, sessionID string) (*ServerSessionState, error)
// Save persists the provided state. The state must not be modified after the
// call returns. Passing a nil state is equivalent to Delete.
Save(ctx context.Context, sessionID string, state *ServerSessionState) error
// Delete forgets any state associated with sessionID. This method must not
// return an error if no state is recorded.
Delete(ctx context.Context, sessionID string) error
}

// MemoryServerSessionStateStore is an in-memory implementation of
// ServerSessionStateStore.
//
// It is primarily intended for testing or simple deployments.
type MemoryServerSessionStateStore struct {
mu sync.RWMutex
states map[string][]byte
}

// NewMemoryServerSessionStateStore returns a MemoryServerSessionStateStore.
func NewMemoryServerSessionStateStore() *MemoryServerSessionStateStore {
return &MemoryServerSessionStateStore{
states: make(map[string][]byte),
}
}

// Load implements ServerSessionStateStore.
func (s *MemoryServerSessionStateStore) Load(ctx context.Context, sessionID string) (*ServerSessionState, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
s.mu.RLock()
data, ok := s.states[sessionID]
s.mu.RUnlock()
if !ok {
return nil, nil
}
var state ServerSessionState
if err := json.Unmarshal(data, &state); err != nil {
return nil, fmt.Errorf("decode server session state: %w", err)
}
return &state, nil
}

// Save implements ServerSessionStateStore.
func (s *MemoryServerSessionStateStore) Save(ctx context.Context, sessionID string, state *ServerSessionState) error {
if err := ctx.Err(); err != nil {
return err
}
if state == nil {
return s.Delete(ctx, sessionID)
}
data, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("encode server session state: %w", err)
}
s.mu.Lock()
defer s.mu.Unlock()
s.states[sessionID] = data
return nil
}

// Delete implements ServerSessionStateStore.
func (s *MemoryServerSessionStateStore) Delete(ctx context.Context, sessionID string) error {
if err := ctx.Err(); err != nil {
return err
}
s.mu.Lock()
delete(s.states, sessionID)
s.mu.Unlock()
return nil
}
80 changes: 69 additions & 11 deletions mcp/streamable.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ type StreamableHTTPOptions struct {
//
// If SessionTimeout is the zero value, idle sessions are never closed.
SessionTimeout time.Duration

// SessionStateStore enables persisting session state across process
// restarts. When configured, the handler will attempt to restore server
// sessions whose identifiers are unknown to the current process, allowing
// serverless deployments that spin up per-request.
SessionStateStore ServerSessionStateStore
}

// NewStreamableHTTPHandler returns a new [StreamableHTTPHandler].
Expand Down Expand Up @@ -223,19 +229,38 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
return
}

logger := ensureLogger(h.opts.Logger)

sessionID := req.Header.Get(sessionIDHeader)
var sessInfo *sessionInfo
var (
sessInfo *sessionInfo
restoredState *ServerSessionState
)
if sessionID != "" {
h.mu.Lock()
sessInfo = h.sessions[sessionID]
h.mu.Unlock()
if sessInfo == nil && !h.opts.Stateless {
// Unless we're in 'stateless' mode, which doesn't perform any Session-ID
// validation, we require that the session ID matches a known session.
//
// In stateless mode, a temporary transport is be created below.
http.Error(w, "session not found", http.StatusNotFound)
return
if store := h.opts.SessionStateStore; store != nil {
state, err := store.Load(req.Context(), sessionID)
if err != nil {
logger.Error("session state load failed", "session_id", sessionID, "error", err)
http.Error(w, "failed to load session state", http.StatusInternalServerError)
return
}
restoredState = state
if state == nil {
http.Error(w, "session not found", http.StatusNotFound)
return
}
} else {
// Unless we're in 'stateless' mode, which doesn't perform any Session-ID
// validation, we require that the session ID matches a known session.
//
// In stateless mode, a temporary transport is be created below.
http.Error(w, "session not found", http.StatusNotFound)
return
}
}
}

Expand All @@ -248,6 +273,16 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
// Closing the session also removes it from h.sessions, due to the
// onClose callback.
sessInfo.session.Close()
} else if restoredState != nil {
// There is no running session, but persisted state exists. Delete it so
// that clients can terminate resumed sessions without an active server.
if store := h.opts.SessionStateStore; store != nil {
if err := store.Delete(req.Context(), sessionID); err != nil && !errors.Is(err, context.Canceled) {
logger.Error("session state delete failed", "session_id", sessionID, "error", err)
http.Error(w, "failed to delete session state", http.StatusInternalServerError)
return
}
}
}
w.WriteHeader(http.StatusNoContent)
return
Expand Down Expand Up @@ -322,14 +357,17 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
EventStore: h.opts.EventStore,
jsonResponse: h.opts.JSONResponse,
logger: h.opts.Logger,
StateStore: h.opts.SessionStateStore,
}

// Sessions without a session ID are also stateless: there's no way to
// address them.
stateless := h.opts.Stateless || sessionID == ""
// To support stateless mode, we initialize the session with a default
// state, so that it doesn't reject subsequent requests.
var connectOpts *ServerSessionOptions
connectOpts := &ServerSessionOptions{
State: restoredState,
}
if stateless {
// Peek at the body to see if it is initialize or initialized.
// We want those to be handled as usual.
Expand Down Expand Up @@ -374,13 +412,12 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
state.InitializedParams = new(InitializedParams)
}
state.LogLevel = "info"
connectOpts = &ServerSessionOptions{
State: state,
}
connectOpts.State = state
} else {
// Cleanup is only required in stateful mode, as transportation is
// not stored in the map otherwise.
connectOpts = &ServerSessionOptions{
State: restoredState,
onClose: func() {
h.mu.Lock()
defer h.mu.Unlock()
Expand All @@ -391,6 +428,11 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
h.onTransportDeletion(transport.SessionID)
}
}
if store := h.opts.SessionStateStore; store != nil && transport.SessionID != "" {
if err := store.Delete(context.Background(), transport.SessionID); err != nil {
logger.Error("session state delete failed", "session_id", transport.SessionID, "error", err)
}
}
},
}
}
Expand Down Expand Up @@ -487,6 +529,10 @@ type StreamableServerTransport struct {
// upon stream resumption.
EventStore EventStore

// StateStore receives session state updates so that callers can resume
// sessions across processes.
StateStore ServerSessionStateStore

// jsonResponse, if set, tells the server to prefer to respond to requests
// using application/json responses rather than text/event-stream.
//
Expand Down Expand Up @@ -519,6 +565,7 @@ func (t *StreamableServerTransport) Connect(ctx context.Context) (Connection, er
sessionID: t.SessionID,
stateless: t.Stateless,
eventStore: t.EventStore,
stateStore: t.StateStore,
jsonResponse: t.jsonResponse,
logger: ensureLogger(t.logger), // see #556: must be non-nil
incoming: make(chan jsonrpc.Message, 10),
Expand All @@ -543,6 +590,7 @@ type streamableServerConn struct {
stateless bool
jsonResponse bool
eventStore EventStore
stateStore ServerSessionStateStore

logger *slog.Logger

Expand Down Expand Up @@ -579,6 +627,16 @@ func (c *streamableServerConn) SessionID() string {
return c.sessionID
}

func (c *streamableServerConn) sessionUpdated(state ServerSessionState) {
if c.stateStore == nil || c.sessionID == "" || c.stateless {
return
}
stateCopy := state
if err := c.stateStore.Save(context.Background(), c.sessionID, &stateCopy); err != nil {
c.logger.Error("session state save failed", "session_id", c.sessionID, "error", err)
}
}

// A stream is a single logical stream of SSE events within a server session.
// A stream begins with a client request, or with a client GET that has
// no Last-Event-ID header.
Expand Down
Loading