Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/smooth-waves-live-sse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/pglite-sync": patch
---

Allow `liveSse` in sync shape options and map it to Electric's current SSE option.
6 changes: 5 additions & 1 deletion packages/pglite-sync/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
applyMessagesToTableWithCopy,
applyMessagesToTableWithJson,
} from './apply'
import { normalizeShapeStreamOptions } from './shapeStreamOptions'

export * from './types'

Expand Down Expand Up @@ -154,10 +155,13 @@ async function createPlugin(
shapes: Object.fromEntries(
Object.entries(shapes).map(([key, shapeOptions]) => {
const shapeMetadata = subState?.shape_metadata[key]
const shapeStreamOptions = normalizeShapeStreamOptions(
shapeOptions.shape,
)
return [
key,
{
...shapeOptions.shape,
...shapeStreamOptions,
...(shapeMetadata
? {
offset: shapeMetadata.offset,
Expand Down
23 changes: 23 additions & 0 deletions packages/pglite-sync/src/shapeStreamOptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { ShapeStreamOptions } from '@electric-sql/client'

export type PGliteSyncShapeStreamOptions<T = never> = ShapeStreamOptions<T> & {
liveSse?: boolean
}

export function normalizeShapeStreamOptions<T = never>(
shape: PGliteSyncShapeStreamOptions<T>,
): ShapeStreamOptions<T> {
const { liveSse, ...shapeStreamOptions } = shape

if (
liveSse !== undefined &&
shapeStreamOptions.experimentalLiveSse === undefined
) {
return {
...shapeStreamOptions,
experimentalLiveSse: liveSse,
}
}

return shapeStreamOptions
}
6 changes: 3 additions & 3 deletions packages/pglite-sync/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type {
ShapeStreamOptions,
ShapeStreamInterface,
Row,
ChangeMessage,
FetchError,
} from '@electric-sql/client'
import { Transaction } from '@electric-sql/pglite'
import type { PGliteSyncShapeStreamOptions } from './shapeStreamOptions'

export type Lsn = bigint

Expand All @@ -16,7 +16,7 @@ export type SubscriptionKey = string
export type InitialInsertMethod = 'insert' | 'csv' | 'json' | 'useCopy'

export interface ShapeToTableOptions {
shape: ShapeStreamOptions
shape: PGliteSyncShapeStreamOptions
table: string
schema?: string
mapColumns?: MapColumns
Expand All @@ -40,7 +40,7 @@ export interface SyncShapesToTablesResult {
}

export interface SyncShapeToTableOptions {
shape: ShapeStreamOptions
shape: PGliteSyncShapeStreamOptions
table: string
schema?: string
mapColumns?: MapColumns
Expand Down
35 changes: 35 additions & 0 deletions packages/pglite-sync/test/shapeStreamOptions.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { describe, expect, it } from 'vitest'
import {
normalizeShapeStreamOptions,
type PGliteSyncShapeStreamOptions,
} from '../src/shapeStreamOptions'

describe('normalizeShapeStreamOptions', () => {
it('maps liveSse to the Electric client SSE option', () => {
const shape: PGliteSyncShapeStreamOptions = {
url: 'http://localhost:3000/v1/shape',
liveSse: true,
}

const normalized = normalizeShapeStreamOptions(shape)

expect(normalized).toEqual({
url: 'http://localhost:3000/v1/shape',
experimentalLiveSse: true,
})
expect('liveSse' in normalized).toBe(false)
})

it('preserves explicit experimentalLiveSse when both options are provided', () => {
const normalized = normalizeShapeStreamOptions({
url: 'http://localhost:3000/v1/shape',
liveSse: true,
experimentalLiveSse: false,
})

expect(normalized).toEqual({
url: 'http://localhost:3000/v1/shape',
experimentalLiveSse: false,
})
})
})