diff --git a/RFC-persisted-sync-metadata.md b/RFC-persisted-sync-metadata.md new file mode 100644 index 000000000..d853aab0e --- /dev/null +++ b/RFC-persisted-sync-metadata.md @@ -0,0 +1,900 @@ +# RFC: Transactional Persisted Sync Metadata + +## Status + +Draft + +## Summary + +This RFC proposes a transactional metadata API that sync implementations can +optionally use to persist and restore metadata alongside synced collection data. + +The design supports two metadata scopes: + +- **Row metadata**: metadata attached to a specific synced row +- **Collection metadata**: metadata attached to the collection as a whole + +The API is designed so that metadata changes can be committed atomically with +persisted row changes. This is required for correctness in two cases that are +already visible in the codebase: + +- `query-db-collection` needs persisted ownership and GC state so warm-starts do + not incorrectly delete or leak rows +- `electric-db-collection` needs persisted resume state and related metadata so + it can safely warm-start from persisted data and continue streaming + +This RFC is intentionally ordered around the consumer-facing API first, then the +SQLite implementation, then how query collections use it, and finally how +Electric collections use it. + +## Problem + +Today, persisted SQLite rows and sync-layer runtime metadata live on different +planes: + +- persisted collections store row values durably +- sync implementations keep important state in memory only + +That leads to restart gaps: + +- query collections lose row ownership state and cannot safely decide whether a + row should be deleted when the first query result arrives after restart +- Electric collections do not have a durable, transactional place to store + stream resume state such as offsets or handles + +The central requirement is not merely "persist metadata", but: + +1. collections must be able to **read persisted metadata on startup** +2. 
collections must be able to **update metadata as part of normal sync work** +3. persisted metadata that affects row existence must be **transactional with + row persistence** + +Non-transactional sidecar metadata is not sufficient for correctness. If row +data commits without matching metadata, or metadata commits without matching row +data, restart behavior can still be wrong. + +## Goals + +- Provide an optional metadata API to sync implementations +- Keep the API generic enough for multiple sync implementations +- Preserve crash consistency by making metadata transactional with row changes +- Support both row-local and collection-level metadata +- Support persisted GC state for query collections +- Support persisted resume state for Electric collections + +## Non-Goals + +- Define every possible metadata schema for all sync implementations +- Require metadata support for non-persisted collections +- Force all persistence adapters to implement advanced GC optimizations on day + one + +## Proposed API + +### Design principles + +The API exposed to a collection's sync implementation should be: + +- **optional**: absent for non-persisted collections +- **transaction-scoped**: metadata mutations participate in the current sync + transaction +- **scope-aware**: row metadata and collection metadata are separate +- **readable at startup**: sync implementations can restore state before or + during hydration + +### Sync API additions + +The `sync.sync()` params gain an optional `metadata` capability: + +```ts +type SyncMetadataApi = { + row: { + get: (key: TKey) => unknown | undefined + set: (key: TKey, metadata: unknown) => void + delete: (key: TKey) => void + } + collection: { + get: (key: string) => unknown | undefined + set: (key: string, value: unknown) => void + delete: (key: string) => void + list: (prefix?: string) => ReadonlyArray<{ + key: string + value: unknown + }> + } +} + +type SyncParams = { + collection: Collection + begin: (options?: { immediate?: boolean 
}) => void + write: (message: ChangeMessageOrDeleteKeyMessage) => void + commit: () => void + markReady: () => void + truncate: () => void + metadata?: SyncMetadataApi +} +``` + +### Semantics + +`metadata` is only available when the collection is backed by a persistence +layer that supports it. + +`metadata.row.*` operates on the durable metadata associated with synced rows in +the current collection. + +`metadata.collection.*` operates on durable collection-scoped metadata entries. +These entries are not attached to a single row, but they still participate in +the current sync transaction. + +### Transaction model + +Metadata operations are only valid while a sync transaction is open, that is, +between `begin()` and `commit()`. + +This RFC explicitly requires support for four kinds of committed sync +transactions: + +- row mutations only +- row mutations plus metadata mutations +- collection metadata mutations only +- row metadata mutations only + +If `metadata.row.set`, `metadata.row.delete`, `metadata.collection.set`, or +`metadata.collection.delete` is called outside an open transaction, the +implementation should throw, just as `write()` does today when called without a +pending sync transaction. + +### Read-your-own-writes + +Reads performed through `metadata.row.get`, `metadata.collection.get`, and +`metadata.collection.list` inside an open transaction must reflect any staged +writes from that same transaction. + +This is required so sync implementations can safely merge metadata within a +transaction without having to mirror staged state themselves. 
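As an illustrative model of the transaction rules above (hypothetical code, not the library's implementation), a transaction-scoped store satisfies read-your-own-writes by consulting writes staged in the open transaction before falling back to previously persisted state, and applying everything at once on commit:

```typescript
// Illustrative sketch only: staged metadata with read-your-own-writes.
// Reads see staged sets and deletes; persisted state changes only on commit.
type StagedEntry = { kind: 'set'; value: unknown } | { kind: 'delete' }

class StagedCollectionMetadata {
  private staged = new Map<string, StagedEntry>()

  constructor(private persisted: Map<string, unknown>) {}

  get(key: string): unknown | undefined {
    const entry = this.staged.get(key)
    if (entry) return entry.kind === 'set' ? entry.value : undefined
    return this.persisted.get(key)
  }

  set(key: string, value: unknown): void {
    this.staged.set(key, { kind: 'set', value })
  }

  delete(key: string): void {
    this.staged.set(key, { kind: 'delete' })
  }

  // Applied only when the surrounding sync transaction commits.
  commit(): void {
    for (const [key, entry] of this.staged) {
      if (entry.kind === 'set') this.persisted.set(key, entry.value)
      else this.persisted.delete(key)
    }
    this.staged.clear()
  }
}
```

The same staged-map shape works for row metadata with row keys instead of string keys.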
+ +The write semantics are: + +- `row.set` updates the metadata that will be committed for that row +- `row.delete` removes persisted row metadata for that row +- `collection.set` stages a collection metadata update in the current sync + transaction +- `collection.delete` stages a collection metadata delete in the current sync + transaction + +The read semantics are: + +- `row.get` returns the currently hydrated metadata for a row, if known +- `collection.get` and `collection.list` return the persisted collection + metadata that was loaded during startup or hydration + +### Relationship to `write({ metadata })` + +The existing `write({ type, value, metadata })` path and `metadata.row.*` must +target the same underlying row metadata store. + +They serve different purposes: + +- `write({ ..., metadata })` attaches metadata to a row mutation +- `metadata.row.set()` and `metadata.row.delete()` allow explicit metadata-only + row changes when the row value itself did not change + +Within a single transaction, implementations should treat these as staged +updates to the same row metadata slot. If both are used for the same row in the +same transaction, the effective metadata should follow transaction order +semantics, with later staged changes winning. + +### Why this shape + +This API is deliberately **not** an async sidecar KV API like +`load/store/delete`. A free-floating async store suggests independent writes at +arbitrary times. That is exactly what we want to avoid for correctness-sensitive +state. + +Instead, the API is modeled as an extension of the existing sync transaction +surface: + +- read previously persisted metadata +- stage metadata changes +- commit metadata together with row changes + +### Serialization + +Persisted metadata values are JSON-serialized using the same persisted JSON +encoding rules used elsewhere in the SQLite adapter. Metadata should therefore +be kept JSON-compatible and reasonably small. 
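A hypothetical guard (assuming the adapter's persisted encoding is plain `JSON.stringify`/`JSON.parse`; `encodeMetadata` is illustrative, not an existing helper) can reject non-JSON metadata at write time instead of letting it silently corrupt restart state:

```typescript
// Hypothetical sketch: reject values JSON cannot represent (functions and
// bare undefined serialize to undefined; note that Dates would still be
// silently encoded as strings, so callers should prefer plain JSON shapes).
function encodeMetadata(value: unknown): string {
  const encoded = JSON.stringify(value)
  if (encoded === undefined) {
    throw new Error('metadata must be JSON-serializable')
  }
  return encoded
}
```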
+
+## SQLite Persistence Implementation
+
+### Overview
+
+The SQLite persisted collection layer implements the metadata API using two
+durable stores:
+
+1. **row metadata** stored with persisted rows
+2. **collection metadata** stored in a separate table
+
+Both participate in the same SQLite transaction used to apply a committed sync
+transaction.
+
+### Schema changes
+
+#### Persisted rows
+
+Add a `metadata` column to the collection table:
+
+```sql
+CREATE TABLE IF NOT EXISTS <collection_table> (
+  key TEXT PRIMARY KEY NOT NULL,
+  value TEXT NOT NULL,
+  metadata TEXT,
+  row_version INTEGER NOT NULL
+)
+```
+
+The tombstone table may also optionally carry the last row metadata if useful
+for debugging or future recovery, but that is not required for the core design.
+
+#### Collection metadata
+
+Add a collection-level metadata table:
+
+```sql
+CREATE TABLE IF NOT EXISTS collection_metadata (
+  collection_id TEXT NOT NULL,
+  key TEXT NOT NULL,
+  value TEXT NOT NULL,
+  updated_at INTEGER NOT NULL,
+  PRIMARY KEY (collection_id, key)
+)
+```
+
+This table stores collection-scoped metadata such as:
+
+- Electric resume state
+- query collection placeholder GC state
+- future sync-implementation-specific metadata
+
+### Adapter contract
+
+The SQLite adapter extends its persistence internals so a single committed sync
+transaction can include:
+
+- row mutations
+- row metadata mutations
+- collection metadata mutations
+
+This requires the persisted runtime to stage metadata on the pending sync
+transaction itself, not in a side buffer detached from `begin()` / `commit()`.
+
+One possible shape is:
+
+```ts
+type PersistedRowMutation<TKey, T> =
+  | { type: 'insert'; key: TKey; value: T; metadata?: unknown }
+  | { type: 'update'; key: TKey; value: T; metadata?: unknown }
+  | { type: 'delete'; key: TKey; value: T }
+
+type PersistedCollectionMetadataMutation =
+  | { type: 'set'; key: string; value: unknown }
+  | { type: 'delete'; key: string }
+
+type PersistedTx<TKey, T> = {
+  txId: string
+  term: number
+  seq: number
+  rowVersion: number
+  mutations: Array<PersistedRowMutation<TKey, T>>
+  collectionMetadataMutations?: Array<PersistedCollectionMetadataMutation>
+}
+```
+
+This preserves a crucial invariant:
+
+> if a sync transaction commits, both the row data and the metadata that explains
+> that row data commit together
+
+### PersistenceAdapter changes
+
+This RFC implies an explicit adapter contract change:
+
+- persisted row hydration must be able to return row metadata
+- persisted transaction application must be able to apply collection metadata
+  mutations as part of the same commit
+
+One possible updated hydration shape is:
+
+```ts
+type PersistedLoadedRow<TKey, T> = {
+  key: TKey
+  value: T
+  metadata?: unknown
+}
+```
+
+Existing adapters that do not yet provide metadata can remain compatible by
+returning rows with `metadata: undefined`.
+
+### Startup and hydration
+
+The persisted runtime loads:
+
+- row values and row metadata during normal subset hydration
+- collection metadata during runtime startup
+
+This means metadata restoration does **not** require a separate full database
+scan beyond what the collection was already going to hydrate.
+
+In eager mode, the initial hydrated subset carries its row metadata with it.
+
+In on-demand mode, metadata is restored lazily for whichever subsets are loaded.
+
+Collection metadata should be loaded before new sync subscriptions begin
+processing, so startup GC or resume-state decisions can run against a stable
+baseline.
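The atomic-commit contract above can be sketched as a planner that lowers one committed sync transaction to a single SQLite statement batch. This is an illustrative sketch, not the adapter's actual code: the simplified `SimplePersistedTx` shape, statement format, and `planTx` name are assumptions, while the table columns follow the schemas proposed in this RFC.

```typescript
// Hypothetical planner: rows, row metadata, and collection metadata all
// land inside one BEGIN IMMEDIATE ... COMMIT, so they commit or roll back
// together.
type Stmt = { sql: string; params: Array<unknown> }

type SimplePersistedTx = {
  collectionId: string
  rowVersion: number
  mutations: Array<
    | { type: 'upsert'; key: string; value: string; metadata: string | null }
    | { type: 'delete'; key: string }
  >
  collectionMetadataMutations: Array<
    | { type: 'set'; key: string; value: string }
    | { type: 'delete'; key: string }
  >
}

function planTx(tx: SimplePersistedTx, table: string, now: number): Array<Stmt> {
  const stmts: Array<Stmt> = [{ sql: 'BEGIN IMMEDIATE', params: [] }]
  for (const m of tx.mutations) {
    if (m.type === 'upsert') {
      stmts.push({
        sql:
          `INSERT INTO ${table} (key, value, metadata, row_version) VALUES (?, ?, ?, ?) ` +
          `ON CONFLICT(key) DO UPDATE SET value = excluded.value, ` +
          `metadata = excluded.metadata, row_version = excluded.row_version`,
        params: [m.key, m.value, m.metadata, tx.rowVersion],
      })
    } else {
      stmts.push({ sql: `DELETE FROM ${table} WHERE key = ?`, params: [m.key] })
    }
  }
  for (const m of tx.collectionMetadataMutations) {
    if (m.type === 'set') {
      stmts.push({
        sql:
          `INSERT INTO collection_metadata (collection_id, key, value, updated_at) ` +
          `VALUES (?, ?, ?, ?) ON CONFLICT(collection_id, key) DO UPDATE SET ` +
          `value = excluded.value, updated_at = excluded.updated_at`,
        params: [tx.collectionId, m.key, m.value, now],
      })
    } else {
      stmts.push({
        sql: `DELETE FROM collection_metadata WHERE collection_id = ? AND key = ?`,
        params: [tx.collectionId, m.key],
      })
    }
  }
  stmts.push({ sql: 'COMMIT', params: [] })
  return stmts
}
```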
+ +## Query Collection Usage + +### Problem to solve + +`query-db-collection` keeps ownership state in memory: + +- `queryToRows` +- `rowToQueries` +- `queryRefCounts` + +After restart, persisted rows are restored into the base collection, but query +ownership is lost. The first query result can then incorrectly delete rows that +were hydrated from persistence but not yet claimed in memory. + +### What the query collection should persist + +The query collection should persist two categories of state: + +1. **per-row ownership metadata** +2. **per-query GC state** + +### Row metadata shape + +Ownership should be stored in row metadata, not in a global sidecar blob: + +```ts +type QueryRowMetadata = { + queryCollection?: { + owners: Record + } +} +``` + +Where the `owners` keys are hashed query identities. + +This makes persisted ownership: + +- local to the row it explains +- transactional with the row write +- reconstructible during ordinary row hydration + +This also means ownership updates can happen without inventing synthetic row +value updates. A query may stop owning a row while another query still owns it; +that is a metadata-only row change. + +### Reconstructing in-memory state + +When rows are hydrated from persistence, the query collection can rebuild: + +- `rowToQueries` from each row's persisted `owners` +- `queryToRows` by reversing that mapping + +This reconstruction is incremental. It happens for the rows being hydrated, not +by requiring a separate full read of all persisted rows. + +In on-demand mode, that means the in-memory ownership graph is only complete for +the hydrated subsets. This is sufficient for warm-start correctness of loaded +data, but not by itself sufficient for storage-level GC over entirely cold rows. + +### Query refcounts + +`queryRefCounts` should remain in-memory only. + +They represent live subscriber/process state, not durable row ownership. 
After restart, refcounts should begin at zero and grow as real subscriptions attach.
+
+### Query lifecycle controls
+
+Query collections now need three distinct lifecycle controls:
+
+- `staleTime`: freshness of query data when re-requested
+- `gcTime`: in-memory observer and TanStack Query cache retention
+- `persistedGcTime`: durable placeholder and persisted-row retention
+
+These controls solve different problems and must remain independent.
+
+`staleTime` answers:
+
+- should this query be considered stale when requested again?
+
+`gcTime` answers:
+
+- how long should the in-memory query observer and query cache survive after the
+  query becomes inactive?
+
+`persistedGcTime` answers:
+
+- how long should persisted ownership placeholders and persisted rows survive
+  after the query becomes inactive?
+
+This separation is required for offline-first users who want persisted query
+results to survive long periods offline even after in-memory query GC has
+occurred.
+
+### Persisted query retention state
+
+Warm-start correctness also requires persisted query retention state for query
+placeholders that still own rows but currently have no active subscribers.
+
+That state is collection-level metadata and should support both finite TTL-based
+retention and indefinite retention until the query is revalidated.
+
+```ts
+type PersistedQueryRetentionEntry =
+  | {
+      queryHash: string
+      mode: 'ttl'
+      expiresAt: number
+    }
+  | {
+      queryHash: string
+      mode: 'until-revalidated'
+    }
+```
+
+Suggested keys:
+
+- `queryCollection:gc:<queryHash>`
+
+The value should contain at least:
+
+- either `expiresAt` for finite TTL retention
+- or `mode: 'until-revalidated'` for indefinite persisted retention
+- optionally debug fields like `lastActiveAt`
+
+The `until-revalidated` mode is intended for products that want persisted query
+results to remain available indefinitely while offline and only be reconciled
+once the same query is requested again.
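The startup decision for a retention entry is small enough to sketch directly. This is illustrative code (the `shouldCleanUpOnStartup` helper is hypothetical); the entry type mirrors the `PersistedQueryRetentionEntry` sketch above:

```typescript
// Hypothetical startup check: only expired TTL entries trigger cleanup;
// 'until-revalidated' entries wait until the same query runs again.
type RetentionEntry =
  | { queryHash: string; mode: 'ttl'; expiresAt: number }
  | { queryHash: string; mode: 'until-revalidated' }

function shouldCleanUpOnStartup(entry: RetentionEntry, now: number): boolean {
  return entry.mode === 'ttl' && entry.expiresAt <= now
}
```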
+
+### Query identity
+
+The GC entry must be tied to the same canonical identity used for row ownership.
+
+If the query collection needs more than the hash for debugging or future
+matching, it may also persist:
+
+- `queryCollection:query:<queryHash>` -> serialized query identity
+
+This is collection-scoped metadata, not row metadata.
+
+### GC behavior
+
+When a query becomes idle and would normally begin its GC countdown:
+
+1. keep row ownership on the rows
+2. persist `queryCollection:gc:<queryHash>` with either:
+   - `mode: 'ttl'` and `expiresAt`, or
+   - `mode: 'until-revalidated'`
+
+On restart:
+
+1. load collection metadata entries matching the `queryCollection:gc:` prefix
+2. for any query placeholder with `mode: 'ttl'` and expired `expiresAt`, run
+   persisted cleanup
+3. skip startup GC for placeholders with `mode: 'until-revalidated'`
+4. remove the placeholder's ownership from rows when cleanup runs
+5. delete rows that no longer have owners
+6. delete the GC metadata entry when cleanup completes
+
+Restart GC must run before new query subscriptions are allowed to attach for the
+same collection, or under the same startup mutex that serializes hydration and
+replay work. This avoids races where a placeholder is cleaned up while a real
+query is simultaneously reattaching.
+
+When a query with `mode: 'until-revalidated'` is requested again:
+
+1. match the placeholder using the same canonical query identity
+2. reconstruct the query's persisted ownership baseline
+3. run the query and diff the result against the persisted owned rows
+4. remove rows that are no longer owned after revalidation
+5. clear or refresh the retention entry based on the newly active query state
+
+This gives the desired offline behavior:
+
+- persisted rows remain available indefinitely
+- they are not deleted just because in-memory `gcTime` elapsed
+- they are eventually reconciled when the query is re-requested
+
+### Persisted GC implementation strategies
+
+There are two viable implementation levels:
+
+#### Level 1: simple row-metadata rewrite
+
+Use row metadata as the source of truth and perform cleanup by:
+
+- loading affected rows
+- removing the owner from row metadata
+- deleting rows whose owner set becomes empty
+
+This is simpler and consistent with the row-metadata design, but it is less
+efficient for large collections.
+
+Level 1 also has an important limitation: if the adapter cannot efficiently
+enumerate rows owned by a query, cleanup may degrade into a full collection scan
+and row-metadata JSON rewrite. That is acceptable as an initial correctness
+implementation, but it should be treated as a potentially expensive path.
+
+This cost matters even more when persisted retention is long-lived, because more
+query placeholders and retained rows may accumulate over time.
+
+#### Level 2: normalized ownership index
+
+Add an adapter-level ownership table:
+
+```sql
+CREATE TABLE query_row_ownership (
+  collection_id TEXT NOT NULL,
+  row_key TEXT NOT NULL,
+  query_hash TEXT NOT NULL,
+  PRIMARY KEY (collection_id, row_key, query_hash)
+)
+```
+
+This allows persisted GC to run efficiently in SQLite without scanning or
+rewriting every row blob. The row metadata can remain the logical API surface,
+while the adapter maintains the normalized index as an optimization.
+
+This RFC does not require Level 2 for the initial API, but it leaves room for
+it because query GC on persisted data is a first-class requirement.
+
+Another acceptable future variation is to denormalize owned row keys into the GC
+entry itself.
This RFC does not require that initially, but it is compatible +with the collection metadata model. + +### Query API surface + +The query collection should expose persisted retention separately from +`staleTime` and `gcTime`. + +One possible shape is: + +```ts +queryCollectionOptions({ + queryKey: ['messages', spaceId, pageId], + queryFn, + staleTime: 0, + gcTime: 5 * 60_000, + persistedGcTime: Infinity, +}) +``` + +An alternative shape that leaves more room for future extension is: + +```ts +queryCollectionOptions({ + queryKey: ['messages', spaceId, pageId], + queryFn, + staleTime: 0, + gcTime: 5 * 60_000, + persistedRetention: { + gcTime: Infinity, + }, +}) +``` + +This RFC does not require the final option name, but it does require persisted +retention to be distinct from the existing in-memory `gcTime`. + +## Electric Collection Usage + +### Problem to solve + +Electric has a different persistence problem from query ownership. + +It needs durable collection-level resume state so that after restart it can: + +- warm-start from persisted rows +- safely resume streaming from the correct point + +Today, Electric can hydrate row data from persistence, but it does not have a +dedicated transactional metadata path for persisted resume state. + +### What Electric should persist + +Electric should use both metadata scopes: + +#### Collection metadata + +Use collection metadata for stream resume state, for example: + +```ts +type ElectricResumeMetadata = + | { + kind: 'resume' + offset: string + handle: string + shapeId: string + updatedAt: number + } + | { + kind: 'reset' + updatedAt: number + } +``` + +Suggested key: + +- `electric:resume` + +This metadata must be committed transactionally with the row changes that were +applied from the same Electric stream batch. 
+
+That gives the required safety property:
+
+- if the row batch commits, the resume state commits
+- if the row batch does not commit, the resume state does not advance either
+
+#### Row metadata
+
+Electric already attaches sync metadata to rows from stream headers. That row
+metadata should flow through the same row metadata API so it can survive restart
+where useful.
+
+This includes information like:
+
+- relation identity
+- other per-row sync headers that are useful after hydration
+
+### Resume semantics
+
+On startup, Electric should:
+
+1. read `electric:resume` from collection metadata
+2. prefer that persisted resume state over a default `now` fallback
+3. hydrate persisted rows
+4. continue streaming from the persisted resume point
+
+### Interaction with derived in-memory state
+
+Electric also maintains in-memory derived state such as:
+
+- tag tracking for move-out handling
+- synced key tracking
+- snapshot and txid matching helpers
+
+This RFC does not require every derived Electric structure to become durable in
+the first iteration. But it does define the metadata API needed to do so where
+necessary.
+
+The practical rule is:
+
+- if a piece of Electric state affects whether rows should exist after restart,
+  it should eventually become durable, either as row metadata or collection
+  metadata
+- if that state cannot yet be reconstructed safely, Electric should fall back to
+  a conservative reload path rather than assuming warm-started data is exact
+
+## API Usage Examples
+
+### Query collection example
+
+```ts
+sync: ({ begin, write, commit, metadata }) => {
+  const setRowOwners = (
+    rowKey: string | number,
+    owners: Record<string, unknown>,
+  ) => {
+    const current = (metadata?.row.get(rowKey) ?? {}) as Record<string, unknown>
+    metadata?.row.set(rowKey, {
+      ...current,
+      queryCollection: {
+        owners,
+      },
+    })
+  }
+
+  begin()
+  // Normal sync logic...
+ commit() +} +``` + +### Electric example + +```ts +sync: ({ begin, write, commit, metadata }) => { + const resumeState = metadata?.collection.get('electric:resume') as + | { + kind: 'resume' + offset: string + handle: string + shapeId: string + updatedAt: number + } + | { + kind: 'reset' + updatedAt: number + } + | undefined + + // use resumeState to configure the stream + + // later, when committing a batch: + begin() + write({ type: 'update', value: row, metadata: rowHeaders }) + metadata?.collection.set('electric:resume', { + kind: 'resume', + offset: nextOffset, + handle: nextHandle, + shapeId: nextShapeId, + updatedAt: Date.now(), + }) + commit() +} +``` + +## Design Decisions + +### Why row metadata and collection metadata both exist + +They solve different problems: + +- row metadata explains why a specific row exists and what sync state belongs to + it +- collection metadata tracks collection-wide runtime state such as resume points + and query placeholder GC entries + +Trying to store everything in one global metadata blob would force unnecessary +bootstrap work and make transactional coupling harder. + +### Why metadata is part of the sync transaction model + +The metadata API is not just a convenience wrapper. It is part of the sync +transaction model. + +That means implementations must stage row operations, row metadata mutations, +and collection metadata mutations on the same pending sync transaction and apply +them together during commit. + +### Why query GC state is collection metadata + +GC timers are properties of query placeholders, not of individual rows. They +must persist across restart, but they are not naturally attached to a specific +row. + +The ownership edges themselves belong with rows, but the expiration state belongs +with the query placeholder. + +This also allows persisted retention to express policies that are not ordinary +timers, such as `until-revalidated`. 
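The division of labor above (ownership edges on rows, expiration state on the placeholder) means cleanup for one placeholder is a per-row owner removal. As an illustrative sketch (the `removeOwner` helper is hypothetical; the metadata shape follows the `QueryRowMetadata` sketch earlier in this RFC):

```typescript
// Hypothetical sketch: drop one query's ownership edge from a row's
// metadata; only rows whose owner set becomes empty are orphaned and
// eligible for deletion.
type RowMeta = { queryCollection?: { owners: Record<string, unknown> } }

function removeOwner(meta: RowMeta, queryHash: string): { meta: RowMeta; orphaned: boolean } {
  const owners = { ...(meta.queryCollection?.owners ?? {}) }
  delete owners[queryHash]
  return {
    meta: { queryCollection: { owners } },
    orphaned: Object.keys(owners).length === 0,
  }
}
```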
+
+### Why refcounts are not persisted
+
+Live refcounts describe current subscribers and current process state. That
+state is not durable and should not survive restart. Durable ownership and
+placeholder GC state are enough to reconstruct the correct baseline.
+
+### Why persisted retention is separate from `gcTime`
+
+Products may want in-memory query state to be short-lived while persisted data
+remains durable for much longer, including indefinitely until the query is
+requested again.
+
+Keeping `persistedGcTime` separate allows:
+
+- normal memory-pressure behavior for in-memory query state
+- long-lived offline warm starts
+- explicit control over how durable query placeholders are retained
+
+### Metadata replay and recovery
+
+Cross-tab replay, targeted invalidation, and `pullSince` recovery currently
+transport row keys and values, but not metadata deltas.
+
+The first implementation should preserve correctness before optimizing for
+efficiency:
+
+- if a committed tx includes metadata changes that cannot be replayed exactly,
+  persisted runtimes may conservatively fall back to reload behavior
+- targeted metadata replay can be added later as a follow-up optimization
+
+This allows metadata support to ship without requiring a fully optimized replay
+protocol on day one.
+
+### Namespacing convention
+
+Sync implementations that write collection metadata must namespace their keys.
+
+The convention is:
+
+- `<namespace>:<key>`
+
+Examples:
+
+- `queryCollection:gc:<queryHash>`
+- `queryCollection:query:<queryHash>`
+- `electric:resume`
+
+This RFC does not require a registry mechanism initially, but namespaced keys
+are mandatory to avoid collisions.
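The namespacing convention pairs naturally with `metadata.collection.list(prefix)`. As a hypothetical sketch (these helpers are illustrative, not part of the proposed API surface; `list` semantics are modeled as a simple prefix filter):

```typescript
// Hypothetical helpers for building namespaced keys and filtering
// collection metadata entries by namespace prefix.
function metadataKey(namespace: string, ...parts: Array<string>): string {
  return [namespace, ...parts].join(':')
}

function listByPrefix(
  entries: ReadonlyArray<{ key: string; value: unknown }>,
  prefix: string,
): Array<{ key: string; value: unknown }> {
  return entries.filter((entry) => entry.key.startsWith(prefix))
}
```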
+ +## Rollout Plan + +### Phase 1 + +- add optional metadata API to sync params +- stage metadata writes on pending sync transactions +- support metadata-only committed sync transactions +- add SQLite support for row metadata and collection metadata +- hydrate row metadata alongside persisted rows + +### Phase 2 + +- use row metadata in query collections for durable ownership +- persist query placeholder retention state in collection metadata +- implement restart-safe GC behavior +- use conservative reload fallback for metadata-bearing replay/recovery paths +- support separate persisted retention policy for query collections + +### Phase 3 + +- use collection metadata in Electric for persisted resume state +- evaluate which additional Electric-derived state must become durable for exact + restart behavior + +## Open Questions + +1. Should the initial SQLite implementation store query ownership only inside row + metadata blobs, or also maintain a normalized ownership index from the start? + +2. Should collection metadata be exposed to sync implementations only at startup + and during transactions, or also via a read-only utility surface outside + `sync.sync()`? + +3. Should persisted query GC cleanup run only on startup and local unload paths, + or also as part of a background maintenance task in persisted runtimes? + +4. Should Electric persist only a resume offset, or also a stronger stream + identity payload including shape/handle information to detect incompatible + resume state? 
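For open question 4, one possible answer can be sketched: persist a stream identity alongside the offset and treat any mismatch as incompatible resume state, falling back to a cold start. This is a hedged illustration only; the field names follow the `ElectricResumeMetadata` sketch above and are not Electric's actual protocol surface.

```typescript
// Hypothetical compatibility check: warm resume is allowed only when a
// persisted 'resume' entry exists and its shape identity still matches.
type ResumeMetadata =
  | { kind: 'resume'; offset: string; handle: string; shapeId: string; updatedAt: number }
  | { kind: 'reset'; updatedAt: number }

function canWarmResume(persisted: ResumeMetadata | undefined, expectedShapeId: string): boolean {
  return persisted?.kind === 'resume' && persisted.shapeId === expectedShapeId
}
```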
+ +## Testing Invariants + +Any implementation of this RFC should add tests for at least these invariants: + +- metadata commits iff the corresponding sync transaction commits +- row hydration restores row metadata together with row values +- query collection warm-start does not delete persisted rows before ownership is + reconstructed +- persisted query GC deletes rows only when ownership is truly orphaned +- metadata-only sync transactions persist correctly +- truncate clears row metadata and any collection metadata that is defined as + reset-scoped +- Electric resume metadata advances only when the corresponding batch commits +- metadata-bearing replay and recovery paths remain correct, even when they fall + back to reload behavior + +## Recommendation + +Adopt a transactional metadata API with two scopes: + +- row metadata for per-row durable sync state +- collection metadata for durable collection-wide state + +Implement both in the SQLite persisted collection layer, then migrate: + +- `query-db-collection` to durable row ownership plus collection-level GC state +- `electric-db-collection` to transactional persisted resume metadata + +This keeps the API generic while preserving the key correctness property: + +> metadata that affects persisted row behavior commits together with the row +> state it explains diff --git a/examples/react/offline-transactions/package.json b/examples/react/offline-transactions/package.json index 42d2c4808..4af7dbde9 100644 --- a/examples/react/offline-transactions/package.json +++ b/examples/react/offline-transactions/package.json @@ -9,6 +9,8 @@ "start": "node .output/server/index.mjs" }, "dependencies": { + "@tanstack/db": "workspace:*", + "@tanstack/db-browser-wa-sqlite-persisted-collection": "workspace:*", "@tanstack/offline-transactions": "^1.0.24", "@tanstack/query-db-collection": "^1.0.30", "@tanstack/react-db": "^0.1.77", diff --git a/examples/react/offline-transactions/src/components/PersistedTodoDemo.tsx 
b/examples/react/offline-transactions/src/components/PersistedTodoDemo.tsx
new file mode 100644
index 000000000..e7252f5f4
--- /dev/null
+++ b/examples/react/offline-transactions/src/components/PersistedTodoDemo.tsx
@@ -0,0 +1,181 @@
+import React, { useState } from 'react'
+import { useLiveQuery } from '@tanstack/react-db'
+import type { Collection } from '@tanstack/db'
+import type { PersistedTodo } from '~/db/persisted-todos'
+
+interface PersistedTodoDemoProps {
+  collection: Collection<PersistedTodo>
+}
+
+export function PersistedTodoDemo({ collection }: PersistedTodoDemoProps) {
+  const [newTodoText, setNewTodoText] = useState(``)
+  const [error, setError] = useState<string | null>(null)
+
+  const { data: todoList = [] } = useLiveQuery((q) =>
+    q.from({ todo: collection }).orderBy(({ todo }) => todo.createdAt, `desc`),
+  )
+
+  const handleAddTodo = () => {
+    if (!newTodoText.trim()) return
+
+    try {
+      setError(null)
+      const now = new Date().toISOString()
+      collection.insert({
+        id: crypto.randomUUID(),
+        text: newTodoText.trim(),
+        completed: false,
+        createdAt: now,
+        updatedAt: now,
+      })
+      setNewTodoText(``)
+    } catch (err) {
+      setError(err instanceof Error ? err.message : `Failed to add todo`)
+    }
+  }
+
+  const handleToggleTodo = (id: string) => {
+    try {
+      setError(null)
+      collection.update(id, (draft) => {
+        draft.completed = !draft.completed
+        draft.updatedAt = new Date().toISOString()
+      })
+    } catch (err) {
+      setError(err instanceof Error ? err.message : `Failed to toggle todo`)
+    }
+  }
+
+  const handleDeleteTodo = (id: string) => {
+    try {
+      setError(null)
+      collection.delete(id)
+    } catch (err) {
+      setError(err instanceof Error ? err.message : `Failed to delete todo`)
+    }
+  }
+
+  const handleKeyPress = (e: React.KeyboardEvent) => {
+    if (e.key === `Enter`) {
+      handleAddTodo()
+    }
+  }
+
+  return (
+
+
+ 🗃️ +
+

+ wa-sqlite OPFS Persistence Demo +

+

+ Collection data is persisted to SQLite via OPFS. Data survives + page reloads without any server sync. +

+
+
+ + {/* Persistence indicator */} +
+
+
+ SQLite OPFS Persistence Active +
+
+ {todoList.length} todo{todoList.length !== 1 ? `s` : ``} +
+
+ + {/* Error display */} + {error && ( +
+

{error}

+
+ )} + + {/* Add new todo */} +
+ setNewTodoText(e.target.value)} + onKeyPress={handleKeyPress} + placeholder="Add a new todo..." + className="flex-1 px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-blue-500" + /> + +
+ + {/* Todo list */} +
+ {todoList.length === 0 ? ( +
+ No todos yet. Add one above to get started! +
+ + Try adding todos, then refresh the page to see them persist + +
+ ) : ( + todoList.map((todo) => ( +
+ + + {todo.text} + + + {new Date(todo.createdAt).toLocaleDateString()} + + +
+ )) + )} +
+ + {/* Instructions */} +
+

Try this:

+
    +
+1. Add some todos
+2. Refresh the page (Ctrl+R / Cmd+R)
+3. Your todos are still here - persisted in SQLite via OPFS!
+4. This uses wa-sqlite with OPFSCoopSyncVFS in a Web Worker
+
+
+
+ ) +} diff --git a/examples/react/offline-transactions/src/db/persisted-todos.ts b/examples/react/offline-transactions/src/db/persisted-todos.ts new file mode 100644 index 000000000..eae071fe4 --- /dev/null +++ b/examples/react/offline-transactions/src/db/persisted-todos.ts @@ -0,0 +1,53 @@ +import { createCollection } from '@tanstack/react-db' +import { + BrowserCollectionCoordinator, + createBrowserWASQLitePersistence, + openBrowserWASQLiteOPFSDatabase, + persistedCollectionOptions, +} from '@tanstack/db-browser-wa-sqlite-persisted-collection' +import type { Collection } from '@tanstack/db' + +export type PersistedTodo = { + id: string + text: string + completed: boolean + createdAt: string + updatedAt: string +} + +export type PersistedTodosHandle = { + collection: Collection + close: () => Promise +} + +export async function createPersistedTodoCollection(): Promise { + const database = await openBrowserWASQLiteOPFSDatabase({ + databaseName: `tanstack-db-demo-v2.sqlite`, + }) + + const coordinator = new BrowserCollectionCoordinator({ + dbName: `tanstack-db-demo`, + }) + + const persistence = createBrowserWASQLitePersistence({ + database, + coordinator, + }) + + const collection = createCollection( + persistedCollectionOptions({ + id: `persisted-todos`, + getKey: (todo) => todo.id, + persistence, + schemaVersion: 1, + }), + ) + + return { + collection: collection, + close: async () => { + coordinator.dispose() + await database.close?.() + }, + } +} diff --git a/examples/react/offline-transactions/src/db/todos.ts b/examples/react/offline-transactions/src/db/todos.ts index 1d8be0340..a4418a150 100644 --- a/examples/react/offline-transactions/src/db/todos.ts +++ b/examples/react/offline-transactions/src/db/todos.ts @@ -118,12 +118,16 @@ export const todoAPI = { }) { const mutations = transaction.mutations - console.log(`sync todos`, mutations[0].changes, mutations[0].original.text) + console.log( + `sync todos`, + mutations[0].changes, + (mutations[0].original as 
Record).text, + ) for (const mutation of mutations) { try { switch (mutation.type) { case `insert`: { - const todoData = mutation.modified as Todo + const todoData = mutation.modified as unknown as Todo const response = await fetchWithRetry(`/api/todos`, { method: `POST`, headers: { @@ -145,7 +149,7 @@ export const todoAPI = { case `update`: { const todoData = mutation.modified as Partial const response = await fetch( - `/api/todos/${(mutation.modified as Todo).id}`, + `/api/todos/${(mutation.modified as unknown as Todo).id}`, { method: `PUT`, headers: { @@ -167,7 +171,7 @@ export const todoAPI = { case `delete`: { const response = await fetchWithRetry( - `/api/todos/${(mutation.original as Todo).id}`, + `/api/todos/${(mutation.original as unknown as Todo).id}`, { method: `DELETE`, headers: { diff --git a/examples/react/offline-transactions/src/routeTree.gen.ts b/examples/react/offline-transactions/src/routeTree.gen.ts index d594674fe..c59e9b6ff 100644 --- a/examples/react/offline-transactions/src/routeTree.gen.ts +++ b/examples/react/offline-transactions/src/routeTree.gen.ts @@ -9,10 +9,16 @@ // Additionally, you should also exclude this file from your linter and/or formatter to prevent it from being checked or modified. 
import { Route as rootRouteImport } from './routes/__root' +import { Route as WaSqliteRouteImport } from './routes/wa-sqlite' import { Route as LocalstorageRouteImport } from './routes/localstorage' import { Route as IndexeddbRouteImport } from './routes/indexeddb' import { Route as IndexRouteImport } from './routes/index' +const WaSqliteRoute = WaSqliteRouteImport.update({ + id: '/wa-sqlite', + path: '/wa-sqlite', + getParentRoute: () => rootRouteImport, +} as any) const LocalstorageRoute = LocalstorageRouteImport.update({ id: '/localstorage', path: '/localstorage', @@ -33,34 +39,45 @@ export interface FileRoutesByFullPath { '/': typeof IndexRoute '/indexeddb': typeof IndexeddbRoute '/localstorage': typeof LocalstorageRoute + '/wa-sqlite': typeof WaSqliteRoute } export interface FileRoutesByTo { '/': typeof IndexRoute '/indexeddb': typeof IndexeddbRoute '/localstorage': typeof LocalstorageRoute + '/wa-sqlite': typeof WaSqliteRoute } export interface FileRoutesById { __root__: typeof rootRouteImport '/': typeof IndexRoute '/indexeddb': typeof IndexeddbRoute '/localstorage': typeof LocalstorageRoute + '/wa-sqlite': typeof WaSqliteRoute } export interface FileRouteTypes { fileRoutesByFullPath: FileRoutesByFullPath - fullPaths: '/' | '/indexeddb' | '/localstorage' + fullPaths: '/' | '/indexeddb' | '/localstorage' | '/wa-sqlite' fileRoutesByTo: FileRoutesByTo - to: '/' | '/indexeddb' | '/localstorage' - id: '__root__' | '/' | '/indexeddb' | '/localstorage' + to: '/' | '/indexeddb' | '/localstorage' | '/wa-sqlite' + id: '__root__' | '/' | '/indexeddb' | '/localstorage' | '/wa-sqlite' fileRoutesById: FileRoutesById } export interface RootRouteChildren { IndexRoute: typeof IndexRoute IndexeddbRoute: typeof IndexeddbRoute LocalstorageRoute: typeof LocalstorageRoute + WaSqliteRoute: typeof WaSqliteRoute } declare module '@tanstack/react-router' { interface FileRoutesByPath { + '/wa-sqlite': { + id: '/wa-sqlite' + path: '/wa-sqlite' + fullPath: '/wa-sqlite' + preLoaderRoute: 
typeof WaSqliteRouteImport + parentRoute: typeof rootRouteImport + } '/localstorage': { id: '/localstorage' path: '/localstorage' @@ -89,6 +106,7 @@ const rootRouteChildren: RootRouteChildren = { IndexRoute: IndexRoute, IndexeddbRoute: IndexeddbRoute, LocalstorageRoute: LocalstorageRoute, + WaSqliteRoute: WaSqliteRoute, } export const routeTree = rootRouteImport ._addFileChildren(rootRouteChildren) diff --git a/examples/react/offline-transactions/src/router.tsx b/examples/react/offline-transactions/src/router.tsx index d5579a802..e9f12d870 100644 --- a/examples/react/offline-transactions/src/router.tsx +++ b/examples/react/offline-transactions/src/router.tsx @@ -15,6 +15,10 @@ export function createRouter() { return router } +export async function getRouter() { + return createRouter() +} + declare module '@tanstack/react-router' { interface Register { router: ReturnType diff --git a/examples/react/offline-transactions/src/routes/__root.tsx b/examples/react/offline-transactions/src/routes/__root.tsx index 89588bacb..fb72e0806 100644 --- a/examples/react/offline-transactions/src/routes/__root.tsx +++ b/examples/react/offline-transactions/src/routes/__root.tsx @@ -109,6 +109,15 @@ function RootDocument({ children }: { children: React.ReactNode }) { > 💾 localStorage + + 🗃️ wa-sqlite +
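The sync-metadata plumbing added to `persisted.ts` further down in this diff buffers row and collection metadata writes in per-transaction Maps (`rowMetadataWrites`, `collectionMetadataWrites`), so within one open sync transaction the last write per key wins, and the buffered writes are handed to the adapter as a single atomic mutation list at commit. A simplified, self-contained sketch of that last-write-wins buffering — illustrative names only, not the real package API:

```typescript
// Hypothetical stand-in for the per-transaction metadata buffer in
// persisted.ts; it models collectionMetadataWrites and the mutation
// list shape (collectionMetadataMutations) flushed at commit.
type MetadataMutation =
  | { type: 'set'; key: string; value: unknown }
  | { type: 'delete'; key: string }

class MetadataTxBuffer {
  // Keyed by metadata key: a later set/delete on the same key
  // overwrites the earlier buffered write.
  private writes = new Map<
    string,
    { type: 'set'; value: unknown } | { type: 'delete' }
  >()

  set(key: string, value: unknown): void {
    this.writes.set(key, { type: 'set', value }) // last write per key wins
  }

  delete(key: string): void {
    this.writes.set(key, { type: 'delete' })
  }

  // At commit, produce the mutation list the adapter would apply
  // atomically alongside row mutations, then clear the buffer.
  commit(): Array<MetadataMutation> {
    const mutations: Array<MetadataMutation> = []
    for (const [key, write] of this.writes) {
      mutations.push(
        write.type === 'delete'
          ? { type: 'delete', key }
          : { type: 'set', key, value: write.value },
      )
    }
    this.writes.clear()
    return mutations
  }
}
```

For example, `set('electric:resume', a)` followed by `set('electric:resume', b)` in the same transaction yields a single `set` mutation carrying `b`; a `delete` after a `set` yields only the `delete`.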
diff --git a/examples/react/offline-transactions/src/routes/index.tsx b/examples/react/offline-transactions/src/routes/index.tsx index 6e9141e5d..bb98d7561 100644 --- a/examples/react/offline-transactions/src/routes/index.tsx +++ b/examples/react/offline-transactions/src/routes/index.tsx @@ -73,6 +73,43 @@ function Home() { +
+ +
+
+ 🗃️ +
+

+ wa-sqlite OPFS Persistence +

+ + NEW + +
+
+

+ Collection-level persistence using wa-sqlite with OPFS. Data is + stored in a real SQLite database in the browser via a Web + Worker. Survives page reloads without server sync. +

+
+ + SQLite in Browser + + + OPFS Storage + + + Web Worker + + + Local-only + +
+
+ +
+

Features Demonstrated diff --git a/examples/react/offline-transactions/src/routes/wa-sqlite.tsx b/examples/react/offline-transactions/src/routes/wa-sqlite.tsx new file mode 100644 index 000000000..a6a63de38 --- /dev/null +++ b/examples/react/offline-transactions/src/routes/wa-sqlite.tsx @@ -0,0 +1,95 @@ +import { createFileRoute } from '@tanstack/react-router' +import { useEffect, useState } from 'react' +import type { PersistedTodosHandle } from '~/db/persisted-todos' +import { PersistedTodoDemo } from '~/components/PersistedTodoDemo' +import { createPersistedTodoCollection } from '~/db/persisted-todos' + +export const Route = createFileRoute(`/wa-sqlite`)({ + component: WASQLiteDemo, +}) + +function WASQLiteDemo() { + const [handle, setHandle] = useState(null) + const [error, setError] = useState(null) + + useEffect(() => { + let disposed = false + let currentHandle: PersistedTodosHandle | null = null + + createPersistedTodoCollection() + .then((h) => { + if (disposed) { + h.close() + return + } + currentHandle = h + setHandle(h) + }) + .catch((err) => { + if (!disposed) { + console.error(`Failed to initialize wa-sqlite persistence:`, err) + setError( + err instanceof Error + ? err.message + : `Failed to initialize persistence`, + ) + } + }) + + return () => { + disposed = true + currentHandle?.close() + } + }, []) + + if (error) { + return ( +
+
+
+
+ ⚠️ +
+

+ Persistence Unavailable +

+

+ wa-sqlite OPFS persistence could not be initialized. +

+
+
+
+

{error}

+
+

+ This feature requires a browser with OPFS support (Chrome 102+, + Edge 102+, Firefox 111+, Safari 15.2+) and a secure context (HTTPS + or localhost). +

+
+
+
+ ) + } + + if (!handle) { + return ( +
+
+
+
+

+ Initializing wa-sqlite persistence... +

+
+
+
+ ) + } + + return ( +
+ +
+ ) +} diff --git a/examples/react/offline-transactions/vite.config.ts b/examples/react/offline-transactions/vite.config.ts index 402e2d611..ad1dcf0fe 100644 --- a/examples/react/offline-transactions/vite.config.ts +++ b/examples/react/offline-transactions/vite.config.ts @@ -1,3 +1,4 @@ +import fs from 'node:fs' import path from 'node:path' import { tanstackStart } from '@tanstack/react-start/plugin/vite' import { defineConfig } from 'vite' @@ -13,6 +14,14 @@ function watchWorkspacePackages() { const watchPaths = [ path.resolve(__dirname, `../../../packages/db/dist`), path.resolve(__dirname, `../../../packages/offline-transactions/dist`), + path.resolve( + __dirname, + `../../../packages/db-browser-wa-sqlite-persisted-collection/src`, + ), + path.resolve( + __dirname, + `../../../packages/db-sqlite-persisted-collection-core/dist`, + ), ] console.log(`[watch-workspace] Starting to watch paths:`) @@ -21,20 +30,22 @@ function watchWorkspacePackages() { console.log(`[watch-workspace] Resolved paths:`) watchPaths.forEach((p) => console.log(` - ${path.resolve(p)}`)) + let ready = false + const watcher = chokidar.watch(watchPaths, { ignored: /node_modules/, persistent: true, }) watcher.on(`ready`, () => { + ready = true console.log( `[watch-workspace] Initial scan complete. 
Watching for changes...`, ) - const watchedPaths = watcher.getWatched() - console.log(`[watch-workspace] Currently watching:`, watchedPaths) }) watcher.on(`add`, (filePath) => { + if (!ready) return console.log(`[watch-workspace] File added: ${filePath}`) server.ws.send({ type: `full-reload`, @@ -42,6 +53,7 @@ function watchWorkspacePackages() { }) watcher.on(`change`, (filePath) => { + if (!ready) return console.log(`[watch-workspace] File changed: ${filePath}`) server.ws.send({ type: `full-reload`, @@ -62,10 +74,70 @@ export default defineConfig({ ignored: [`!**/node_modules/@tanstack/**`], }, }, + resolve: { + alias: { + // Resolve to source so Vite can process the ?worker import natively + '@tanstack/db-browser-wa-sqlite-persisted-collection': path.resolve( + __dirname, + `../../../packages/db-browser-wa-sqlite-persisted-collection/src/index.ts`, + ), + // Required because the browser package's source re-exports from core + '@tanstack/db-sqlite-persisted-collection-core': path.resolve( + __dirname, + `../../../packages/db-sqlite-persisted-collection-core/src/index.ts`, + ), + }, + }, optimizeDeps: { - exclude: [`@tanstack/db`, `@tanstack/offline-transactions`], + exclude: [ + `@tanstack/db`, + `@tanstack/offline-transactions`, + `@tanstack/db-browser-wa-sqlite-persisted-collection`, + `@tanstack/db-sqlite-persisted-collection-core`, + `@journeyapps/wa-sqlite`, + ], }, plugins: [ + // Serve .wasm files before TanStack Start's catch-all handler intercepts them. + // We use configureServer returning a function (post-hook) and unshift onto the + // stack so this runs before any other middleware including TanStack Start. + { + name: `serve-wasm-files`, + configureServer(server: any) { + const wasmHandler = (req: any, res: any, next: () => void) => { + // Strip query string before checking extension + const urlWithoutQuery = (req.url ?? 
``).split(`?`)[0] + if (!urlWithoutQuery.endsWith(`.wasm`)) { + return next() + } + + // Handle /@fs/ paths used by Vite for serving node_modules files + const fsPrefix = `/@fs` + let filePath: string | undefined + if (urlWithoutQuery.startsWith(fsPrefix)) { + filePath = urlWithoutQuery.slice(fsPrefix.length) + } + + if (!filePath || !fs.existsSync(filePath)) { + return next() + } + + const content = fs.readFileSync(filePath) + res.writeHead(200, { + 'Content-Type': `application/wasm`, + 'Content-Length': content.byteLength, + 'Cache-Control': `no-cache`, + }) + res.end(content) + } + + // Prepend to the middleware stack so it runs before TanStack Start + server.middlewares.stack.unshift({ + route: ``, + handle: wasmHandler, + }) + }, + }, watchWorkspacePackages(), tsConfigPaths({ projects: [`./tsconfig.json`], diff --git a/packages/db-sqlite-persisted-collection-core/src/persisted.ts b/packages/db-sqlite-persisted-collection-core/src/persisted.ts index 1691b19cd..2cdcbec45 100644 --- a/packages/db-sqlite-persisted-collection-core/src/persisted.ts +++ b/packages/db-sqlite-persisted-collection-core/src/persisted.ts @@ -19,6 +19,7 @@ import type { PendingMutation, SyncConfig, SyncConfigRes, + SyncMetadataApi, UpdateMutationFnParams, UtilsRecord, } from '@tanstack/db' @@ -179,10 +180,29 @@ export type PersistedTx< seq: number rowVersion: number mutations: Array< - | { type: `insert`; key: TKey; value: T } - | { type: `update`; key: TKey; value: T } + | { + type: `insert` + key: TKey + value: T + metadata?: unknown + metadataChanged?: boolean + } + | { + type: `update` + key: TKey + value: T + metadata?: unknown + metadataChanged?: boolean + } | { type: `delete`; key: TKey; value: T } > + rowMetadataMutations?: Array< + { type: `set`; key: TKey; value: unknown } | { type: `delete`; key: TKey } + > + collectionMetadataMutations?: Array< + | { type: `set`; key: string; value: unknown } + | { type: `delete`; key: string } + > } export interface PersistenceAdapter< @@ 
-193,11 +213,14 @@ export interface PersistenceAdapter< collectionId: string, options: LoadSubsetOptions, ctx?: { requiredIndexSignatures?: ReadonlyArray }, - ) => Promise> + ) => Promise> applyCommittedTx: ( collectionId: string, tx: PersistedTx, ) => Promise + loadCollectionMetadata?: ( + collectionId: string, + ) => Promise> ensureIndex: ( collectionId: string, signature: string, @@ -366,13 +389,14 @@ type SyncControlFns = { write: | (( message: - | { type: `insert`; value: T } - | { type: `update`; value: T } + | { type: `insert`; value: T; metadata?: Record } + | { type: `update`; value: T; metadata?: Record } | { type: `delete`; key: TKey }, ) => void) | null commit: (() => void) | null truncate: (() => void) | null + metadata: SyncMetadataApi | null } /** @@ -511,6 +535,7 @@ type NormalizedSyncOperation = type: `update` key: TKey value: T + metadata?: Record } | { type: `delete` @@ -520,6 +545,14 @@ type NormalizedSyncOperation = type BufferedSyncTransaction = { operations: Array> + rowMetadataWrites: Map< + TKey, + { type: `set`; value: unknown } | { type: `delete` } + > + collectionMetadataWrites: Map< + string, + { type: `set`; value: unknown } | { type: `delete` } + > truncate: boolean internal: boolean } @@ -536,6 +569,7 @@ type SyncWriteNormalization = { | { type: `update` value: T + metadata?: Record } | { type: `delete` @@ -735,6 +769,7 @@ class PersistedCollectionRuntime< write: null, commit: null, truncate: null, + metadata: null, } private started = false private startPromise: Promise | null = null @@ -771,6 +806,7 @@ class PersistedCollectionRuntime< write: null, commit: null, truncate: null, + metadata: null, } } @@ -829,6 +865,8 @@ class PersistedCollectionRuntime< ) } + await this.loadCollectionMetadataIntoCollection() + const indexBootstrapSnapshot = this.collection?.getIndexMetadata() ?? 
[] this.attachIndexLifecycleListeners() await this.bootstrapPersistedIndexes(indexBootstrapSnapshot) @@ -841,6 +879,32 @@ class PersistedCollectionRuntime< } } + private async loadCollectionMetadataIntoCollection(): Promise { + if ( + !this.persistence.adapter.loadCollectionMetadata || + !this.syncControls.begin || + !this.syncControls.commit || + !this.syncControls.metadata + ) { + return + } + + const collectionMetadata = + await this.persistence.adapter.loadCollectionMetadata(this.collectionId) + + if (collectionMetadata.length === 0) { + return + } + + this.withInternalApply(() => { + this.syncControls.begin?.({ immediate: true }) + collectionMetadata.forEach(({ key, value }) => { + this.syncControls.metadata?.collection.set(key, value) + }) + this.syncControls.commit?.() + }) + } + async loadSubset( options: LoadSubsetOptions, upstreamLoadSubset?: (options: LoadSubsetOptions) => true | Promise, @@ -951,11 +1015,13 @@ class PersistedCollectionRuntime< forwardMessage: { type: `update`, value: message.value, + metadata: message.metadata, }, operation: { type: `update`, key, value: message.value, + metadata: message.metadata, }, } } @@ -1042,7 +1108,7 @@ class PersistedCollectionRuntime< private loadSubsetRowsUnsafe( options: LoadSubsetOptions, - ): Promise> { + ): Promise> { return this.persistence.adapter.loadSubset(this.collectionId, options, { requiredIndexSignatures: this.getRequiredIndexSignatures(), }) @@ -1071,7 +1137,9 @@ class PersistedCollectionRuntime< } } - private applyRowsToCollection(rows: Array<{ key: TKey; value: T }>): void { + private applyRowsToCollection( + rows: Array<{ key: TKey; value: T; metadata?: unknown }>, + ): void { if ( !this.syncControls.begin || !this.syncControls.write || @@ -1087,6 +1155,7 @@ class PersistedCollectionRuntime< this.syncControls.write?.({ type: `update`, value: row.value, + metadata: row.metadata as Record | undefined, }) } @@ -1094,7 +1163,9 @@ class PersistedCollectionRuntime< }) } - private 
replaceCollectionRows(rows: Array<{ key: TKey; value: T }>): void { + private replaceCollectionRows( + rows: Array<{ key: TKey; value: T; metadata?: unknown }>, + ): void { if ( !this.syncControls.begin || !this.syncControls.write || @@ -1111,6 +1182,7 @@ class PersistedCollectionRuntime< this.syncControls.write?.({ type: `update`, value: row.value, + metadata: row.metadata as Record | undefined, }) } @@ -1156,10 +1228,27 @@ class PersistedCollectionRuntime< this.syncControls.write?.({ type: `update`, value: operation.value, + metadata: operation.metadata, }) } } + for (const [key, metadataWrite] of transaction.rowMetadataWrites) { + if (metadataWrite.type === `delete`) { + this.syncControls.metadata?.row.delete(key) + } else { + this.syncControls.metadata?.row.set(key, metadataWrite.value) + } + } + + for (const [key, metadataWrite] of transaction.collectionMetadataWrites) { + if (metadataWrite.type === `delete`) { + this.syncControls.metadata?.collection.delete(key) + } else { + this.syncControls.metadata?.collection.set(key, metadataWrite.value) + } + } + this.syncControls.commit?.() } @@ -1181,7 +1270,12 @@ class PersistedCollectionRuntime< const streamPosition = this.nextLocalStreamPosition() - if (transaction.truncate || transaction.operations.length === 0) { + if ( + transaction.truncate || + (transaction.operations.length === 0 && + transaction.rowMetadataWrites.size === 0 && + transaction.collectionMetadataWrites.size === 0) + ) { this.publishTxCommittedEvent( this.createTxCommittedPayload({ term: streamPosition.term, @@ -1196,10 +1290,7 @@ class PersistedCollectionRuntime< return } - const tx = this.createPersistedTxFromOperations( - transaction.operations, - streamPosition, - ) + const tx = this.createPersistedTxFromOperations(transaction, streamPosition) await this.persistence.adapter.applyCommittedTx(this.collectionId, tx) this.publishTxCommittedEvent( @@ -1208,6 +1299,9 @@ class PersistedCollectionRuntime< seq: tx.seq, txId: tx.txId, latestRowVersion: 
tx.rowVersion, + hasMetadataChanges: + transaction.rowMetadataWrites.size > 0 || + transaction.collectionMetadataWrites.size > 0, changedRows: transaction.operations .filter((operation) => operation.type === `update`) .map((operation) => ({ key: operation.key, value: operation.value })), @@ -1219,7 +1313,7 @@ class PersistedCollectionRuntime< } private createPersistedTxFromOperations( - operations: Array>, + transaction: BufferedSyncTransaction, streamPosition: { term: number; seq: number; rowVersion: number }, ): PersistedTx { return { @@ -1227,12 +1321,14 @@ class PersistedCollectionRuntime< term: streamPosition.term, seq: streamPosition.seq, rowVersion: streamPosition.rowVersion, - mutations: operations.map((operation) => + mutations: transaction.operations.map((operation) => operation.type === `update` ? { type: `update`, key: operation.key, value: operation.value, + metadata: operation.metadata, + metadataChanged: operation.metadata !== undefined, } : { type: `delete`, @@ -1240,6 +1336,20 @@ class PersistedCollectionRuntime< value: operation.value, }, ), + rowMetadataMutations: Array.from( + transaction.rowMetadataWrites.entries(), + ).map(([key, metadataWrite]) => + metadataWrite.type === `delete` + ? { type: `delete`, key } + : { type: `set`, key, value: metadataWrite.value }, + ), + collectionMetadataMutations: Array.from( + transaction.collectionMetadataWrites.entries(), + ).map(([key, metadataWrite]) => + metadataWrite.type === `delete` + ? 
{ type: `delete`, key } + : { type: `set`, key, value: metadataWrite.value }, + ), } } @@ -1387,6 +1497,11 @@ class PersistedCollectionRuntime< seq: tx.seq, txId: tx.txId, latestRowVersion: tx.rowVersion, + hasMetadataChanges: + (tx.rowMetadataMutations !== undefined && + tx.rowMetadataMutations.length > 0) || + (tx.collectionMetadataMutations !== undefined && + tx.collectionMetadataMutations.length > 0), changedRows: mutations .filter((mutation) => mutation.type !== `delete`) .map((mutation) => ({ @@ -1409,10 +1524,12 @@ class PersistedCollectionRuntime< latestRowVersion: number changedRows: Array<{ key: TKey; value: T }> deletedKeys: Array + hasMetadataChanges?: boolean requiresFullReload?: boolean }): TxCommitted { const requiresFullReload = args.requiresFullReload === true || + args.hasMetadataChanges === true || args.changedRows.length + args.deletedKeys.length > TARGETED_INVALIDATION_KEY_LIMIT @@ -1925,6 +2042,7 @@ function createWrappedSyncConfig< write: params.write as SyncControlFns[`write`], commit: params.commit, truncate: params.truncate, + metadata: params.metadata ?? 
null, }) runtime.setCollection( params.collection as Collection, @@ -1949,6 +2067,8 @@ function createWrappedSyncConfig< begin: (options?: { immediate?: boolean }) => { const transaction: OpenSyncTransaction = { operations: [], + rowMetadataWrites: new Map(), + collectionMetadataWrites: new Map(), truncate: false, internal: runtime.isApplyingInternally(), queuedBecauseHydrating: @@ -1970,10 +2090,66 @@ function createWrappedSyncConfig< } openTransaction.operations.push(normalization.operation) + if (normalization.operation.type === `delete`) { + openTransaction.rowMetadataWrites.set(normalization.operation.key, { + type: `delete`, + }) + } else if (normalization.operation.metadata !== undefined) { + openTransaction.rowMetadataWrites.set(normalization.operation.key, { + type: `set`, + value: normalization.operation.metadata, + }) + } if (!openTransaction.queuedBecauseHydrating) { params.write(normalization.forwardMessage) } }, + metadata: params.metadata + ? { + row: { + get: (key: TKey) => params.metadata!.row.get(key), + set: (key: TKey, value: unknown) => { + const openTransaction = + transactionStack[transactionStack.length - 1] + openTransaction?.rowMetadataWrites.set(key, { + type: `set`, + value, + }) + params.metadata!.row.set(key, value) + }, + delete: (key: TKey) => { + const openTransaction = + transactionStack[transactionStack.length - 1] + openTransaction?.rowMetadataWrites.set(key, { + type: `delete`, + }) + params.metadata!.row.delete(key) + }, + }, + collection: { + get: (key: string) => params.metadata!.collection.get(key), + set: (key: string, value: unknown) => { + const openTransaction = + transactionStack[transactionStack.length - 1] + openTransaction?.collectionMetadataWrites.set(key, { + type: `set`, + value, + }) + params.metadata!.collection.set(key, value) + }, + delete: (key: string) => { + const openTransaction = + transactionStack[transactionStack.length - 1] + openTransaction?.collectionMetadataWrites.set(key, { + type: `delete`, + }) + 
params.metadata!.collection.delete(key) + }, + list: (prefix?: string) => + params.metadata!.collection.list(prefix), + }, + } + : undefined, truncate: () => { const openTransaction = transactionStack[transactionStack.length - 1] if (!openTransaction) { @@ -1996,6 +2172,9 @@ function createWrappedSyncConfig< if (openTransaction.queuedBecauseHydrating) { runtime.queueHydrationBufferedTransaction({ operations: openTransaction.operations, + rowMetadataWrites: openTransaction.rowMetadataWrites, + collectionMetadataWrites: + openTransaction.collectionMetadataWrites, truncate: openTransaction.truncate, internal: openTransaction.internal, }) @@ -2007,6 +2186,9 @@ function createWrappedSyncConfig< void runtime .persistAndBroadcastExternalSyncTransaction({ operations: openTransaction.operations, + rowMetadataWrites: openTransaction.rowMetadataWrites, + collectionMetadataWrites: + openTransaction.collectionMetadataWrites, truncate: openTransaction.truncate, internal: false, }) @@ -2051,6 +2233,7 @@ function createLoopbackSyncConfig< write: params.write as SyncControlFns[`write`], commit: params.commit, truncate: params.truncate, + metadata: params.metadata ?? 
null, }) runtime.setCollection( params.collection as Collection, diff --git a/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts b/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts index 9f1662de1..b482d96f6 100644 --- a/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts +++ b/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts @@ -37,6 +37,7 @@ type CompiledSqlFragment = { type StoredSqliteRow = { key: string value: string + metadata: string | null row_version: number } @@ -588,6 +589,7 @@ function sanitizeExpressionSqlFragment(fragment: string): string { type InMemoryRow = { key: TKey value: T + metadata?: unknown rowVersion: number } @@ -1035,7 +1037,7 @@ export class SQLiteCorePersistenceAdapter< collectionId: string, options: LoadSubsetOptions, ctx?: { requiredIndexSignatures?: ReadonlyArray }, - ): Promise> { + ): Promise> { const tableMapping = await this.ensureCollectionReady(collectionId) await this.touchRequiredIndexes(collectionId, ctx?.requiredIndexSignatures) @@ -1072,6 +1074,7 @@ export class SQLiteCorePersistenceAdapter< return orderedRows.map((row) => ({ key: row.key, value: row.value, + metadata: row.metadata, })) } @@ -1079,6 +1082,7 @@ export class SQLiteCorePersistenceAdapter< return rows.map((row) => ({ key: row.key, value: row.value, + metadata: row.metadata, })) } @@ -1140,8 +1144,11 @@ export class SQLiteCorePersistenceAdapter< continue } - const existingRows = await transactionDriver.query<{ value: string }>( - `SELECT value + const existingRows = await transactionDriver.query<{ + value: string + metadata: string | null + }>( + `SELECT value, metadata FROM ${collectionTableSql} WHERE key = ? LIMIT 1`, @@ -1150,18 +1157,34 @@ export class SQLiteCorePersistenceAdapter< const existingValue = existingRows[0]?.value ? deserializePersistedRowValue(existingRows[0].value) : undefined + const existingMetadata = + existingRows[0]?.metadata != null + ? 
deserializePersistedRowValue(existingRows[0].metadata) + : undefined const mergedValue = mutation.type === `update` ? mergeObjectRows(existingValue, mutation.value) : mutation.value + const nextMetadata = + mutation.metadataChanged === true + ? mutation.metadata + : existingMetadata await transactionDriver.run( - `INSERT INTO ${collectionTableSql} (key, value, row_version) - VALUES (?, ?, ?) + `INSERT INTO ${collectionTableSql} (key, value, metadata, row_version) + VALUES (?, ?, ?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value, + metadata = excluded.metadata, row_version = excluded.row_version`, - [encodedKey, serializePersistedRowValue(mergedValue), nextRowVersion], + [ + encodedKey, + serializePersistedRowValue(mergedValue), + nextMetadata === undefined + ? null + : serializePersistedRowValue(nextMetadata), + nextRowVersion, + ], ) await transactionDriver.run( `DELETE FROM ${tombstoneTableSql} @@ -1170,6 +1193,53 @@ export class SQLiteCorePersistenceAdapter< ) } + for (const rowMetadataMutation of tx.rowMetadataMutations ?? []) { + const encodedKey = encodePersistedStorageKey(rowMetadataMutation.key) + if (rowMetadataMutation.type === `delete`) { + await transactionDriver.run( + `UPDATE ${collectionTableSql} + SET metadata = NULL + WHERE key = ?`, + [encodedKey], + ) + } else { + await transactionDriver.run( + `UPDATE ${collectionTableSql} + SET metadata = ? + WHERE key = ?`, + [ + rowMetadataMutation.value === undefined + ? null + : serializePersistedRowValue(rowMetadataMutation.value), + encodedKey, + ], + ) + } + } + + for (const metadataMutation of tx.collectionMetadataMutations ?? []) { + if (metadataMutation.type === `delete`) { + await transactionDriver.run( + `DELETE FROM collection_metadata + WHERE collection_id = ? 
AND key = ?`, + [collectionId, metadataMutation.key], + ) + } else { + await transactionDriver.run( + `INSERT INTO collection_metadata (collection_id, key, value, updated_at) + VALUES (?, ?, ?, CAST(strftime('%s', 'now') AS INTEGER)) + ON CONFLICT(collection_id, key) DO UPDATE SET + value = excluded.value, + updated_at = excluded.updated_at`, + [ + collectionId, + metadataMutation.key, + serializePersistedRowValue(metadataMutation.value), + ], + ) + } + } + await transactionDriver.run( `INSERT INTO collection_version (collection_id, latest_row_version) VALUES (?, ?) @@ -1207,6 +1277,22 @@ export class SQLiteCorePersistenceAdapter< }) } + async loadCollectionMetadata( + collectionId: string, + ): Promise> { + const rows = await this.driver.query<{ key: string; value: string }>( + `SELECT key, value + FROM collection_metadata + WHERE collection_id = ?`, + [collectionId], + ) + + return rows.map((row) => ({ + key: row.key, + value: deserializePersistedRowValue(row.value), + })) + } + async ensureIndex( collectionId: string, signature: string, @@ -1405,7 +1491,7 @@ export class SQLiteCorePersistenceAdapter< const orderByCompiled = compileOrderByClauses(options.orderBy) const queryParams: Array = [] - let sql = `SELECT key, value, row_version FROM ${collectionTableSql}` + let sql = `SELECT key, value, metadata, row_version FROM ${collectionTableSql}` if (options.where && whereCompiled.supported) { sql = `${sql} WHERE ${whereCompiled.sql}` @@ -1427,6 +1513,10 @@ export class SQLiteCorePersistenceAdapter< return { key, value, + metadata: + row.metadata != null + ? 
deserializePersistedRowValue(row.metadata) + : undefined, rowVersion: row.row_version, } }) @@ -1646,6 +1736,7 @@ export class SQLiteCorePersistenceAdapter< `CREATE TABLE IF NOT EXISTS ${collectionTableSql} ( key TEXT PRIMARY KEY, value TEXT NOT NULL, + metadata TEXT, row_version INTEGER NOT NULL )`, ) @@ -1800,6 +1891,15 @@ export class SQLiteCorePersistenceAdapter< latest_row_version INTEGER NOT NULL )`, ) + await this.driver.exec( + `CREATE TABLE IF NOT EXISTS collection_metadata ( + collection_id TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + updated_at INTEGER NOT NULL, + PRIMARY KEY (collection_id, key) + )`, + ) await this.driver.exec( `CREATE TABLE IF NOT EXISTS leader_term ( collection_id TEXT PRIMARY KEY, diff --git a/packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts b/packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts index 6ac202f13..9ce212fc7 100644 --- a/packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts +++ b/packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts @@ -44,7 +44,9 @@ type RecordingAdapter = PersistenceAdapter & { options: LoadSubsetOptions requiredIndexSignatures: ReadonlyArray }> + loadCollectionMetadataCalls: Array rows: Map + collectionMetadata: Map } function createRecordingAdapter( @@ -54,10 +56,12 @@ function createRecordingAdapter( const adapter: RecordingAdapter = { rows, + collectionMetadata: new Map(), applyCommittedTxCalls: [], ensureIndexCalls: [], markIndexRemovedCalls: [], loadSubsetCalls: [], + loadCollectionMetadataCalls: [], loadSubset: (collectionId, options, ctx) => { adapter.loadSubsetCalls.push({ collectionId, @@ -71,6 +75,17 @@ function createRecordingAdapter( })), ) }, + loadCollectionMetadata: (collectionId) => { + adapter.loadCollectionMetadataCalls.push(collectionId) + return Promise.resolve( + Array.from(adapter.collectionMetadata.entries()).map( + ([key, value]) => ({ + key, + value, + }), + ), + ) + }, applyCommittedTx: 
(collectionId, tx) => { adapter.applyCommittedTxCalls.push({ collectionId, @@ -92,6 +107,16 @@ function createRecordingAdapter( rows.set(mutation.key, mutation.value) } } + for (const metadataMutation of tx.collectionMetadataMutations ?? []) { + if (metadataMutation.type === `delete`) { + adapter.collectionMetadata.delete(metadataMutation.key) + } else { + adapter.collectionMetadata.set( + metadataMutation.key, + metadataMutation.value, + ) + } + } return Promise.resolve() }, ensureIndex: (collectionId, signature) => { @@ -257,6 +282,42 @@ describe(`persistedCollectionOptions`, () => { expect(adapter.applyCommittedTxCalls).toHaveLength(1) }) + it(`loads collection metadata into collection state during startup`, async () => { + const adapter = createRecordingAdapter() + adapter.collectionMetadata.set(`electric:resume`, { + kind: `resume`, + offset: `10_0`, + handle: `handle-1`, + shapeId: `shape-1`, + updatedAt: 1, + }) + + const collection = createCollection( + persistedCollectionOptions({ + id: `persisted-startup-metadata`, + getKey: (item) => item.id, + persistence: { + adapter, + }, + }), + ) + + await collection.stateWhenReady() + + expect(adapter.loadCollectionMetadataCalls).toEqual([ + `persisted-startup-metadata`, + ]) + expect( + collection._state.syncedCollectionMetadata.get(`electric:resume`), + ).toEqual({ + kind: `resume`, + offset: `10_0`, + handle: `handle-1`, + shapeId: `shape-1`, + updatedAt: 1, + }) + }) + it(`throws InvalidSyncConfigError when sync key is present but null`, () => { const invalidOptions = { id: `invalid-sync-null`, diff --git a/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts b/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts index 7c45e964b..6af6dff2c 100644 --- a/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts +++ b/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts @@ -423,6 +423,127 @@ export function 
runSQLiteCoreAdapterContractSuite( expect(txRows[0]?.count).toBe(0) }) + it(`persists row metadata and collection metadata atomically`, async () => { + const { adapter, driver } = registerContractHarness() + const collectionId = `metadata-roundtrip` + + await adapter.applyCommittedTx(collectionId, { + txId: `metadata-1`, + term: 1, + seq: 1, + rowVersion: 1, + mutations: [ + { + type: `insert`, + key: `1`, + value: { + id: `1`, + title: `Tracked`, + createdAt: `2026-01-01T00:00:00.000Z`, + score: 1, + }, + metadata: { + queryCollection: { + owners: { + q1: true, + }, + }, + }, + metadataChanged: true, + }, + ], + collectionMetadataMutations: [ + { + type: `set`, + key: `electric:resume`, + value: { + kind: `resume`, + offset: `10_0`, + handle: `handle-1`, + shapeId: `shape-1`, + updatedAt: 1, + }, + }, + ], + }) + + const rows = await adapter.loadSubset(collectionId, {}) + expect(rows).toEqual([ + { + key: `1`, + value: { + id: `1`, + title: `Tracked`, + createdAt: `2026-01-01T00:00:00.000Z`, + score: 1, + }, + metadata: { + queryCollection: { + owners: { + q1: true, + }, + }, + }, + }, + ]) + + const collectionMetadata = + await adapter.loadCollectionMetadata?.(collectionId) + expect(collectionMetadata).toEqual([ + { + key: `electric:resume`, + value: { + kind: `resume`, + offset: `10_0`, + handle: `handle-1`, + shapeId: `shape-1`, + updatedAt: 1, + }, + }, + ]) + + await expect( + adapter.applyCommittedTx(collectionId, { + txId: `metadata-2`, + term: 1, + seq: 2, + rowVersion: 2, + mutations: [ + { + type: `insert`, + key: `2`, + value: { + id: `2`, + title: `Bad`, + createdAt: `2026-01-01T00:00:00.000Z`, + score: 2, + }, + }, + ], + collectionMetadataMutations: [ + { + type: `set`, + key: `broken`, + value: { + invalid: new Date(Number.NaN), + }, + }, + ], + }), + ).rejects.toThrow() + + const rowsAfterFailure = await adapter.loadSubset(collectionId, {}) + expect(rowsAfterFailure).toEqual(rows) + + const metadataRows = await driver.query<{ key: string }>( + 
`SELECT key + FROM collection_metadata + WHERE collection_id = ?`, + [collectionId], + ) + expect(metadataRows).toEqual([{ key: `electric:resume` }]) + }) + it(`supports pushdown operators with correctness-preserving fallback`, async () => { const { adapter } = registerContractHarness() const collectionId = `todos` diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index cc435f0d9..af65cb801 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -28,6 +28,8 @@ interface PendingSyncedTransaction< operations: Array> truncate?: boolean deletedKeys: Set + rowMetadataWrites: Map + collectionMetadataWrites: Map optimisticSnapshot?: { upserts: Map deletes: Set @@ -40,6 +42,8 @@ interface PendingSyncedTransaction< immediate?: boolean } +type PendingMetadataWrite = { type: `set`; value: unknown } | { type: `delete` } + type InternalChangeMessage< T extends object = Record, TKey extends string | number = string | number, @@ -70,6 +74,7 @@ export class CollectionStateManager< > = [] public syncedData: SortedMap public syncedMetadata = new Map() + public syncedCollectionMetadata = new Map() // Optimistic state tracking - make public for testing public optimisticUpserts = new Map() @@ -870,6 +875,9 @@ export class CollectionStateManager< for (const operation of transaction.operations) { changedKeys.add(operation.key as TKey) } + for (const [key] of transaction.rowMetadataWrites) { + changedKeys.add(key) + } } // Use pre-captured state if available (from optimistic scenarios), @@ -959,26 +967,6 @@ export class CollectionStateManager< const key = operation.key as TKey this.syncedKeys.add(key) - // Update metadata - switch (operation.type) { - case `insert`: - this.syncedMetadata.set(key, operation.metadata) - break - case `update`: - this.syncedMetadata.set( - key, - Object.assign( - {}, - this.syncedMetadata.get(key), - operation.metadata, - ), - ) - break - case `delete`: - 
this.syncedMetadata.delete(key) - break - } - // Determine origin: 'local' for local-only collections or pending local changes const origin: VirtualOrigin = this.isLocalOnly || @@ -1025,6 +1013,7 @@ export class CollectionStateManager< } case `delete`: this.syncedData.delete(key) + this.syncedMetadata.delete(key) // Clean up origin and pending tracking for deleted rows this.rowOrigins.delete(key) this.pendingLocalChanges.delete(key) @@ -1036,6 +1025,25 @@ export class CollectionStateManager< break } } + + for (const [key, metadataWrite] of transaction.rowMetadataWrites) { + if (metadataWrite.type === `delete`) { + this.syncedMetadata.delete(key) + continue + } + this.syncedMetadata.set(key, metadataWrite.value) + } + + for (const [ + key, + metadataWrite, + ] of transaction.collectionMetadataWrites) { + if (metadataWrite.type === `delete`) { + this.syncedCollectionMetadata.delete(key) + continue + } + this.syncedCollectionMetadata.set(key, metadataWrite.value) + } } // After applying synced operations, if this commit included a truncate, @@ -1365,6 +1373,7 @@ export class CollectionStateManager< public cleanup(): void { this.syncedData.clear() this.syncedMetadata.clear() + this.syncedCollectionMetadata.clear() this.optimisticUpserts.clear() this.optimisticDeletes.clear() this.pendingOptimisticUpserts.clear() diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 1f50cc889..7b1fdc61e 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -18,6 +18,7 @@ import type { LoadSubsetOptions, OptimisticChangeMessage, SyncConfigRes, + SyncMetadataApi, } from '../types' import type { CollectionImpl } from './index.js' import type { CollectionStateManager } from './state' @@ -93,6 +94,8 @@ export class CollectionSyncManager< committed: false, operations: [], deletedKeys: new Set(), + rowMetadataWrites: new Map(), + collectionMetadataWrites: new Map(), immediate: options?.immediate, }) }, @@ -169,6 +172,15 
@@ export class CollectionSyncManager< if (messageType === `delete`) { pendingTransaction.deletedKeys.add(key) + pendingTransaction.rowMetadataWrites.set(key, { type: `delete` }) + } else if ( + messageType === `insert` || + message.metadata !== undefined + ) { + pendingTransaction.rowMetadataWrites.set(key, { + type: `set`, + value: message.metadata, + }) } }, commit: () => { @@ -205,6 +217,7 @@ export class CollectionSyncManager< // Clear all operations from the current transaction pendingTransaction.operations = [] pendingTransaction.deletedKeys.clear() + pendingTransaction.rowMetadataWrites.clear() // Mark the transaction as a truncate operation. During commit, this triggers: // - Delete events for all previously synced keys (excluding optimistic-deleted keys) @@ -220,6 +233,7 @@ export class CollectionSyncManager< deletes: new Set(this.state.optimisticDeletes), } }, + metadata: this.createSyncMetadataApi(), }), ) @@ -245,6 +259,110 @@ export class CollectionSyncManager< } } + private getActivePendingSyncTransaction() { + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + + if (!pendingTransaction) { + throw new NoPendingSyncTransactionWriteError() + } + if (pendingTransaction.committed) { + throw new SyncTransactionAlreadyCommittedWriteError() + } + + return pendingTransaction + } + + private createSyncMetadataApi(): SyncMetadataApi { + return { + row: { + get: (key) => { + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + const pendingWrite = pendingTransaction?.rowMetadataWrites.get(key) + if (pendingWrite) { + return pendingWrite.type === `delete` + ? 
undefined + : pendingWrite.value + } + return this.state.syncedMetadata.get(key) + }, + set: (key, metadata) => { + const pendingTransaction = this.getActivePendingSyncTransaction() + pendingTransaction.rowMetadataWrites.set(key, { + type: `set`, + value: metadata, + }) + }, + delete: (key) => { + const pendingTransaction = this.getActivePendingSyncTransaction() + pendingTransaction.rowMetadataWrites.set(key, { + type: `delete`, + }) + }, + }, + collection: { + get: (key) => { + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + const pendingWrite = + pendingTransaction?.collectionMetadataWrites.get(key) + if (pendingWrite) { + return pendingWrite.type === `delete` + ? undefined + : pendingWrite.value + } + return this.state.syncedCollectionMetadata.get(key) + }, + set: (key, value) => { + const pendingTransaction = this.getActivePendingSyncTransaction() + pendingTransaction.collectionMetadataWrites.set(key, { + type: `set`, + value, + }) + }, + delete: (key) => { + const pendingTransaction = this.getActivePendingSyncTransaction() + pendingTransaction.collectionMetadataWrites.set(key, { + type: `delete`, + }) + }, + list: (prefix) => { + const merged = new Map(this.state.syncedCollectionMetadata) + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + if (pendingTransaction) { + for (const [ + key, + pendingWrite, + ] of pendingTransaction.collectionMetadataWrites) { + if (pendingWrite.type === `delete`) { + merged.delete(key) + } else { + merged.set(key, pendingWrite.value) + } + } + } + + return Array.from(merged.entries()) + .filter(([key]) => (prefix ? 
key.startsWith(prefix) : true)) + .map(([key, value]) => ({ + key, + value, + })) + }, + }, + } + } + /** * Preload the collection data by starting sync if not already started * Multiple concurrent calls will share the same promise diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 9d84a1099..0dbd01780 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -339,6 +339,7 @@ export interface SyncConfig< commit: () => void markReady: () => void truncate: () => void + metadata?: SyncMetadataApi }) => void | CleanupFn | SyncConfigRes /** @@ -357,6 +358,25 @@ export interface SyncConfig< rowUpdateMode?: `partial` | `full` } +export interface SyncMetadataApi< + TKey extends string | number = string | number, +> { + row: { + get: (key: TKey) => unknown | undefined + set: (key: TKey, metadata: unknown) => void + delete: (key: TKey) => void + } + collection: { + get: (key: string) => unknown | undefined + set: (key: string, value: unknown) => void + delete: (key: string) => void + list: (prefix?: string) => ReadonlyArray<{ + key: string + value: unknown + }> + } +} + export interface ChangeMessage< T extends object = Record, TKey extends string | number = string | number, diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 17f9a2ed9..9bb5cc63a 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1440,6 +1440,129 @@ describe(`Collection`, () => { expect(collection._state.syncedMetadata.size).toBe(0) }) + it(`should allow startup metadata reads and commit metadata-only sync transactions`, async () => { + let observedCollectionMetadata: unknown + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-metadata-startup-read-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady, metadata }) => { + observedCollectionMetadata = 
metadata?.collection.get(`startup:key`) + + begin() + metadata?.collection.set(`startup:key`, { ready: true }) + commit() + markReady() + + testSyncFunctions = { begin, commit, metadata } + }, + }, + }) + + await collection.stateWhenReady() + + expect(observedCollectionMetadata).toBeUndefined() + expect( + collection._state.syncedCollectionMetadata.get(`startup:key`), + ).toEqual({ + ready: true, + }) + + const { begin, commit, metadata } = testSyncFunctions + begin() + metadata.collection.set(`runtime:key`, { persisted: true }) + commit() + + expect( + collection._state.syncedCollectionMetadata.get(`runtime:key`), + ).toEqual({ persisted: true }) + }) + + it(`should use last-write-wins for row metadata in sync transactions`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-row-metadata-last-write-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady, metadata }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + metadata: { source: `write` }, + }) + metadata?.row.set(1, { source: `explicit-set` }) + commit() + markReady() + + testSyncFunctions = { begin, write, commit, metadata } + }, + }, + }) + + await collection.stateWhenReady() + + expect(collection._state.syncedMetadata.get(1)).toEqual({ + source: `explicit-set`, + }) + + const { begin, write, commit, metadata } = testSyncFunctions + begin() + metadata.row.set(1, { source: `set-first` }) + write({ + type: `update`, + value: { id: 1, value: `updated` }, + metadata: { source: `write-last` }, + }) + commit() + + expect(collection._state.syncedMetadata.get(1)).toEqual({ + source: `write-last`, + }) + }) + + it(`should delete row metadata when sync deletes the row`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-row-metadata-delete-test`, + getKey: (item) => 
item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + metadata: { source: `sync` }, + }) + commit() + markReady() + + testSyncFunctions = { begin, write, commit } + }, + }, + }) + + await collection.stateWhenReady() + expect(collection._state.syncedMetadata.get(1)).toEqual({ source: `sync` }) + + const { begin, write, commit } = testSyncFunctions + begin() + write({ + type: `delete`, + key: 1, + }) + commit() + + expect(collection._state.syncedMetadata.has(1)).toBe(false) + }) + it(`open sync transaction isn't applied when optimistic mutation is resolved/rejected`, async () => { type Row = { id: number; name: string } diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 1197e7734..9f6ebff9d 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -47,6 +47,7 @@ import type { ControlMessage, GetExtensions, Message, + Offset, PostgresSnapshot, Row, ShapeStreamOptions, @@ -1181,7 +1182,30 @@ function createElectricSync>( return { sync: (params: Parameters[`sync`]>[0]) => { - const { begin, write, commit, markReady, truncate, collection } = params + const { + begin, + write, + commit, + markReady, + truncate, + collection, + metadata, + } = params + const persistedResumeState = metadata?.collection.get( + `electric:resume`, + ) as + | { + kind: `resume` + offset: string + handle: string + shapeId: string + updatedAt: number + } + | { + kind: `reset` + updatedAt: number + } + | undefined // Wrap markReady to wait for test hook in progressive mode let progressiveReadyGate: Promise | null = null @@ -1239,7 +1263,17 @@ function createElectricSync>( // In on-demand mode, we only need the changes from the point of time the collection was created // so we default to `now` when there is no saved offset. offset: - shapeOptions.offset ?? 
(syncMode === `on-demand` ? `now` : undefined), + shapeOptions.offset ?? + (persistedResumeState?.kind === `resume` + ? (persistedResumeState.offset as Offset) + : syncMode === `on-demand` + ? `now` + : undefined), + handle: + shapeOptions.handle ?? + (persistedResumeState?.kind === `resume` + ? persistedResumeState.handle + : undefined), signal: abortController.signal, onError: (errorParams) => { // Just immediately mark ready if there's an error to avoid blocking @@ -1280,6 +1314,25 @@ function createElectricSync>( // duplicate key errors when the row's data has changed between requests. const syncedKeys = new Set() + const stageResumeMetadata = () => { + if (!metadata) { + return + } + const shapeHandle = stream.shapeHandle + const lastOffset = stream.lastOffset + if (!shapeHandle || lastOffset === `-1`) { + return + } + + metadata.collection.set(`electric:resume`, { + kind: `resume`, + offset: lastOffset, + handle: shapeHandle, + shapeId: shapeHandle, + updatedAt: Date.now(), + }) + } + /** * Process a change message: handle tags and write the mutation */ @@ -1464,6 +1517,11 @@ function createElectricSync>( transactionStarted = true } + metadata?.collection.set(`electric:resume`, { + kind: `reset`, + updatedAt: Date.now(), + }) + truncate() // Clear tag tracking state @@ -1534,6 +1592,7 @@ function createElectricSync>( } // Commit the atomic swap + stageResumeMetadata() commit() // Exit buffering phase by marking that we've received up-to-date @@ -1547,8 +1606,13 @@ function createElectricSync>( // Normal mode or on-demand: commit transaction if one was started // Both up-to-date and subset-end trigger a commit if (transactionStarted) { + stageResumeMetadata() commit() transactionStarted = false + } else if (commitPoint === `up-to-date` && metadata) { + begin() + stageResumeMetadata() + commit() } } wrappedMarkReady(isBufferingInitialSync()) diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index 
7bc8f532b..cae8e145f 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -118,6 +118,7 @@ export interface QueryCollectionConfig< TQueryData, TQueryKey >[`staleTime`] + persistedGcTime?: number /** * Metadata to pass to the query. @@ -547,6 +548,7 @@ export function queryCollectionOptions( retry, retryDelay, staleTime, + persistedGcTime, getKey, onInsert, onUpdate, @@ -645,11 +647,74 @@ export function queryCollectionOptions( } const internalSync: SyncConfig[`sync`] = (params) => { - const { begin, write, commit, markReady, collection } = params + const { begin, write, commit, markReady, collection, metadata } = params // Track whether sync has been started let syncStarted = false + const getRowMetadata = (rowKey: string | number) => { + return (metadata?.row.get(rowKey) ?? + collection._state.syncedMetadata.get(rowKey)) as + | Record + | undefined + } + + const getPersistedOwners = (rowKey: string | number) => { + const rowMetadata = getRowMetadata(rowKey) + const queryMetadata = rowMetadata?.queryCollection + if (!queryMetadata || typeof queryMetadata !== `object`) { + return new Set() + } + + const owners = (queryMetadata as Record).owners + if (!owners || typeof owners !== `object`) { + return new Set() + } + + return new Set(Object.keys(owners as Record)) + } + + const setPersistedOwners = ( + rowKey: string | number, + owners: Set, + ) => { + if (!metadata) { + return + } + + const currentMetadata = { ...(getRowMetadata(rowKey) ?? 
{}) } + if (owners.size === 0) { + delete currentMetadata.queryCollection + if (Object.keys(currentMetadata).length === 0) { + metadata.row.delete(rowKey) + } else { + metadata.row.set(rowKey, currentMetadata) + } + return + } + + metadata.row.set(rowKey, { + ...currentMetadata, + queryCollection: { + owners: Object.fromEntries( + Array.from(owners.values()).map((owner) => [owner, true]), + ), + }, + }) + } + + const getOwnedRowsForQuery = (hashedQueryKey: string) => { + const ownedRows = new Set() + for (const [rowKey] of collection._state.syncedData.entries()) { + const owners = getPersistedOwners(rowKey) + if (owners.has(hashedQueryKey)) { + ownedRows.add(rowKey) + addRow(rowKey, hashedQueryKey) + } + } + return ownedRows + } + /** * Generate a consistent query key from LoadSubsetOptions. * CRITICAL: Must use identical logic in both createQueryFromOpts and unloadSubset @@ -680,6 +745,12 @@ export function queryCollectionOptions( const hashedQueryKey = hashKey(key) const extendedMeta = { ...meta, loadSubsetOptions: opts } + if (metadata) { + begin() + metadata.collection.delete(`queryCollection:gc:${hashedQueryKey}`) + commit() + } + if (state.observers.has(hashedQueryKey)) { // We already have a query for this queryKey // Increment reference count since another consumer is using this observer @@ -830,6 +901,7 @@ export function queryCollectionOptions( const currentSyncedItems: Map = new Map( collection._state.syncedData.entries(), ) + const previouslyOwnedRows = getOwnedRowsForQuery(hashedQueryKey) const newItemsMap = new Map() newItemsArray.forEach((item) => { const key = getKey(item) @@ -838,9 +910,16 @@ export function queryCollectionOptions( begin() - currentSyncedItems.forEach((oldItem, key) => { + previouslyOwnedRows.forEach((key) => { + const oldItem = currentSyncedItems.get(key) + if (!oldItem) { + return + } const newItem = newItemsMap.get(key) if (!newItem) { + const owners = getPersistedOwners(key) + owners.delete(hashedQueryKey) + 
setPersistedOwners(key, owners) const needToRemove = removeRow(key, hashedQueryKey) // returns true if the row is no longer referenced by any queries if (needToRemove) { write({ type: `delete`, value: oldItem }) @@ -852,6 +931,11 @@ export function queryCollectionOptions( }) newItemsMap.forEach((newItem, key) => { + const owners = getPersistedOwners(key) + if (!owners.has(hashedQueryKey)) { + owners.add(hashedQueryKey) + setPersistedOwners(key, owners) + } addRow(key, hashedQueryKey) if (!currentSyncedItems.has(key)) { write({ type: `insert`, value: newItem }) @@ -968,6 +1052,11 @@ export function queryCollectionOptions( const rowKeys = queryToRows.get(hashedQueryKey) ?? new Set() const rowsToDelete: Array = [] + const shouldWriteMetadata = metadata !== undefined && rowKeys.size > 0 + + if (shouldWriteMetadata) { + begin() + } rowKeys.forEach((rowKey) => { const queries = rowToQueries.get(rowKey) @@ -977,6 +1066,7 @@ export function queryCollectionOptions( } queries.delete(hashedQueryKey) + setPersistedOwners(rowKey, queries) if (queries.size === 0) { rowToQueries.delete(rowKey) @@ -987,11 +1077,17 @@ export function queryCollectionOptions( } }) - if (rowsToDelete.length > 0) { + if (!shouldWriteMetadata && rowsToDelete.length > 0) { begin() + } + + if (rowsToDelete.length > 0) { rowsToDelete.forEach((row) => { write({ type: `delete`, value: row }) }) + } + + if (shouldWriteMetadata || rowsToDelete.length > 0) { commit() } @@ -1034,6 +1130,28 @@ export function queryCollectionOptions( ) } + if (persistedGcTime !== undefined) { + if (metadata) { + begin() + metadata.collection.set(`queryCollection:gc:${hashedQueryKey}`, { + queryHash: hashedQueryKey, + mode: + persistedGcTime === Number.POSITIVE_INFINITY + ? `until-revalidated` + : `ttl`, + ...(persistedGcTime === Number.POSITIVE_INFINITY + ? 
{} + : { expiresAt: Date.now() + persistedGcTime }), + }) + commit() + } + unsubscribes.get(hashedQueryKey)?.() + unsubscribes.delete(hashedQueryKey) + state.observers.delete(hashedQueryKey) + queryRefCounts.set(hashedQueryKey, 0) + return + } + cleanupQueryInternal(hashedQueryKey) } diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index 9ad0f251f..4d0116fe2 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { QueryClient } from '@tanstack/query-core' +import { QueryClient, hashKey } from '@tanstack/query-core' import { createCollection, createLiveQueryCollection, @@ -4275,6 +4275,45 @@ describe(`QueryCollection`, () => { }) }) + it(`should diff against persisted query-owned rows on warm start`, async () => { + const baseQueryKey = [`persisted-baseline-test`] + const queryFn = vi.fn().mockResolvedValue([]) + + const config: QueryCollectionConfig = { + id: `persisted-baseline-test`, + queryClient, + queryKey: baseQueryKey, + queryFn, + getKey: (item) => item.id, + syncMode: `eager`, + startSync: false, + } + + const collection = createCollection(queryCollectionOptions(config)) + const ownedRow = { id: `1`, name: `Owned row`, category: `A` } + const unrelatedRow = { id: `2`, name: `Unrelated row`, category: `B` } + const ownedQueryHash = hashKey(baseQueryKey) + + collection._state.syncedData.set(ownedRow.id, ownedRow) + collection._state.syncedData.set(unrelatedRow.id, unrelatedRow) + collection._state.syncedMetadata.set(ownedRow.id, { + queryCollection: { + owners: { + [ownedQueryHash]: true, + }, + }, + }) + collection._state.syncedMetadata.set(unrelatedRow.id, undefined) + collection._state.size = 2 + + await collection.preload() + await flushPromises() + + expect(queryFn).toHaveBeenCalledTimes(1) + 
expect(collection.has(ownedRow.id)).toBe(false) + expect(collection.has(unrelatedRow.id)).toBe(true) + }) + it(`should reset refcount after query GC and reload (stale refcount bug)`, async () => { // This test catches Bug 2: stale refcounts after GC/remove // When TanStack Query GCs a query, the refcount should be cleaned up diff --git a/persisted-sync-metadata-plan/01-core-api.md b/persisted-sync-metadata-plan/01-core-api.md new file mode 100644 index 000000000..d25a9b279 --- /dev/null +++ b/persisted-sync-metadata-plan/01-core-api.md @@ -0,0 +1,192 @@ +# Phase 1: Core API + +## Objective + +Add a transactional sync metadata API to `@tanstack/db` that supports: + +- row metadata +- collection metadata +- metadata-only committed sync transactions +- read-your-own-writes semantics inside a sync transaction + +This phase should not require query collection or Electric changes to ship. It +is the core primitive they will later consume. + +## Primary code areas + +- `packages/db/src/types.ts` +- `packages/db/src/collection/sync.ts` +- `packages/db/src/collection/state.ts` +- `packages/db/tests/collection.test.ts` +- any new core tests needed for metadata transaction behavior + +## Proposed implementation steps + +### 1. Extend sync types + +Update the sync params type to include: + +- `metadata.row.get` +- `metadata.row.set` +- `metadata.row.delete` +- `metadata.collection.get` +- `metadata.collection.set` +- `metadata.collection.delete` +- `metadata.collection.list` + +Key requirements: + +- metadata API is optional +- metadata writes outside an active sync transaction throw +- startup reads through `metadata.row.get`, `metadata.collection.get`, and + `metadata.collection.list` are allowed outside a transaction +- reads inside an active transaction must reflect staged metadata writes + +### 2. 
Extend pending sync transaction state + +Update the internal pending synced transaction shape so it can stage: + +- row operations +- row metadata writes +- collection metadata writes +- truncate/reset state + +Suggested internal shape: + +```ts +type PendingMetadataWrite = { type: 'set'; value: unknown } | { type: 'delete' } + +type PendingSyncedTransaction<T extends object, TKey extends string | number> = { + committed: boolean + operations: Array<OptimisticChangeMessage<T, TKey>> + deletedKeys: Set<TKey> + rowMetadataWrites: Map<TKey, PendingMetadataWrite> + collectionMetadataWrites: Map<string, PendingMetadataWrite> + truncate?: boolean + immediate?: boolean +} +``` + +Exact naming is flexible, but the staged metadata writes must be co-located with +the existing pending sync transaction. + +### 3. Add in-memory collection metadata state + +Add a new in-memory store in `CollectionStateManager` for collection-scoped +synced metadata. + +Suggested field: + +```ts +public syncedCollectionMetadata = new Map<string, unknown>() +``` + +This should behave like `syncedMetadata`, but keyed by metadata key rather than +row key. + +Note: this naming sits next to the existing row-scoped `syncedMetadata`. If the +implementation keeps both names, it should add clear comments distinguishing row +metadata from collection metadata. Renaming the existing row-scoped field to +something more explicit can be considered as a follow-up cleanup. + +### 4. Define overwrite semantics + +Document and implement these rules: + +- `write({ metadata })` and `metadata.row.set()` target the same underlying row + metadata state +- later staged writes win within a transaction +- every staged row metadata write is a replace at the transaction layer +- `delete` removes row metadata +- `metadata.row.set()` replaces the full row metadata blob +- `metadata.row.delete()` removes row metadata +- `metadata.collection.set()` replaces the full collection metadata value for + that key +- `metadata.collection.delete()` removes the value + +If callers need merge behavior, they should: + +1. read the current metadata value +2. compute the merged result +3.
stage the merged result explicitly + +This avoids contradictory rules when `write({ metadata })` and +`metadata.row.set()` are both used for the same row in one transaction. + +### 5. Support metadata-only transactions + +Ensure `commitPendingTransactions()` can commit a transaction with: + +- zero row operations and non-zero metadata changes +- row metadata changes only +- collection metadata changes only + +This is a hard requirement for later Electric resume persistence and query +retention persistence. + +### 6. Define truncate behavior + +Core truncate semantics must be explicit: + +- clear `syncedData` +- clear `syncedMetadata` +- clear any row-scoped staged metadata +- leave collection metadata alone unless a higher layer explicitly resets it + +The core layer should not silently delete collection metadata on truncate. +Per-sync reset behavior can be layered on later. + +### 7. Define row-delete semantics + +Deleting a row through sync also deletes its row metadata. + +This should hold regardless of whether row metadata had previously been staged +through `write({ metadata })` or `metadata.row.set()`. + +### 8. Scope metadata to sync paths + +This metadata API is sync-only. + +It is not intended to flow through user mutation transport types such as +`PersistedMutationEnvelope`. User mutations may still observe `syncMetadata` +coming from already-synced rows, but they do not independently persist metadata +through this API. 
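The read-merge-stage pattern above can be sketched as follows. This is a minimal illustration, not the real API surface: `RowMetadataStore` is only a stand-in for the `metadata.row` capability from this RFC, and the `queryCollection.owners` shape is an example, not a required schema.

```ts
// Stand-in for the metadata.row portion of SyncMetadataApi (illustrative).
type RowMetadataStore = {
  get: (key: string) => unknown | undefined
  set: (key: string, metadata: unknown) => void
}

function createInMemoryRowStore(): RowMetadataStore {
  const store = new Map<string, unknown>()
  return {
    get: (key) => store.get(key),
    set: (key, metadata) => {
      store.set(key, metadata)
    },
  }
}

// Merge one owner into a row's metadata without relying on merge semantics
// in the API itself: read, compute the merged blob, stage a full replace.
function addOwner(row: RowMetadataStore, rowKey: string, owner: string): void {
  // 1. read the current metadata value (inside a transaction this would
  //    reflect staged writes via read-your-own-writes)
  const current = (row.get(rowKey) ?? {}) as Record<string, unknown>
  const existingOwners =
    (current.queryCollection as { owners?: Record<string, true> } | undefined)
      ?.owners ?? {}
  // 2. compute the merged result
  const merged = {
    ...current,
    queryCollection: { owners: { ...existingOwners, [owner]: true } },
  }
  // 3. stage the merged result explicitly (set() replaces the full blob)
  row.set(rowKey, merged)
}

const row = createInMemoryRowStore()
addOwner(row, `1`, `q1`)
addOwner(row, `1`, `q2`)
// row.get(`1`) now holds { queryCollection: { owners: { q1: true, q2: true } } }
```

Because every staged write is a full replace, this pattern composes cleanly with last-write-wins and avoids ambiguity when `write({ metadata })` and `metadata.row.set()` touch the same row in one transaction.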
+ +## Edge cases to handle + +- `metadata.row.set()` called before `begin()` +- `metadata.collection.set()` called after `commit()` +- `metadata.collection.get()` called before `begin()` during startup +- `metadata.row.get()` after a staged `row.set()` in the same transaction +- `metadata.collection.list(prefix)` after multiple staged collection writes +- mixing `write({ metadata })` and `metadata.row.set()` for the same key in the + same transaction +- row delete after earlier staged row metadata updates in the same transaction +- truncate followed by new staged row metadata in the same transaction +- empty transaction commit with only metadata writes + +## Acceptance criteria + +- core sync API can stage and commit row metadata +- core sync API can stage and commit collection metadata +- metadata reads inside a transaction see staged writes +- metadata-only commits work +- existing collection behavior without metadata remains unchanged + +## Suggested tests + +- commit row metadata through `write({ metadata })` +- commit row metadata through `metadata.row.set()` +- commit collection metadata through `metadata.collection.set()` +- verify read-your-own-writes inside a transaction +- verify startup reads outside a transaction succeed +- verify last-write-wins for staged row metadata +- verify metadata writes outside a transaction throw +- verify row delete removes row metadata +- verify truncate clears row metadata but not collection metadata +- verify metadata-only transactions commit successfully + +## Exit criteria + +Phase 1 is complete when the core collection layer can represent, stage, commit, +and read metadata correctly in memory, independent of any persistence adapter. 
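As a companion to the suggested tests, the staging semantics this phase requires (writes throw outside an active transaction, reads outside a transaction see committed state, read-your-own-writes, last-write-wins, metadata-only commits) can be modeled in isolation. `MetadataModel` below is illustrative only; it does not mirror the real `CollectionSyncManager` internals.

```ts
// Minimal in-memory model of staged collection-metadata writes.
// Illustrative only: names and error types are assumptions.
type PendingWrite = { type: `set`; value: unknown } | { type: `delete` }

class MetadataModel {
  private committed = new Map<string, unknown>()
  private staged: Map<string, PendingWrite> | null = null

  begin(): void {
    this.staged = new Map()
  }

  set(key: string, value: unknown): void {
    if (!this.staged) throw new Error(`no active sync transaction`)
    this.staged.set(key, { type: `set`, value }) // later writes win
  }

  delete(key: string): void {
    if (!this.staged) throw new Error(`no active sync transaction`)
    this.staged.set(key, { type: `delete` })
  }

  // Reads are allowed outside a transaction (startup reads see committed
  // state); inside a transaction they see staged writes first.
  get(key: string): unknown | undefined {
    const pending = this.staged?.get(key)
    if (pending) return pending.type === `delete` ? undefined : pending.value
    return this.committed.get(key)
  }

  // A metadata-only commit is valid: no row operations are required.
  commit(): void {
    if (!this.staged) throw new Error(`no active sync transaction`)
    for (const [key, write] of this.staged) {
      if (write.type === `delete`) this.committed.delete(key)
      else this.committed.set(key, write.value)
    }
    this.staged = null
  }
}

const model = new MetadataModel()
model.begin()
model.set(`electric:resume`, { offset: `10_0` })
// read-your-own-writes: staged value is visible before commit
console.log(model.get(`electric:resume`))
model.set(`electric:resume`, { offset: `11_0` }) // last write wins
model.commit()
```

A real implementation additionally has to co-locate this staging with row operations in the pending sync transaction, but the observable semantics should match this model.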
diff --git a/persisted-sync-metadata-plan/02-sqlite-implementation.md b/persisted-sync-metadata-plan/02-sqlite-implementation.md new file mode 100644 index 000000000..f922210b3 --- /dev/null +++ b/persisted-sync-metadata-plan/02-sqlite-implementation.md @@ -0,0 +1,184 @@ +# Phase 2: SQLite Implementation

## Objective

Make `db-sqlite-persisted-collection-core` the reference implementation of the
metadata API by persisting:

- row metadata with row values
- collection metadata in a dedicated table
- row and metadata changes in the same SQLite transaction

## Primary code areas

- `packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts`
- `packages/db-sqlite-persisted-collection-core/src/persisted.ts`
- `packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts`
- `packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts`
- restart/runtime persistence contract tests

## Proposed implementation steps

### 1. Extend SQLite schema

Add:

- `metadata TEXT` column to persisted collection row tables
- `collection_metadata` table for collection-scoped metadata

Suggested shape:

```sql
CREATE TABLE IF NOT EXISTS collection_metadata (
  collection_id TEXT NOT NULL,
  key TEXT NOT NULL,
  value TEXT NOT NULL,
  updated_at INTEGER NOT NULL,
  PRIMARY KEY (collection_id, key)
)
```

### 2. Extend persisted row hydration

Update the adapter hydration path to return:

```ts
type PersistedLoadedRow<T, TKey> = {
  key: TKey
  value: T
  metadata?: unknown
}
```

The persisted runtime must pass hydrated metadata into the collection sync
transaction, not drop it during `applyRowsToCollection()` or related paths.

### 3.
Extend persisted tx shape

Update internal persisted tx machinery to support:

- row value writes
- row metadata writes
- collection metadata writes

This should be reflected in:

- normalized sync operation shapes
- buffered sync transactions
- adapter `applyCommittedTx()`
- replay payload classification so the runtime knows when exact targeted replay
  is possible and when it must fall back to reload

### 4. Make metadata transactional in SQLite

All of these must commit in one SQLite transaction:

- row inserts/updates/deletes
- row metadata changes
- collection metadata changes
- version/stream position updates already associated with the tx

This is the key correctness property for the whole design.

### 5. Load collection metadata at startup

The persisted runtime should load collection metadata during startup, before new
sync subscriptions start processing. This is necessary for:

- query placeholder retention decisions
- Electric resume-state restoration
- future collection-scoped metadata consumers

This should be reflected in the adapter contract explicitly, for example via:

```ts
loadCollectionMetadata?: (
  collectionId: string,
) => Promise<ReadonlyArray<{ key: string; value: unknown }>>
```

The exact method name is flexible, but startup collection metadata loading must
be a first-class adapter capability.

### 6.
Carry metadata through replay and hydration + +Metadata must not be lost in: + +- initial hydration +- buffered sync transaction application +- internal persisted transaction creation +- self/follower replay +- `pullSince`-style gap recovery + +For the first pass, replay behavior should be explicit: + +- hydration must carry row metadata exactly +- local commit must carry row and collection metadata exactly +- if a committed tx contains metadata changes and the targeted replay protocol + cannot represent them exactly, followers should fall back to reload behavior +- if gap recovery encounters metadata-bearing changes it cannot replay exactly, + recovery should also fall back to reload behavior + +This must be documented in the implementation, not left implicit. + +## Important design constraints + +### Metadata-only committed txs + +The persisted layer must support transactions with: + +- no row mutations +- collection metadata changes only + +This is required for: + +- Electric resume metadata commits +- query retention metadata updates + +### Serialization + +Use the same persisted JSON encoding and decoding path already used for row +values, so metadata can safely round-trip supported value types. + +### Crash-consistency boundary + +The implementation must keep row writes, row metadata writes, and collection +metadata writes inside the same SQLite transaction boundary. + +If any part of the tx fails, all three categories must roll back together. 
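The reload fallback rule above (a committed tx whose metadata changes cannot be represented by the targeted replay protocol forces followers to reload) is mechanical enough to sketch. The shapes and names here are illustrative, not the actual persisted tx types:

```typescript
// Hypothetical committed-tx shape carrying all three write categories.
type CommittedTx = {
  rowWrites: Array<{ key: string; value: unknown; metadata?: unknown }>
  rowDeletes: Array<string>
  collectionMetadata: Array<{ key: string; value: unknown }>
}

type ReplayPlan = { mode: 'targeted' } | { mode: 'reload'; reason: string }

// Classify a committed tx: targeted replay only when every change is
// representable; otherwise a conservative full reload.
function classifyReplay(
  tx: CommittedTx,
  protocolSupportsMetadata: boolean,
): ReplayPlan {
  const hasMetadata =
    tx.collectionMetadata.length > 0 ||
    tx.rowWrites.some((w) => w.metadata !== undefined)
  if (hasMetadata && !protocolSupportsMetadata) {
    return { mode: 'reload', reason: 'metadata changes not representable' }
  }
  return { mode: 'targeted' }
}
```

The same classifier shape would apply to gap recovery: a missed metadata-bearing tx that cannot be replayed exactly downgrades recovery to reload.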
+ +## Edge cases to handle + +- metadata-only tx commit +- delete row with row metadata present +- row update with partial row value and metadata merge semantics +- crash/restart between repeated tx applications +- replay of metadata-bearing committed txs to follower tabs +- sequence-gap recovery when metadata changed in a missed tx +- full reload fallback correctness when targeted metadata replay is unavailable +- startup collection metadata load before subscription processing + +## Acceptance criteria + +- persisted rows round-trip metadata +- collection metadata round-trips independently +- row data and metadata commit atomically +- metadata-only committed txs persist correctly +- startup loads collection metadata and hydrated row metadata +- replay/recovery remains correct, even if it uses conservative reload fallback + +## Suggested tests + +- SQLite adapter stores and loads row metadata +- SQLite adapter stores and loads collection metadata +- `applyCommittedTx()` atomically commits row and collection metadata +- metadata-only tx survives restart +- hydrated rows apply metadata into collection state +- follower runtime converges on metadata-bearing txs +- seq-gap recovery remains correct when metadata changed +- startup collection metadata loads before any sync subscription attaches + +## Exit criteria + +Phase 2 is complete when SQLite-backed persisted collections can durably store, +hydrate, and replay metadata with the same transactional guarantees as row data. 
diff --git a/persisted-sync-metadata-plan/03-query-collection.md b/persisted-sync-metadata-plan/03-query-collection.md new file mode 100644 index 000000000..a467cd6ae --- /dev/null +++ b/persisted-sync-metadata-plan/03-query-collection.md @@ -0,0 +1,270 @@ +# Phase 3: Query Collection

## Objective

Migrate `query-db-collection` to the new metadata primitives so it can:

- preserve row ownership across restart
- support persisted query retention independently from in-memory `gcTime`
- support long-lived offline warm starts
- reconcile retained persisted rows when the same query is requested again

## Primary code areas

- `packages/query-db-collection/src/query.ts`
- `packages/query-db-collection/src/serialization.ts`
- `packages/query-db-collection/tests/query.test.ts`
- persisted runtime integration tests combining query collection and SQLite

## High-level design

### Persisted on rows

Store per-row ownership in row metadata:

```ts
type QueryRowMetadata = {
  queryCollection?: {
    owners: Record<string, true> // keyed by query identity, e.g. queryHash
  }
}
```

### Persisted at collection scope

Store query retention/placeholder metadata at collection scope.

Suggested entry shape:

```ts
type PersistedQueryRetentionEntry =
  | {
      queryHash: string
      mode: 'ttl'
      expiresAt: number
    }
  | {
      queryHash: string
      mode: 'until-revalidated'
    }
```

Suggested keys:

- `queryCollection:gc:<queryHash>`
- optionally `queryCollection:query:<queryHash>` for serialized query identity
- optionally `queryCollection:metaVersion` for query metadata versioning

## Proposed implementation steps

### 1. Add persisted retention option to query collection config

Introduce a durable retention control that is independent from:

- `staleTime`
- in-memory `gcTime`

Possible public API shapes:

```ts
persistedGcTime?: number | typeof Infinity
```

or

```ts
persistedRetention?: {
  gcTime: number | typeof Infinity
}
```

The second shape is more extensible, but either is acceptable.
+ +This should be added to the public query collection option types defined in +`packages/query-db-collection/src/query.ts`. + +### 2. Rebuild ownership from hydrated rows + +When rows are hydrated from persistence: + +- inspect row metadata for query owners +- rebuild `rowToQueries` +- rebuild `queryToRows` + +This reconstruction is incremental and subset-scoped. + +### 3. Keep refcounts in memory only + +Do not persist `queryRefCounts`. + +They represent live subscriber/process state and should restart from zero. + +### 4. Persist ownership changes transactionally + +Whenever ownership changes for a row: + +- update row metadata in the same sync transaction + +This includes metadata-only ownership changes where the row value itself is +unchanged. + +### 5. Persist query retention state + +When a query becomes inactive: + +- if persisted retention is finite, persist `mode: 'ttl'` with `expiresAt` +- if persisted retention is infinite, persist `mode: 'until-revalidated'` + +This retention entry is independent from in-memory query `gcTime`. + +### 6. Startup retention handling + +At startup: + +- load collection metadata retention entries before new subscriptions attach +- clean up expired `ttl` placeholders +- skip startup GC for `until-revalidated` placeholders + +Startup retention cleanup must run under the same mutex or startup critical +section as hydration and replay to avoid races with new query subscriptions. + +### 7. Explicit cold-row cleanup strategy for expired TTL placeholders + +Phase 3 must define a concrete cold-row cleanup path for on-demand mode. + +For the initial Level 1 implementation, that path should be one of: + +- adapter-driven full scan of persisted rows with non-null row metadata, or +- denormalized owned row keys stored on the retention entry itself + +The implementation must choose one and document it. Startup cleanup cannot be +left as an abstract promise if expired placeholders may own rows that are not +currently hydrated. 
+ +If the first implementation uses the scan-based path, it should do all of the +following under the same startup mutex: + +1. find rows owned by the expired placeholder +2. remove the placeholder from each row's owner set +3. delete rows whose owner set becomes empty +4. delete the placeholder retention entry + +### 8. Revalidation flow for indefinite persisted retention + +When a query retained with `mode: 'until-revalidated'` is requested again: + +1. match the placeholder by canonical query identity +2. use persisted ownership as the baseline +3. run the query +4. diff server results against previously owned rows +5. remove rows that are no longer owned +6. clear or refresh the retention entry based on the new lifecycle state + +This is the key behavior required for long offline periods. + +This revalidation baseline is required for correctness. The implementation must +not continue to diff only against all rows in `collection._state.syncedData`, +because that would preserve the warm-start deletion bug this phase is intended +to fix. + +In on-demand mode, if the previously owned rows are not all hydrated in memory, +the implementation must obtain the baseline from persisted ownership data +directly, either via: + +- row metadata scan / lookup, or +- denormalized owned row keys on the retention entry, or +- a future normalized ownership index + +### 9. Use query-owned baseline for reconciliation + +When reconciling a query after restart or revalidation, diff against: + +- the rows previously owned by the specific query + +This is not an optional improvement. It is the required reconciliation model for +Phase 3. + +## Important design constraints + +### Persisted retention is not freshness + +Long-lived persisted data may be very stale. 
+ +That is acceptable as long as: + +- re-requesting the query still follows normal query refetch behavior +- persisted retention does not imply anything about `staleTime` + +### Infinite persisted retention needs explicit eviction eventually + +If `persistedGcTime: Infinity` or `mode: 'until-revalidated'` is supported, +storage can grow without bound. This phase does not need to ship explicit +eviction APIs, but the design should leave room for: + +- evict one query placeholder +- evict all query placeholders for a collection +- evict by age or storage-pressure policy + +### Runtime TTL expiry needs explicit policy + +Finite persisted retention should not only be handled on restart. + +When a `ttl` placeholder expires while the app remains running, the runtime +should schedule the same cleanup flow that startup cleanup would perform: + +1. locate the rows owned by the placeholder +2. remove the placeholder from those rows +3. delete orphaned rows +4. remove the retention entry + +This runtime TTL cleanup should run under the same mutex used for startup +cleanup and query revalidation. + +### Versioning matters + +If query identity hashing or serialization changes across app versions, retained +placeholders may become unreachable. 
+ +The implementation should leave room for: + +- metadata versioning +- collection-level invalidation of incompatible retained placeholders + +## Edge cases to handle + +- multiple overlapping queries owning the same row +- query unsubscribes and resubscribes before persisted retention cleanup runs +- query retained indefinitely while another query updates shared rows +- startup with only a subset of rows hydrated in on-demand mode +- expired `ttl` placeholder owning only cold rows in on-demand mode +- placeholder exists but the same query is never requested again +- query identity serialization changes across versions +- metadata-only ownership updates with unchanged row values +- rows retained indefinitely while offline for a long period + +## Acceptance criteria + +- restart does not incorrectly delete persisted rows before ownership is restored +- row ownership survives restart +- query retention is persisted independently from `gcTime` +- `until-revalidated` retention keeps persisted rows available indefinitely +- re-requesting a retained query reconciles the retained rows correctly + +## Suggested tests + +- warm-start with multiple disjoint queries does not drop unrelated rows +- overlapping queries preserve shared row ownership across restart +- finite persisted retention expires and cleans up orphaned rows +- finite persisted retention expires while the app remains running +- indefinite persisted retention survives restart and long offline gaps +- re-requesting an indefinite retained query reconciles deleted rows correctly +- in-memory `gcTime` expiry does not remove indefinitely retained persisted rows +- on-demand hydration reconstructs ownership for loaded subsets +- on-demand expired-placeholder cleanup handles cold rows correctly +- metadata-only ownership updates persist correctly + +## Exit criteria + +Phase 3 is complete when query collections can warm-start safely from persisted +data, preserve ownership across restart, and independently control 
durable query +retention for offline-first users. diff --git a/persisted-sync-metadata-plan/04-electric-collection.md b/persisted-sync-metadata-plan/04-electric-collection.md new file mode 100644 index 000000000..7586e4a8e --- /dev/null +++ b/persisted-sync-metadata-plan/04-electric-collection.md @@ -0,0 +1,174 @@ +# Phase 4: Electric Collection + +## Objective + +Migrate `electric-db-collection` to use transactional collection metadata and +row metadata so it can: + +- persist durable resume state +- warm-start from persisted rows safely +- resume streaming from a persisted stream identity when valid +- leave room for future persistence of additional Electric-derived state + +## Primary code areas + +- `packages/electric-db-collection/src/electric.ts` +- `packages/electric-db-collection/tests/electric.test.ts` +- `packages/electric-db-collection/tests/electric-live-query.test.ts` +- persisted integration tests combining Electric and SQLite persistence + +## High-level design + +### Collection metadata + +Persist Electric resume state at collection scope. + +Suggested shape: + +```ts +type ElectricResumeMetadata = + | { + kind: 'resume' + offset: string + handle: string + shapeId: string + updatedAt: number + } + | { + kind: 'reset' + updatedAt: number + } +``` + +Suggested key: + +- `electric:resume` + +### Row metadata + +Persist useful per-row sync metadata through the same row metadata channel used +by `write({ metadata })`. + +Examples: + +- relation identity +- row sync headers that are useful after hydration + +## Proposed implementation steps + +### 1. Read resume metadata at startup + +On sync initialization: + +- read `electric:resume` from collection metadata +- if `kind: 'resume'`, prefer that persisted stream identity over the current + fallback behavior +- if resume metadata is absent or invalid, fall back to the existing startup + behavior + +### 2. 
Persist resume state transactionally + +When an Electric batch advances the durable resume point: + +- stage the new `electric:resume` metadata in the same sync transaction as the + row changes from that batch + +This prevents the invalid state where a resume token advances beyond the rows +that were actually committed. + +### 3. Support metadata-only resume updates when needed + +If Electric needs to persist a new durable resume state on a control-message +boundary without a row mutation in the same batch, use a metadata-only sync +transaction. + +This depends on Phase 1 and Phase 2 support for metadata-only commits. + +### 4. Define reset behavior + +When Electric determines the persisted resume state is invalid or a must-refetch +equivalent restart path is required: + +- clear or replace `electric:resume` with a `kind: 'reset'` marker +- perform the corresponding conservative reload path + +This makes restart behavior explicit rather than relying on stale resume state. + +Ordering requirement: + +- write the `kind: 'reset'` marker before starting the refetch/reload path, + using a metadata-only transaction if needed + +That way, if the app crashes during refetch, restart will not attempt to resume +from stale persisted stream state. + +### 5. Carry row metadata through hydration + +Hydrated rows from SQLite should restore the Electric row metadata that was +originally written through `write({ metadata })`. + +This provides a better baseline for future Electric restart reconstruction work. + +## Important design constraints + +### Resume metadata is not the full Electric state + +Electric also maintains derived in-memory state such as: + +- tag indexes +- synced key tracking +- snapshot and txid matching state + +This phase does not require exact restart reconstruction of every one of these. +It only requires a sound transactional place to persist the pieces that should +survive restart. 
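Steps 2 and 3 above hinge on one property: the resume entry is staged in the same sync transaction as the batch's rows, and a metadata-only transaction carries a resume advance that has no rows. A minimal sketch against a recording fake (the transaction surface mirrors the RFC's proposal, but all names here are illustrative and the real Electric batch handling is more involved):

```typescript
type ElectricResumeMetadata =
  | { kind: 'resume'; offset: string; handle: string; shapeId: string; updatedAt: number }
  | { kind: 'reset'; updatedAt: number }

type StagedOp =
  | { op: 'row'; key: string; value: unknown }
  | { op: 'collectionMetadata'; key: string; value: unknown }

// Recording fake of the sync transaction surface: every commit captures the
// exact set of operations that shared the transaction.
class RecordingTx {
  private staged: Array<StagedOp> = []
  committed: Array<Array<StagedOp>> = []

  write(key: string, value: unknown): void {
    this.staged.push({ op: 'row', key, value })
  }
  setCollectionMetadata(key: string, value: unknown): void {
    this.staged.push({ op: 'collectionMetadata', key, value })
  }
  commit(): void {
    this.committed.push(this.staged)
    this.staged = []
  }
}

// One Electric batch: row changes plus the advanced resume point, one commit.
// With zero rows this is exactly the metadata-only resume update of step 3.
function commitElectricBatch(
  tx: RecordingTx,
  rows: Array<{ key: string; value: unknown }>,
  resume: ElectricResumeMetadata,
): void {
  for (const row of rows) tx.write(row.key, row.value)
  tx.setCollectionMetadata('electric:resume', resume)
  tx.commit()
}
```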
+ +### Be conservative when reconstruction is incomplete + +If persisted resume metadata is present but the required derived state is not +reconstructible safely, Electric should fall back to a conservative reload path +rather than assume exact restart correctness. + +### Strong stream identity matters + +Resume metadata should persist enough identity to detect incompatible resume +state, not just an offset. + +At minimum: + +- `offset` +- `handle` +- `shapeId` + +## Edge cases to handle + +- persisted resume metadata missing one required field +- resume metadata exists but shape identity no longer matches server state +- metadata-only resume update +- restart after partially applied or replayed batches +- must-refetch/reset flows clearing or replacing persisted resume state +- hydrated rows restoring row metadata while resume metadata is absent + +## Acceptance criteria + +- Electric resume state survives restart +- resume metadata only advances when the corresponding batch commits +- invalid resume metadata triggers conservative fallback +- metadata-only resume commits work +- persisted row metadata survives hydration where relevant + +## Suggested tests + +- batch commit persists rows and resume metadata atomically +- failed batch does not advance resume metadata +- restart uses persisted resume metadata when valid +- restart falls back safely when persisted resume metadata is invalid +- metadata-only resume tx survives restart +- must-refetch/reset clears or invalidates persisted resume state correctly +- row metadata written by Electric survives SQLite hydration + +## Exit criteria + +Phase 4 is complete when Electric has a durable, transactional resume-state +story that is compatible with persisted warm starts and conservative fallback +behavior. 
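The conservative startup decision described above (resume only when the persisted metadata is well-formed and its stream identity matches, otherwise reload) can be sketched as a pure validation function. The shapes follow the suggested `ElectricResumeMetadata` above; the function name and decision type are hypothetical:

```typescript
type ResumeDecision =
  | { action: 'resume'; offset: string; handle: string }
  | { action: 'reload'; reason: string }

// Validate persisted resume metadata; anything missing, malformed, reset, or
// identity-mismatched falls back to the conservative reload path.
function decideStartup(stored: unknown, expectedShapeId: string): ResumeDecision {
  if (typeof stored !== 'object' || stored === null) {
    return { action: 'reload', reason: 'missing resume metadata' }
  }
  const meta = stored as Record<string, unknown>
  if (meta.kind === 'reset') {
    return { action: 'reload', reason: 'explicit reset marker' }
  }
  if (
    meta.kind !== 'resume' ||
    typeof meta.offset !== 'string' ||
    typeof meta.handle !== 'string' ||
    typeof meta.shapeId !== 'string'
  ) {
    return { action: 'reload', reason: 'malformed resume metadata' }
  }
  if (meta.shapeId !== expectedShapeId) {
    return { action: 'reload', reason: 'shape identity mismatch' }
  }
  return { action: 'resume', offset: meta.offset as string, handle: meta.handle as string }
}
```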
diff --git a/persisted-sync-metadata-plan/05-test-plan.md b/persisted-sync-metadata-plan/05-test-plan.md new file mode 100644 index 000000000..9dc49fc0e --- /dev/null +++ b/persisted-sync-metadata-plan/05-test-plan.md @@ -0,0 +1,229 @@ +# Phase 5: Test Plan + +## Objective + +Validate the persisted sync metadata design with invariants-focused tests across: + +- core collection state +- SQLite persistence +- query collection restart and retention behavior +- Electric resume behavior + +This plan is intentionally thorough. The feature crosses multiple layers and is +easy to get "mostly working" while still breaking on restart, replay, or long +offline gaps. + +## Testing principles + +- prefer behavior/invariant tests over implementation-detail tests +- add restart tests wherever durable state is introduced +- add crash-consistency style tests wherever atomicity is claimed +- test both eager and on-demand flows where behavior differs +- test replay/recovery paths, not just happy-path startup + +## Invariants + +### Core invariants + +- metadata that is staged in a sync transaction is visible to reads in that same + transaction +- metadata is committed iff the surrounding sync transaction commits +- metadata-only transactions are valid committed sync transactions +- row metadata and collection metadata are isolated but share the same commit + boundary +- truncate clears row metadata but does not silently clear collection metadata +- startup reads of persisted metadata are allowed outside a transaction + +### SQLite invariants + +- row values and row metadata are committed atomically +- collection metadata commits atomically with the same persisted tx +- hydrated rows restore both value and metadata + +### Query collection invariants + +- warm-start does not delete unrelated persisted rows before ownership is + reconstructed +- row ownership survives restart +- query placeholder retention survives restart +- finite persisted retention expires correctly +- indefinite 
persisted retention does not expire due to in-memory `gcTime` +- re-requesting an indefinitely retained query reconciles retained rows +- retained rows may be stale, but they remain available until revalidation or + explicit cleanup + +### Electric invariants + +- resume metadata advances iff the corresponding batch commits +- invalid resume metadata does not cause unsafe resume behavior +- metadata-only resume updates are persisted +- restart can use persisted resume metadata when valid + +### Replay and recovery invariants + +- follower tabs converge on metadata-bearing tx behavior +- sequence-gap recovery remains correct when metadata changed +- conservative reload fallback remains correct when targeted metadata replay is + unavailable + +## Test matrix + +### Core API tests + +Target files: + +- `packages/db/tests/collection.test.ts` +- additional focused tests if needed + +Cases: + +- `metadata.row.set()` inside a transaction +- `metadata.collection.set()` inside a transaction +- `metadata.collection.get()` outside a transaction during startup +- read-your-own-writes for row metadata +- read-your-own-writes for collection metadata +- metadata-only commit +- metadata writes outside a transaction throw +- `write({ metadata })` and `metadata.row.set()` on the same row in one tx +- truncate behavior with row metadata present + +### SQLite adapter and runtime tests + +Target files: + +- `packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts` +- `packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts` +- runtime persistence contract tests + +Cases: + +- row metadata persists and hydrates +- collection metadata persists and loads +- metadata-only tx survives restart +- row delete removes row metadata +- metadata-bearing tx replay correctness +- sequence-gap recovery with metadata changes + +### Query collection integration tests + +Target files: + +- `packages/query-db-collection/tests/query.test.ts` +- new persisted integration 
tests as needed + +Cases: + +- multiple disjoint queries warm-start without deleting each other's rows +- overlapping queries preserve shared ownership across restart +- persisted ownership reconstruction in eager mode +- persisted ownership reconstruction in on-demand mode for loaded subsets +- finite persisted retention expiry +- finite persisted retention expiry while the app remains running +- `persistedGcTime: Infinity` or equivalent indefinite retention +- in-memory `gcTime` expiry does not remove indefinitely retained persisted rows +- re-requesting an indefinitely retained query reconciles stale/deleted rows +- query reconciliation diffs against the query-owned baseline, not the whole + collection +- expired placeholder cleanup handles cold rows in on-demand mode +- query identity version mismatch / incompatible retained metadata fallback + +### Electric integration tests + +Target files: + +- `packages/electric-db-collection/tests/electric.test.ts` +- `packages/electric-db-collection/tests/electric-live-query.test.ts` +- new persisted integration tests as needed + +Cases: + +- commit rows + resume metadata atomically +- failed commit does not advance resume metadata +- metadata-only resume transaction +- valid resume metadata used on restart +- invalid resume metadata triggers conservative fallback +- reset/must-refetch clears or invalidates resume metadata +- row metadata survives SQLite hydration + +## Suggested delivery cadence + +### While implementing Phase 1 + +Add: + +- core transaction semantics tests +- metadata-only transaction tests + +### While implementing Phase 2 + +Add: + +- SQLite schema and hydration tests +- adapter atomicity tests +- runtime restart tests +- transaction-boundary tests that prove row data, row metadata, and collection + metadata share the same SQLite commit/rollback boundary + +### While implementing Phase 3 + +Add: + +- query ownership restart tests +- finite retention tests +- indefinite retention tests +- long-offline 
warm-start tests +- on-demand cold-row cleanup tests +- runtime TTL expiry tests + +### While implementing Phase 4 + +Add: + +- resume metadata tests +- metadata-only resume tests +- invalid resume fallback tests + +## Failure modes the tests must catch + +- persisted rows exist but metadata is missing after restart +- metadata exists but corresponding rows were not committed +- query warm-start deletes rows it does not own +- rows retained indefinitely disappear because in-memory GC elapsed +- startup GC races with new subscriptions +- follower runtimes diverge because metadata-bearing txs were not replayed +- Electric resumes from a token that was never durably committed + +## Crash-consistency testing approach + +Where atomicity is claimed, tests should verify transaction boundaries rather +than merely assume SQLite atomicity. + +Suggested approach: + +- use a driver or adapter double that records transaction boundaries +- force failures after some writes have been staged but before commit completes +- verify row values, row metadata, and collection metadata all roll back + together + +This is especially important for `applyCommittedTx()` and any metadata-only tx +paths. + +## Performance regression checks + +Add lightweight regression coverage for: + +- row hydration with metadata present +- row writes with metadata absent +- row writes with metadata present + +These do not need to be strict benchmarks, but they should catch obvious +accidental regressions caused by metadata serialization or replay changes. 
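The adapter-double approach above can be sketched concretely: stage all three write categories, inject a failure before the commit point, and assert that nothing leaked. This double is illustrative; a real one would wrap the actual SQLite adapter and its `applyCommittedTx()`:

```typescript
// Adapter double that applies row values, row metadata, and collection
// metadata through a single publish step, with a hook to force a failure
// before that step, simulating a crash inside the SQLite transaction.
class FailingAdapterDouble {
  rows = new Map<string, { value: unknown; metadata?: unknown }>()
  collectionMetadata = new Map<string, unknown>()
  failBeforeCommit = false

  applyCommittedTx(tx: {
    rowWrites: Array<{ key: string; value: unknown; metadata?: unknown }>
    collectionMetadata: Array<{ key: string; value: unknown }>
  }): void {
    // Stage on copies, like writes inside an open transaction.
    const rows = new Map(this.rows)
    const collectionMetadata = new Map(this.collectionMetadata)
    for (const w of tx.rowWrites) {
      rows.set(w.key, { value: w.value, metadata: w.metadata })
    }
    for (const m of tx.collectionMetadata) {
      collectionMetadata.set(m.key, m.value)
    }

    if (this.failBeforeCommit) throw new Error('injected failure before commit')

    // Publish all categories together: the simulated COMMIT.
    this.rows = rows
    this.collectionMetadata = collectionMetadata
  }
}
```

Tests built on a double like this verify the transaction boundary itself: after an injected failure, row values, row metadata, and collection metadata must all still reflect the last successful commit.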
+ +## Definition of done + +This plan is complete when: + +- each phase ships with the tests listed for that phase +- restart, replay, and retention invariants are covered +- the long-offline persisted query use case is explicitly validated +- metadata atomicity is tested, not just assumed diff --git a/persisted-sync-metadata-plan/README.md b/persisted-sync-metadata-plan/README.md new file mode 100644 index 000000000..10c99ec74 --- /dev/null +++ b/persisted-sync-metadata-plan/README.md @@ -0,0 +1,51 @@ +# Persisted Sync Metadata Plan + +This directory breaks the `RFC-persisted-sync-metadata.md` design into an +implementation plan with explicit phases. + +The recommended execution order is: + +1. `01-core-api.md` +2. `02-sqlite-implementation.md` +3. `03-query-collection.md` +4. `04-electric-collection.md` +5. `05-test-plan.md` + +## Goals + +- land the core metadata transaction model first +- make SQLite the reference persistence implementation +- migrate `query-db-collection` onto the new primitives +- migrate `electric-db-collection` onto the new primitives +- validate correctness with thorough invariants-focused tests + +## Non-Goals + +- optimizing every replay and GC path in the first pass +- implementing every possible metadata-backed feature before the core API is + stable + +## Guiding principles + +- metadata that affects persisted row behavior must commit with the row state it + explains +- row metadata and collection metadata are distinct scopes +- metadata-only sync transactions are first-class +- restart correctness comes before targeted replay optimization +- persisted query retention is separate from in-memory `gcTime` + +## Phase dependencies + +- Phase 1 is required before any other phase +- Phase 2 depends on Phase 1 +- Phase 3 depends on Phases 1 and 2 +- Phase 4 depends on Phases 1 and 2 +- Phase 5 spans all phases and should be updated continuously + +## Recommended delivery strategy + +- implement Phase 1 and Phase 2 behind a narrow internal 
API +- land Phase 3 next because it is the primary motivator +- land Phase 4 once the core metadata model has proven stable under restart and + replay tests +- keep `05-test-plan.md` as the definition of done for each phase diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 66bae19d3..4198e463a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -277,6 +277,12 @@ importers: examples/react/offline-transactions: dependencies: + '@tanstack/db': + specifier: workspace:* + version: link:../../../packages/db + '@tanstack/db-browser-wa-sqlite-persisted-collection': + specifier: workspace:* + version: link:../../../packages/db-browser-wa-sqlite-persisted-collection '@tanstack/offline-transactions': specifier: ^1.0.24 version: link:../../../packages/offline-transactions