Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions backend/selective-entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,7 @@ export default {
request.headers.get("upgrade") === "websocket"
) {
console.log("🔄 Routing to LiveStore sync handler on", request.url);
return syncHandler.fetch(
request as unknown as Request,
env,
ctx
) as unknown as WorkerResponse;
return syncHandler.fetch(request, env, ctx) as unknown as WorkerResponse;
}

// Route 3: API routes → Hono app
Expand Down
136 changes: 73 additions & 63 deletions backend/sync.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,55 @@
import {
makeDurableObject,
handleWebSocket,
} from "@livestore/sync-cf/cf-worker";
import { type Env, type ExecutionContext } from "./types";
import { makeDurableObject, makeWorker } from "@livestore/sync-cf/cf-worker";

import { getValidatedUser } from "./auth";
import { Schema } from "@runtimed/schema";

export class WebSocketServer extends makeDurableObject({
onPush: async (message) => {
console.log("onPush", message.batch);
onPush: async (message, { payload, storeId }) => {
try {
const decodedPayload = decodePayload(payload);
console.log("📝 Push received:", {
storeId,
eventCount: message.batch.length,
hasPayload: !!payload,
});

if (!decodedPayload.authToken) {
throw new Error("AuthToken is required");
}

if (decodedPayload?.runtime === true) {
console.log("📝 Runtime agent push:", {
runtimeId: decodedPayload.runtimeId,
storeId,
eventCount: message.batch.length,
});
} else {
console.log("📝 User push:", {
storeId,
eventCount: message.batch.length,
});
}
} catch (error: any) {
console.error("🚫 Push authentication failed:", error.message);
throw error;
}
},
onPull: async (message) => {
console.log("onPull", message);
onPull: async (_message, { payload, storeId }) => {
try {
const decodedPayload = decodePayload(payload);

console.log("📝 Pull request:", {
storeId,
isRuntime: decodedPayload?.runtime === true,
hasPayload: !!payload,
});

if (!decodedPayload.authToken) {
throw new Error("AuthToken is required");
}
} catch (error: any) {
console.error("🚫 Pull validation failed:", error.message);
throw error;
}
},
}) {}

Expand All @@ -39,60 +76,33 @@ const SyncPayloadSchema = Schema.Union(

const decodePayload = Schema.decodeUnknownSync(SyncPayloadSchema);

export default {
fetch: async (request: Request, env: Env, ctx: ExecutionContext) => {
const url = new URL(request.url);

const pathname = url.pathname;
export default makeWorker({
syncBackendBinding: "WEBSOCKET_SERVER",
validatePayload: async (rawPayload: unknown, { storeId }) => {
try {
const payload = decodePayload(rawPayload);

if (!pathname.startsWith("/livestore")) {
return new Response("Invalid request", { status: 400 });
}

return handleWebSocket(request, env, ctx, {
validatePayload: async (rawPayload) => {
try {
const payload = decodePayload(rawPayload);
let validatedUser = await getValidatedUser(payload.authToken, env);

if (!validatedUser) {
throw new Error("User must be authenticated");
}

// User identity is validated via JWT token
// LiveStore will manage clientId for device/app instance identification
if (payload?.runtime === true) {
// For runtime agents with full payload
console.log("✅ Runtime agent authenticated:", {
runtimeId: payload.runtimeId,
sessionId: payload.sessionId,
userId: payload.userId,
validatedUserId: validatedUser.id,
});
if (!payload.authToken || typeof payload.authToken !== "string") {
throw new Error("Valid authToken is required");
}

// Verify that the runtime's claimed userId matches the authenticated user
if (payload.userId !== validatedUser.id) {
throw new Error(
`Runtime userId mismatch: payload claims ${payload.userId}, but token is for ${validatedUser.id}`
);
}
} else {
// For regular users
console.log("✅ Authenticated user:", {
userId: validatedUser.id,
});
}

// SECURITY NOTE: This validation only occurs at connection time.
// The current version of `@livestore/sync-cf` does not provide a mechanism
// to verify that the `clientId` on incoming events matches the `clientId`
// that was validated with this initial connection payload. A malicious
// client could pass this check and then send events with a different clientId.
} catch (error: any) {
console.error("🚫 Authentication failed:", error.message);
throw error; // Reject the WebSocket connection
if (payload?.runtime === true) {
if (!payload.runtimeId || !payload.sessionId || !payload.userId) {
throw new Error(
"Runtime agents require runtimeId, sessionId, and userId"
);
}
},
});
console.log(
"📝 Runtime agent payload structure valid for store:",
storeId
);
} else {
console.log("📝 User payload structure valid for store:", storeId);
}
} catch (error: any) {
console.error("🚫 Payload validation failed:", error.message);
throw error;
}
},
};
enableCORS: true,
});
2 changes: 1 addition & 1 deletion backend/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export type Env = {
SERVICE_PROVIDER?: string; // "local" | "anaconda"

// Bindings from the original sync worker configuration
WEBSOCKET_SERVER: DurableObjectNamespace;
SYNC_BACKEND_DO: DurableObjectNamespace;
DB: D1Database;

// Secrets
Expand Down
27 changes: 10 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@
"@japikey/cloudflare": "^0.4.0",
"@japikey/japikey": "^0.4.0",
"@japikey/shared": "^0.4.0",
"@livestore/adapter-web": "^0.3.1",
"@livestore/livestore": "^0.3.1",
"@livestore/react": "^0.3.1",
"@livestore/sync-cf": "^0.3.1",
"@livestore/wa-sqlite": "1.0.5-dev.2",
"@livestore/webmesh": "^0.3.1",
"@livestore/adapter-web": "v0.4.0-dev.10",
"@livestore/livestore": "v0.4.0-dev.10",
"@livestore/react": "v0.4.0-dev.10",
"@livestore/sync-cf": "v0.4.0-dev.10",
"@livestore/wa-sqlite": "v0.4.0-dev.10",
"@livestore/webmesh": "v0.4.0-dev.10",
"@microlink/react-json-view": "^1.26.2",
"@overengineering/fps-meter": "^0.1.2",
"@radix-ui/react-avatar": "^1.1.10",
Expand Down Expand Up @@ -99,7 +99,7 @@
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"date-fns": "^4.1.0",
"effect": "3.15.5",
"effect": "^3.18.4",
"fractional-indexing": "^3.2.0",
"geojson-map-fit-mercator": "^1.1.0",
"hono": "^4.9.1",
Expand Down Expand Up @@ -139,8 +139,8 @@
"@cloudflare/workers-types": "^4.20250813.0",
"@effect/vitest": "0.23.7",
"@eslint/js": "^9.30.1",
"@livestore/adapter-node": "^0.3.1",
"@livestore/devtools-vite": "^0.3.1",
"@livestore/adapter-node": "v0.4.0-dev.10",
"@livestore/devtools-vite": "v0.4.0-dev.10",
"@tailwindcss/cli": "^4.1.10",
"@tailwindcss/postcss": "^4.1.10",
"@tailwindcss/typography": "^0.5.16",
Expand Down Expand Up @@ -178,15 +178,8 @@
},
"pnpm": {
"overrides": {
"effect": "3.15.5",
"react": "19.0.0",
"react-dom": "19.0.0",
"@effect/platform": "0.82.4",
"@effect/typeclass": "0.34.2",
"@effect/cluster": "0.34.2",
"@effect/experimental": "0.46.8",
"@effect/sql": "0.35.8",
"@effect/rpc": "0.59.9"
"react-dom": "19.0.0"
},
"onlyBuiltDependencies": [
"@parcel/watcher",
Expand Down
3 changes: 1 addition & 2 deletions packages/agent-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
"format:check": "prettier --check ."
},
"dependencies": {
"@livestore/livestore": "^0.3.1",
"effect": "3.15.5",
"@livestore/livestore": "v0.4.0-dev.10",
"zod": "^4.0.17",
"@opentelemetry/api": "^1.9.0"
},
Expand Down
44 changes: 39 additions & 5 deletions packages/agent-core/src/store-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,44 @@ export async function createStorePromise(
): Promise<Store> {
const { adapter, notebookId, syncPayload } = config;

return await createLiveStorePromise({
adapter,
schema,
storeId: notebookId,
syncPayload: syncPayload as any,
console.log(`🏭 Store Factory: Creating store for notebook:`, {
notebookId,
hasSyncPayload: !!syncPayload,
syncPayloadKeys: syncPayload ? Object.keys(syncPayload) : [],
timestamp: new Date().toISOString(),
});

try {
const store = await createLiveStorePromise({
adapter,
schema,
storeId: notebookId,
syncPayload: syncPayload as any,
});

console.log(`✅ Store Factory: Successfully created store for notebook:`, {
notebookId,
storeExists: !!store,
timestamp: new Date().toISOString(),
});

// Add debugging helpers to the store instance
(store as any)._debugInfo = {
notebookId,
createdAt: new Date().toISOString(),
syncPayload: syncPayload
? { ...syncPayload, authToken: "[REDACTED]" }
: null,
};

return store;
} catch (error: any) {
console.error(`❌ Store Factory: Failed to create store for notebook:`, {
notebookId,
error: error.message,
stack: error.stack,
timestamp: new Date().toISOString(),
});
throw error;
}
}
3 changes: 1 addition & 2 deletions packages/ai-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
"format:check": "prettier --check ."
},
"dependencies": {
"@livestore/livestore": "^0.3.1",
"@livestore/livestore": "v0.4.0-dev.10",
"@opentelemetry/api": "^1.9.0",
"effect": "3.15.5",
"ollama": "^0.5.18",
"openai": "^5.22.0",
"zod": "^4.0.17"
Expand Down
5 changes: 3 additions & 2 deletions packages/pyodide-runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,9 @@ export class PyodideRuntimeAgent extends LocalRuntimeAgent {
currentCellRef
);

const maxAiIterations: number =
parseInt(this.agent.store.query(maxAiIterations$)) || 10;
const maxAiIterations: number = parseInt(
this.agent.store.query(maxAiIterations$) || "10"
);

// Track AI execution for cancellation
const aiAbortController = new AbortController();
Expand Down
3 changes: 1 addition & 2 deletions packages/schema/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
"format:check": "prettier --check ."
},
"dependencies": {
"@livestore/livestore": "^0.3.1",
"effect": "3.15.5",
"@livestore/livestore": "v0.4.0-dev.10",
"zod": "^4.0.17"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/schema/src/queries/ai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ export const maxAiIterations$ = queryDb(
tables.notebookMetadata
.select("value")
.where("key", "=", "max_ai_iterations")
.first({ fallback: () => "10" })
.first({ behaviour: "fallback", fallback: () => "10" })
);
10 changes: 5 additions & 5 deletions packages/schema/src/queries/cellOrdering.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const firstCell$ = queryDb(
tables.cells
.select("id", "fractionalIndex")
.orderBy("fractionalIndex", "asc")
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{ label: "cells.first" }
);

Expand All @@ -37,7 +37,7 @@ export const lastCell$ = queryDb(
tables.cells
.select("id", "fractionalIndex")
.orderBy("fractionalIndex", "desc")
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{ label: "cells.last" }
);

Expand Down Expand Up @@ -88,7 +88,7 @@ export const getAdjacentCells = (cellId: string, fractionalIndex: string) => {
.select("id", "fractionalIndex")
.where("fractionalIndex", "<", fractionalIndex)
.orderBy("fractionalIndex", "desc")
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{
deps: [cellId, fractionalIndex],
label: `cells.previous.${cellId}`,
Expand All @@ -100,7 +100,7 @@ export const getAdjacentCells = (cellId: string, fractionalIndex: string) => {
.select("id", "fractionalIndex")
.where("fractionalIndex", ">", fractionalIndex)
.orderBy("fractionalIndex", "asc")
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{
deps: [cellId, fractionalIndex],
label: `cells.next.${cellId}`,
Expand All @@ -116,7 +116,7 @@ export const cellPositionInfo = (cellId: string) =>
tables.cells
.select("id", "fractionalIndex")
.where({ id: cellId })
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{
deps: [cellId],
label: `cells.positionInfo.${cellId}`,
Expand Down
2 changes: 2 additions & 0 deletions packages/schema/src/queries/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export const cellFractionalIndex = (cellId: string) =>
.select("fractionalIndex")
.where({ id: cellId })
.first({
behaviour: "fallback",
fallback: () => null,
}),
{
Expand All @@ -50,6 +51,7 @@ export const cellQuery = {
.select()
.where({ id: cellId })
.first({
behaviour: "fallback",
fallback: () => null,
}),
{
Expand Down
Loading