diff --git a/.changeset/smooth-waves-live-sse.md b/.changeset/smooth-waves-live-sse.md new file mode 100644 index 000000000..2eed7a46f --- /dev/null +++ b/.changeset/smooth-waves-live-sse.md @@ -0,0 +1,5 @@ +--- +"@electric-sql/pglite-sync": patch +--- + +Allow `liveSse` in sync shape options and map it to Electric's current SSE option. diff --git a/packages/pglite-sync/src/index.ts b/packages/pglite-sync/src/index.ts index 2bc684f22..95b639f1a 100644 --- a/packages/pglite-sync/src/index.ts +++ b/packages/pglite-sync/src/index.ts @@ -29,6 +29,7 @@ import { applyMessagesToTableWithCopy, applyMessagesToTableWithJson, } from './apply' +import { normalizeShapeStreamOptions } from './shapeStreamOptions' export * from './types' @@ -154,10 +155,13 @@ async function createPlugin( shapes: Object.fromEntries( Object.entries(shapes).map(([key, shapeOptions]) => { const shapeMetadata = subState?.shape_metadata[key] + const shapeStreamOptions = normalizeShapeStreamOptions( + shapeOptions.shape, + ) return [ key, { - ...shapeOptions.shape, + ...shapeStreamOptions, ...(shapeMetadata ? { offset: shapeMetadata.offset, diff --git a/packages/pglite-sync/src/shapeStreamOptions.ts b/packages/pglite-sync/src/shapeStreamOptions.ts new file mode 100644 index 000000000..038d694d0 --- /dev/null +++ b/packages/pglite-sync/src/shapeStreamOptions.ts @@ -0,0 +1,23 @@ +import type { ShapeStreamOptions } from '@electric-sql/client' + +export type PGliteSyncShapeStreamOptions = ShapeStreamOptions & { + liveSse?: boolean +} + +export function normalizeShapeStreamOptions( + shape: PGliteSyncShapeStreamOptions, +): ShapeStreamOptions { + const { liveSse, ...shapeStreamOptions } = shape + + if ( + liveSse !== undefined && + shapeStreamOptions.experimentalLiveSse === undefined + ) { + return { + ...shapeStreamOptions, + experimentalLiveSse: liveSse, + } + } + + return shapeStreamOptions +} diff --git a/packages/pglite-sync/src/types.ts b/packages/pglite-sync/src/types.ts index 1e5e5fc47..887fb117b 100644 --- a/packages/pglite-sync/src/types.ts +++ b/packages/pglite-sync/src/types.ts @@ -1,11 +1,11 @@ import type { - ShapeStreamOptions, ShapeStreamInterface, Row, ChangeMessage, FetchError, } from '@electric-sql/client' import { Transaction } from '@electric-sql/pglite' +import type { PGliteSyncShapeStreamOptions } from './shapeStreamOptions' export type Lsn = bigint @@ -16,7 +16,7 @@ export type SubscriptionKey = string export type InitialInsertMethod = 'insert' | 'csv' | 'json' | 'useCopy' export interface ShapeToTableOptions { - shape: ShapeStreamOptions + shape: PGliteSyncShapeStreamOptions table: string schema?: string mapColumns?: MapColumns @@ -40,7 +40,7 @@ export interface SyncShapesToTablesResult { } export interface SyncShapeToTableOptions { - shape: ShapeStreamOptions + shape: PGliteSyncShapeStreamOptions table: string schema?: string mapColumns?: MapColumns diff --git a/packages/pglite-sync/test/shapeStreamOptions.test.ts b/packages/pglite-sync/test/shapeStreamOptions.test.ts new file mode 100644 index 000000000..7907794ec --- /dev/null +++ b/packages/pglite-sync/test/shapeStreamOptions.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, it } from 'vitest' +import { + normalizeShapeStreamOptions, + type PGliteSyncShapeStreamOptions, +} from '../src/shapeStreamOptions' + +describe('normalizeShapeStreamOptions', () => { + it('maps liveSse to the Electric client SSE option', () => { + const shape: PGliteSyncShapeStreamOptions = { + url: 'http://localhost:3000/v1/shape', + liveSse: true, + } + + const normalized = normalizeShapeStreamOptions(shape) + + expect(normalized).toEqual({ + url: 'http://localhost:3000/v1/shape', + experimentalLiveSse: true, + }) + expect('liveSse' in normalized).toBe(false) + }) + + it('preserves explicit experimentalLiveSse when both options are provided', () => { + const normalized = normalizeShapeStreamOptions({ + url: 'http://localhost:3000/v1/shape', + liveSse: true, + experimentalLiveSse: false, + }) + + expect(normalized).toEqual({ + url: 'http://localhost:3000/v1/shape', + experimentalLiveSse: false, + }) + }) +})