Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions packages/durable-session-proxy/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { DurableStream } from '@durable-streams/client'
import {
sessionStateSchema,
createSessionDB,
createMessagesPipeline,
createMessagesCollection,
createModelMessagesCollection,
} from '@electric-sql/durable-session'
import type { StreamChunk, AgentSpec, ProxySessionState, AIDBProtocolOptions } from './types'
Expand Down Expand Up @@ -212,9 +212,8 @@ export class AIDBSessionProtocol {
// After this, all historical messages are in the collections
await sessionDB.preload()

// Create the messages pipeline
const { messages } = createMessagesPipeline({
sessionId,
// Create the messages collection from chunks
const messages = createMessagesCollection({
chunksCollection: sessionDB.collections.chunks,
})

Expand Down Expand Up @@ -391,7 +390,7 @@ export class AIDBSessionProtocol {
actorId,
role: 'user' as const,
chunk: JSON.stringify({
type: 'user-message',
type: 'whole-message',
message,
}),
seq: 0,
Expand Down
201 changes: 78 additions & 123 deletions packages/durable-session/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
* Provides TanStack AI-compatible API backed by Durable Streams
* with real-time sync and multi-agent support.
*
* All derived collections contain fully materialized objects.
* No helper functions needed to access data.
* All derived collections contain fully materialized MessageRow objects.
* Consumers filter message.parts to access specific part types (ToolCallPart, etc.).
*/

import { createCollection, createOptimisticAction } from '@tanstack/db'
import type { Transaction } from '@tanstack/db'
import type { UIMessage, AnyClientTool } from '@tanstack/ai'
import type { ChunkRow } from './schema'
import type { UIMessage, AnyClientTool, ToolCallPart } from '@tanstack/ai'
import type {
DurableChatClientOptions,
MessageRow,
Expand All @@ -25,22 +24,20 @@ import type {
ApprovalResponseInput,
ActorType,
} from './types'
import { createSessionDB, getChunkKey, type SessionDB } from './collection'
import { createSessionDB, type SessionDB } from './collection'
import {
createCollectedMessagesCollection,
createMessagesCollection,
createToolCallsCollection,
createPendingApprovalsCollection,
createToolResultsCollection,
createApprovalsCollection,
createActiveGenerationsCollection,
createSessionMetaCollectionOptions,
createSessionStatsCollection,
createPresenceCollection,
createInitialSessionMeta,
updateConnectionStatus,
updateSyncProgress,
} from './collections'
import { extractTextContent } from './materialize'
import { extractTextContent, messageRowToUIMessage } from './materialize'

/**
* Unified input for all message optimistic actions.
Expand Down Expand Up @@ -104,8 +101,7 @@ export class DurableChatClient<

// Collections are typed via inference from createCollections()
// Created synchronously in constructor - always available
private readonly _collections: ReturnType<DurableChatClient['createCollections']>['collections']
private readonly _collectedMessages: ReturnType<DurableChatClient['createCollections']>['collectedMessages']
private readonly _collections: ReturnType<DurableChatClient['createCollections']>

private _isConnected = false
private _isPaused = false
Expand Down Expand Up @@ -146,9 +142,7 @@ export class DurableChatClient<
})

// Create all collections synchronously (always from _db.collections)
const { collections, collectedMessages } = this.createCollections()
this._collections = collections
this._collectedMessages = collectedMessages
this._collections = this.createCollections()

// Initialize session metadata
this._collections.sessionMeta.insert(
Expand All @@ -168,8 +162,9 @@ export class DurableChatClient<
/**
* Create all derived collections from the chunks collection.
*
* This implements the live query pipeline pattern:
* chunks → collectedMessages → messages (and other derived collections)
* Pipeline architecture:
* - chunks → (subquery) → messages (root materialized collection)
* - Derived collections filter messages via .fn.where() on parts
*
* CRITICAL: Materialization happens inside fn.select(). No imperative code
* outside this pattern.
Expand All @@ -179,50 +174,37 @@ export class DurableChatClient<
// Note: rawPresence contains per-device records; we expose aggregated presence
const { chunks, presence: rawPresence, agents } = this._db.collections

// Stage 1: Create collected messages (intermediate - groups by messageId)
const collectedMessages = createCollectedMessagesCollection({
sessionId: this.sessionId,
chunksCollection: chunks,
})

// Stage 2: Create materialized messages collection
// Root materialized collection: chunks → messages
// Uses inline subquery for chunk aggregation
const messages = createMessagesCollection({
sessionId: this.sessionId,
collectedMessagesCollection: collectedMessages,
chunksCollection: chunks,
})

// Derive tool calls from collected messages
// Derived collections filter on message parts (lazy evaluation)
const toolCalls = createToolCallsCollection({
sessionId: this.sessionId,
collectedMessagesCollection: collectedMessages,
messagesCollection: messages,
})

// Derive tool results from collected messages
const toolResults = createToolResultsCollection({
sessionId: this.sessionId,
collectedMessagesCollection: collectedMessages,
const pendingApprovals = createPendingApprovalsCollection({
messagesCollection: messages,
})

// Derive approvals from collected messages
const approvals = createApprovalsCollection({
sessionId: this.sessionId,
collectedMessagesCollection: collectedMessages,
const toolResults = createToolResultsCollection({
messagesCollection: messages,
})

// Derive active generations from messages
const activeGenerations = createActiveGenerationsCollection({
sessionId: this.sessionId,
messagesCollection: messages,
})

// Create session metadata collection (local state)
// Session metadata collection (local state)
const sessionMeta = createCollection(
createSessionMetaCollectionOptions({
sessionId: this.sessionId,
})
)

// Create session statistics collection (derived from chunks)
// Session statistics collection (aggregated from chunks)
const sessionStats = createSessionStatsCollection({
sessionId: this.sessionId,
chunksCollection: chunks,
Expand All @@ -236,19 +218,16 @@ export class DurableChatClient<
})

return {
collections: {
chunks,
presence,
agents,
messages,
toolCalls,
toolResults,
approvals,
activeGenerations,
sessionMeta,
sessionStats,
},
collectedMessages,
chunks,
presence,
agents,
messages,
toolCalls,
pendingApprovals,
toolResults,
activeGenerations,
sessionMeta,
sessionStats,
}
}

Expand All @@ -261,10 +240,7 @@ export class DurableChatClient<
* Messages are accessed directly from the materialized collection.
*/
get messages(): UIMessage[] {
// Convert MessageRow to UIMessage
return [...this._collections.messages.values()].map((row) =>
this.messageRowToUIMessage(row)
)
return [...this._collections.messages.values()].map(messageRowToUIMessage)
}

/**
Expand Down Expand Up @@ -390,42 +366,24 @@ export class DurableChatClient<
* Create the unified optimistic action for all message types.
* Handles user, assistant, and system messages with the same pattern.
*
* IMPORTANT: We insert into the chunks collection (not the messages collection)
* because messages is a derived collection from a live query pipeline. Inserting
* directly into a derived collection causes TanStack DB reconciliation bugs where
* synced data becomes invisible while the optimistic mutation is pending.
*
* By inserting into the chunks collection with the user-message format, the
* optimistic row flows through the normal pipeline: chunks → collectedMessages → messages.
* Optimistic updates insert into the messages collection directly.
* This ensures the optimistic state propagates to all derived collections
* (toolCalls, pendingApprovals, toolResults, activeGenerations).
*/
private createMessageAction() {
return createOptimisticAction<MessageActionInput>({
onMutate: ({ content, messageId, role }) => {
// For optimistic inserts, we use seq=0 since user messages are single-chunk.
// The key format is `${messageId}:${seq}`.
const seq = 0
const id = getChunkKey(messageId, seq)

const createdAt = new Date()

// Insert into chunks collection with user-message format.
// This flows through the live query pipeline: chunks → collectedMessages → messages
this._collections.chunks.insert({
id,
messageId,
actorId: this.actorId,
// Insert into messages collection directly
// This propagates to all derived collections
this._collections.messages.insert({
id: messageId,
role,
chunk: JSON.stringify({
type: 'user-message',
message: {
id: messageId,
role,
parts: [{ type: 'text' as const, content }],
createdAt: createdAt.toISOString(),
},
}),
createdAt: createdAt.toISOString(),
seq,
parts: [{ type: 'text' as const, content }],
actorId: this.actorId,
isComplete: true,
createdAt,
})
},
mutationFn: async ({ content, messageId, role, agent }) => {
Expand Down Expand Up @@ -536,21 +494,28 @@ export class DurableChatClient<
/**
* Create the optimistic action for adding tool results.
*
* Uses client-generated messageId for predictable tool result IDs,
* enabling proper optimistic updates.
* Inserts a new message with a ToolResultPart into the messages collection.
* Uses client-generated messageId for predictable IDs.
*/
private createAddToolResultAction() {
return createOptimisticAction<ClientToolResultInput>({
onMutate: ({ messageId, toolCallId, output, error }) => {
const resultId = `${messageId}:${toolCallId}`
this._collections.toolResults.insert({
id: resultId,
toolCallId,
messageId,
output,
error: error ?? null,
const createdAt = new Date()

// Insert a new message with tool-result part
this._collections.messages.insert({
id: messageId,
role: 'assistant',
parts: [{
type: 'tool-result' as const,
toolCallId,
content: typeof output === 'string' ? output : JSON.stringify(output),
state: error ? 'error' as const : 'complete' as const,
...(error && { error }),
}],
actorId: this.actorId,
createdAt: new Date(),
isComplete: true,
createdAt,
})
},
mutationFn: async ({ messageId, toolCallId, output, error }) => {
Expand Down Expand Up @@ -586,21 +551,27 @@ export class DurableChatClient<
/**
* Create the optimistic action for approval responses.
*
* Note: We use optimistic updates for approvals since we're updating
* an existing row (not inserting). The approval ID is known client-side.
* The optimistic update provides instant feedback while the server
* processes the response.
* Finds the message containing the tool call with the approval and updates
* the approval.approved field. This propagates to pendingApprovals collection.
*/
private createApprovalResponseAction() {
return createOptimisticAction<ApprovalResponseInput>({
onMutate: ({ id, approved }) => {
const approval = this._collections.approvals.get(id)
if (approval) {
this._collections.approvals.update(id, (draft) => {
draft.status = approved ? 'approved' : 'denied'
draft.respondedBy = this.actorId
draft.respondedAt = new Date()
})
// Find the message containing this approval
for (const message of this._collections.messages.values()) {
for (const part of message.parts) {
if (part.type === 'tool-call' && part.approval?.id === id) {
// Update the message with the approval response
this._collections.messages.update(message.id, (draft) => {
for (const p of draft.parts) {
if (p.type === 'tool-call' && p.approval?.id === id) {
(p as ToolCallPart).approval!.approved = approved
}
}
})
return
}
}
}
},
mutationFn: async ({ id, approved }) => {
Expand Down Expand Up @@ -825,22 +796,6 @@ export class DurableChatClient<
// Private Helpers
// ═══════════════════════════════════════════════════════════════════════

/**
* Convert MessageRow to UIMessage.
*
* Includes actorId for avatar display (agent ID for assistant messages,
* user ID for user messages).
*/
private messageRowToUIMessage(row: MessageRow): UIMessage & { actorId: string } {
return {
id: row.id,
role: row.role as 'user' | 'assistant',
parts: row.parts,
createdAt: row.createdAt,
actorId: row.actorId,
}
}

/**
* Update session metadata.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import type { MessageRow, ActiveGenerationRow } from '../types'
* Options for creating an active generations collection.
*/
export interface ActiveGenerationsCollectionOptions {
/** Session identifier */
sessionId: string
/** Messages collection to derive from */
messagesCollection: Collection<MessageRow>
}
Expand Down Expand Up @@ -72,14 +70,12 @@ export function createActiveGenerationsCollection(

// Filter messages for incomplete ones and transform to ActiveGenerationRow
// Order by createdAt to ensure chronological ordering
// startSync: true ensures the collection starts syncing immediately.
return createLiveQueryCollection({
query: (q) =>
q
.from({ message: messagesCollection })
.orderBy(({ message }) => message.createdAt, 'asc')
.fn.where(({ message }) => !message.isComplete)
.fn.select(({ message }) => messageToActiveGeneration(message)),
startSync: true,
})
}
Loading