diff --git a/apps/dashboard/e2e/global-setup.ts b/apps/dashboard/e2e/global-setup.ts index 80d61dd00..60efc5cee 100644 --- a/apps/dashboard/e2e/global-setup.ts +++ b/apps/dashboard/e2e/global-setup.ts @@ -11,7 +11,7 @@ export default async function globalSetup(_config: FullConfig) { const resolver = createConnectorResolver({ sources: { stripe: sourceStripe }, - destinations: { postgres: destinationPostgres, 'google-sheets': destinationGoogleSheets }, + destinations: { postgres: destinationPostgres, 'google_sheets': destinationGoogleSheets }, }) const app = createApp(resolver) diff --git a/apps/dashboard/e2e/pipeline-create.test.ts b/apps/dashboard/e2e/pipeline-create.test.ts index f79456f66..4a4976d1a 100644 --- a/apps/dashboard/e2e/pipeline-create.test.ts +++ b/apps/dashboard/e2e/pipeline-create.test.ts @@ -46,7 +46,7 @@ test('full pipeline creation flow: source → discover → streams → destinati // Click next to destination await page.getByRole('button', { name: /next.*configure destination/i }).click() - // Step 3: Destination — should show postgres and google-sheets + // Step 3: Destination — should show postgres and google_sheets await expect(page.locator('select').first()).toBeVisible() await page.locator('select').first().selectOption('postgres') diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 35b8b278b..cad022c2f 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -580,8 +580,8 @@ export interface components { postgres: components["schemas"]["DestinationPostgresConfig"]; } | { /** @constant */ - type: "google-sheets"; - "google-sheets": components["schemas"]["DestinationGoogleSheetsConfig"]; + type: "google_sheets"; + google_sheets: components["schemas"]["DestinationGoogleSheetsConfig"]; }; DestinationPostgresConfig: { /** @description Postgres connection string (alias for connection_string) */ diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index 2498fbb25..1544d5821 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1630,15 +1630,15 @@ "properties": { "type": { "type": "string", - "const": "google-sheets" + "const": "google_sheets" }, - "google-sheets": { + "google_sheets": { "$ref": "#/components/schemas/DestinationGoogleSheetsConfig" } }, "required": [ "type", - "google-sheets" + "google_sheets" ] } ], diff --git a/apps/engine/src/api/index.ts b/apps/engine/src/api/index.ts index 38d291282..3901a1daa 100755 --- a/apps/engine/src/api/index.ts +++ b/apps/engine/src/api/index.ts @@ -13,7 +13,7 @@ const port = Number(process.env.PORT || 3001) async function main() { const resolver = await createConnectorResolver({ sources: { stripe: source }, - destinations: { postgres: pgDestination, 'google-sheets': sheetsDestination }, + destinations: { postgres: pgDestination, 'google_sheets': sheetsDestination }, }) const app = await createApp(resolver) diff --git a/apps/engine/src/lib/default-connectors.ts b/apps/engine/src/lib/default-connectors.ts index bf0d32d0f..e91a805db 100644 --- a/apps/engine/src/lib/default-connectors.ts +++ b/apps/engine/src/lib/default-connectors.ts @@ -8,6 +8,6 @@ export const defaultConnectors: RegisteredConnectors = { sources: { stripe: sourceStripe }, destinations: { postgres: destinationPostgres, - 'google-sheets': destinationGoogleSheets, + 'google_sheets': destinationGoogleSheets, }, } diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index 81726fa2e..c99fdb8c4 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -129,8 +129,8 @@ export interface components { postgres: components["schemas"]["DestinationPostgresConfig"]; } | { /** @constant */ - type: "google-sheets"; - "google-sheets": components["schemas"]["DestinationGoogleSheetsConfig"]; + type: "google_sheets"; + google_sheets: components["schemas"]["DestinationGoogleSheetsConfig"]; }; DestinationPostgresConfig: { /** @description Postgres connection string (alias for connection_string) */ diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index 50ab9ef34..0d292d7c9 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -918,15 +918,15 @@ "properties": { "type": { "type": "string", - "const": "google-sheets" + "const": "google_sheets" }, - "google-sheets": { + "google_sheets": { "$ref": "#/components/schemas/DestinationGoogleSheetsConfig" } }, "required": [ "type", - "google-sheets" + "google_sheets" ] } ], diff --git a/apps/service/src/__tests__/workflow.test.ts b/apps/service/src/__tests__/workflow.test.ts index e71ab306c..3b67f449c 100644 --- a/apps/service/src/__tests__/workflow.test.ts +++ b/apps/service/src/__tests__/workflow.test.ts @@ -22,12 +22,12 @@ function stubActivities(overrides: Partial = {}): SyncActivities discoverCatalog: async () => ({ streams: [] }), pipelineSetup: async () => ({}), pipelineSync: async () => noErrors, - readGoogleSheetsIntoQueue: async () => ({ count: 0, state: emptyState }), + readIntoQueue: async () => ({ count: 0, state: emptyState }), writeGoogleSheetsFromQueue: async () => ({ errors: [], state: emptyState, written: 0, - rowAssignments: {}, + rowIndexMap: {}, }), pipelineTeardown: async () => {}, updatePipelineStatus: async () => {}, @@ -310,9 +310,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { if (opts?.input) return noErrors reconcileCalls++ - return reconcileCalls === 1 - ? { ...noErrors, eof: { reason: 'complete' } } - : noErrors + return reconcileCalls === 1 ? { ...noErrors, eof: { reason: 'complete' } } : noErrors }, }), }) @@ -330,7 +328,9 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { await handle.signal('desired_status', 'deleted') await handle.result() - expect(statusWrites).toEqual(expect.arrayContaining(['backfill', 'ready', 'paused', 'teardown'])) + expect(statusWrites).toEqual( + expect.arrayContaining(['backfill', 'ready', 'paused', 'teardown']) + ) }) }) @@ -507,7 +507,7 @@ describe('googleSheetPipelineWorkflow (unit — stubbed activities)', () => { discoverCalls++ return { streams: [] } }, - readGoogleSheetsIntoQueue: async () => { + readIntoQueue: async () => { readCalls++ return { count: 0, state: emptyState } }, @@ -563,7 +563,7 @@ describe('googleSheetPipelineWorkflow (unit — stubbed activities)', () => { workflowsPath, activities: stubActivities({ discoverCatalog: async () => discoveredCatalog, - readGoogleSheetsIntoQueue: async () => { + readIntoQueue: async () => { readCalls++ return readCalls === 1 ? { count: 1, state: { streams: { customers: { cursor: 'cus_1' } }, global: {} } } @@ -575,7 +575,7 @@ describe('googleSheetPipelineWorkflow (unit — stubbed activities)', () => { errors: [], state: { streams: { customers: { cursor: 'cus_1' } }, global: {} }, written: 0, - rowAssignments: {}, + rowIndexMap: {}, } }, }), diff --git a/apps/service/src/api/app.test.ts b/apps/service/src/api/app.test.ts index 61a943b46..4562d35b8 100644 --- a/apps/service/src/api/app.test.ts +++ b/apps/service/src/api/app.test.ts @@ -20,7 +20,7 @@ let resolver: ConnectorResolver beforeAll(async () => { resolver = await createConnectorResolver({ sources: { test: sourceTest }, - destinations: { test: destinationTest, 'google-sheets': destinationGoogleSheets }, + destinations: { test: destinationTest, 'google_sheets': destinationGoogleSheets }, }) }) @@ -80,7 +80,7 @@ describe('GET /health', () => { }) describe('POST /pipelines workflow dispatch', () => { - it('starts google-sheets pipelines on the dedicated workflow', async () => { + it('starts google_sheets pipelines on the dedicated workflow', async () => { const start = vi.fn(async () => ({})) const res = await createApp({ temporal: { client: { start } as unknown as WorkflowClient, taskQueue: 'unused' }, @@ -92,8 +92,8 @@ describe('POST /pipelines workflow dispatch', () => { body: JSON.stringify({ source: { type: 'test', test: {} }, destination: { - type: 'google-sheets', - 'google-sheets': { + type: 'google_sheets', + 'google_sheets': { spreadsheet_id: 'sheet_123', spreadsheet_title: 'Test Sheet', client_id: 'client', @@ -133,12 +133,12 @@ function stubActivities(): SyncActivities { discoverCatalog: async () => ({ streams: [] }), pipelineSetup: async () => ({}), pipelineSync: async () => noErrors, - readGoogleSheetsIntoQueue: async () => ({ count: 0, state: emptyState }), + readIntoQueue: async () => ({ count: 0, state: emptyState }), writeGoogleSheetsFromQueue: async () => ({ errors: [], state: emptyState, written: 0, - rowAssignments: {}, + rowIndexMap: {}, }), pipelineTeardown: async () => {}, updatePipelineStatus: async () => {}, @@ -240,7 +240,7 @@ describe('pipeline CRUD', () => { await a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) }) - it('rejects changing the target spreadsheet for a google-sheets pipeline', async () => { + it('rejects changing the target spreadsheet for a google_sheets pipeline', async () => { const a = liveApp() const createRes = await a.request('/pipelines', { @@ -249,8 +249,8 @@ describe('pipeline CRUD', () => { body: JSON.stringify({ source: { type: 'test', test: {} }, destination: { - type: 'google-sheets', - 'google-sheets': { + type: 'google_sheets', + 'google_sheets': { spreadsheet_id: 'sheet_123', spreadsheet_title: 'Original Sheet', client_id: 'client', @@ -269,8 +269,8 @@ describe('pipeline CRUD', () => { headers: { 'content-type': 'application/json' }, body: JSON.stringify({ destination: { - type: 'google-sheets', - 'google-sheets': { + type: 'google_sheets', + 'google_sheets': { spreadsheet_id: 'sheet_456', spreadsheet_title: 'Replacement Sheet', client_id: 'client', @@ -285,7 +285,7 @@ describe('pipeline CRUD', () => { expect(updateRes.status).toBe(400) expect(await updateRes.json()).toEqual({ error: - 'Changing the target spreadsheet for a google-sheets pipeline requires recreating the pipeline', + 'Changing the target spreadsheet for a google_sheets pipeline requires recreating the pipeline', }) await a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) @@ -300,8 +300,8 @@ describe('pipeline CRUD', () => { body: JSON.stringify({ source: { type: 'test', test: {} }, destination: { - type: 'google-sheets', - 'google-sheets': { + type: 'google_sheets', + 'google_sheets': { spreadsheet_id: 'sheet_123', spreadsheet_title: 'Original Sheet', client_id: 'client', @@ -320,8 +320,8 @@ describe('pipeline CRUD', () => { headers: { 'content-type': 'application/json' }, body: JSON.stringify({ destination: { - type: 'google-sheets', - 'google-sheets': { + type: 'google_sheets', + 'google_sheets': { spreadsheet_id: 'sheet_123', spreadsheet_title: 'Renamed Sheet', client_id: 'client', @@ -335,8 +335,8 @@ describe('pipeline CRUD', () => { expect(updateRes.status).toBe(200) const updated = await updateRes.json() - expect(updated.destination['google-sheets'].spreadsheet_id).toBe('sheet_123') - expect(updated.destination['google-sheets'].spreadsheet_title).toBe('Renamed Sheet') + expect(updated.destination['google_sheets'].spreadsheet_id).toBe('sheet_123') + expect(updated.destination['google_sheets'].spreadsheet_title).toBe('Renamed Sheet') await a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) }) diff --git a/apps/service/src/api/app.ts b/apps/service/src/api/app.ts index 2a66f87aa..d422e97d2 100644 --- a/apps/service/src/api/app.ts +++ b/apps/service/src/api/app.ts @@ -13,7 +13,7 @@ const DEFAULT_PIPELINE_WORKFLOW = 'pipelineWorkflow' const GOOGLE_SHEETS_PIPELINE_WORKFLOW = 'googleSheetPipelineWorkflow' function workflowTypeForPipeline(pipeline: Pipeline): string { - return pipeline.destination.type === 'google-sheets' + return pipeline.destination.type === 'google_sheets' ? GOOGLE_SHEETS_PIPELINE_WORKFLOW : DEFAULT_PIPELINE_WORKFLOW } @@ -232,7 +232,7 @@ export function createApp(options: AppOptions) { } } - // Validate google-sheets constraints + // Validate google_sheets constraints const next = { ...current, ...(patch.source ? { source: patch.source } : {}), @@ -243,22 +243,22 @@ export function createApp(options: AppOptions) { return c.json( { error: - 'Changing destination.type between google-sheets and non-google-sheets requires recreating the pipeline', + 'Changing destination.type between google_sheets and non-google_sheets requires recreating the pipeline', }, 400 ) } if ( - current.destination.type === 'google-sheets' && + current.destination.type === 'google_sheets' && // eslint-disable-next-line @typescript-eslint/no-explicit-any - (current.destination as any)['google-sheets']?.spreadsheet_id !== + (current.destination as any)['google_sheets']?.spreadsheet_id !== // eslint-disable-next-line @typescript-eslint/no-explicit-any - (next.destination as any)['google-sheets']?.spreadsheet_id + (next.destination as any)['google_sheets']?.spreadsheet_id ) { return c.json( { error: - 'Changing the target spreadsheet for a google-sheets pipeline requires recreating the pipeline', + 'Changing the target spreadsheet for a google_sheets pipeline requires recreating the pipeline', }, 400 ) diff --git a/apps/service/src/cli.ts b/apps/service/src/cli.ts index b66297c39..374534be0 100644 --- a/apps/service/src/cli.ts +++ b/apps/service/src/cli.ts @@ -9,11 +9,14 @@ import destinationGoogleSheets from '@stripe/sync-destination-google-sheets' import { createApp } from './api/app.js' import { filePipelineStore } from './lib/stores-fs.js' import type { WorkflowClient } from '@temporalio/client' +import { homedir } from 'node:os' import { logger } from './logger.js' +const defaultDataDir = process.env.DATA_DIR ?? `${homedir()}/.stripe-sync` + const resolverPromise = createConnectorResolver({ sources: { stripe: sourceStripe }, - destinations: { postgres: destinationPostgres, 'google-sheets': destinationGoogleSheets }, + destinations: { postgres: destinationPostgres, 'google_sheets': destinationGoogleSheets }, }) async function createTemporalClient( @@ -47,8 +50,8 @@ const serveCmd = defineCommand({ }, 'data-dir': { type: 'string', - required: true, - description: 'Directory to persist pipeline configs as JSON files.', + default: defaultDataDir, + description: `Directory to persist pipeline configs as JSON files (default: ${defaultDataDir}).`, }, }, async run({ args }) { @@ -108,8 +111,8 @@ const workerCmd = defineCommand({ }, 'data-dir': { type: 'string', - required: true, - description: 'Directory to persist pipeline configs as JSON files.', + default: defaultDataDir, + description: `Directory to persist pipeline configs as JSON files (default: ${defaultDataDir}).`, }, }, async run({ args }) { @@ -170,8 +173,8 @@ const webhookCmd = defineCommand({ }, 'data-dir': { type: 'string', - required: true, - description: 'Directory to persist pipeline configs as JSON files.', + default: defaultDataDir, + description: `Directory to persist pipeline configs as JSON files (default: ${defaultDataDir}).`, }, }, async run({ args }) { diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts index b0ba710e4..022a2ef9a 100644 --- a/apps/service/src/temporal/activities/_shared.ts +++ b/apps/service/src/temporal/activities/_shared.ts @@ -1,5 +1,5 @@ import { heartbeat } from '@temporalio/activity' -import type { Message, Engine, SourceState } from '@stripe/sync-engine' +import type { Message, Engine, SourceState, SourceStateMessage } from '@stripe/sync-engine' import { createRemoteEngine } from '@stripe/sync-engine' import { Kafka } from 'kafkajs' import type { Producer } from 'kafkajs' @@ -116,6 +116,16 @@ export function pipelineHeader(config: Record): string { return JSON.stringify(config) } +export function mergeStateMessage(state: SourceState, msg: SourceStateMessage): SourceState { + if (msg.source_state.state_type === 'global') { + return { ...state, global: msg.source_state.data as Record } + } + return { + ...state, + streams: { ...state.streams, [msg.source_state.stream]: msg.source_state.data }, + } +} + export function collectError(message: Message): RunResult['errors'][number] | null { if (message.type === 'trace' && message.trace.trace_type === 'error') { return { @@ -139,10 +149,7 @@ export async function drainMessages( eof?: { reason: string } }> { const errors: RunResult['errors'] = [] - const state: SourceState = { - streams: { ...initialState?.streams }, - global: { ...initialState?.global }, - } + let state: SourceState = initialState ?? { streams: {}, global: {} } const records: Message[] = [] let sourceConfig: Record | undefined let destConfig: Record | undefined @@ -164,11 +171,7 @@ export async function drainMessages( if (error) { errors.push(error) } else if (message.type === 'source_state') { - if (message.source_state.state_type === 'global') { - state.global = message.source_state.data as Record - } else { - state.streams[message.source_state.stream] = message.source_state.data - } + state = mergeStateMessage(state, message) } else if (message.type === 'record') { records.push(message) } diff --git a/apps/service/src/temporal/activities/index.ts b/apps/service/src/temporal/activities/index.ts index 2566b9c6f..36c2df491 100644 --- a/apps/service/src/temporal/activities/index.ts +++ b/apps/service/src/temporal/activities/index.ts @@ -1,7 +1,7 @@ import { createActivitiesContext } from './_shared.js' import { createUpdatePipelineStatusActivity } from './update-pipeline-status.js' import { createDiscoverCatalogActivity } from './discover-catalog.js' -import { createReadGoogleSheetsIntoQueueActivity } from './read-google-sheets-into-queue.js' +import { createReadIntoQueueActivity } from './read-into-queue.js' import { createPipelineSetupActivity } from './pipeline-setup.js' import { createPipelineSyncActivity } from './pipeline-sync.js' import { createPipelineTeardownActivity } from './pipeline-teardown.js' @@ -21,7 +21,7 @@ export function createActivities(opts: { discoverCatalog: createDiscoverCatalogActivity(context), pipelineSetup: createPipelineSetupActivity(context), pipelineSync: createPipelineSyncActivity(context), - readGoogleSheetsIntoQueue: createReadGoogleSheetsIntoQueueActivity(context), + readIntoQueue: createReadIntoQueueActivity(context), writeGoogleSheetsFromQueue: createWriteGoogleSheetsFromQueueActivity(context), pipelineTeardown: createPipelineTeardownActivity(context), updatePipelineStatus: createUpdatePipelineStatusActivity(context), diff --git a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts deleted file mode 100644 index bd6bdf88b..000000000 --- a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts +++ /dev/null @@ -1,109 +0,0 @@ -import { heartbeat } from '@temporalio/activity' -import type { - ConfiguredCatalog, - Message, - RecordMessage, - SourceInputMessage, - SourceReadOptions, -} from '@stripe/sync-engine' -import { - ROW_KEY_FIELD, - ROW_NUMBER_FIELD, - serializeRowKey, -} from '@stripe/sync-destination-google-sheets' - -import type { ActivitiesContext } from './_shared.js' -import { asIterable, collectError, type RunResult } from './_shared.js' -type RowIndex = Record> - -function withRowKey(record: RecordMessage, catalog?: ConfiguredCatalog): RecordMessage { - const primaryKey = catalog?.streams.find((stream) => stream.stream.name === record.record.stream) - ?.stream.primary_key - if (!primaryKey) return record - return { - ...record, - record: { - ...record.record, - data: { - ...record.record.data, - [ROW_KEY_FIELD]: serializeRowKey(primaryKey, record.record.data), - }, - }, - } -} - -function withRowNumber(record: RecordMessage, rowIndex: RowIndex): RecordMessage { - const rowKey = - typeof record.record.data[ROW_KEY_FIELD] === 'string' - ? record.record.data[ROW_KEY_FIELD] - : undefined - const rowNumber = rowKey ? rowIndex[record.record.stream]?.[rowKey] : undefined - if (rowNumber === undefined) return record - return { - ...record, - record: { - ...record.record, - data: { ...record.record.data, [ROW_NUMBER_FIELD]: rowNumber }, - }, - } -} - -export function createReadGoogleSheetsIntoQueueActivity(context: ActivitiesContext) { - return async function readGoogleSheetsIntoQueue( - pipelineId: string, - opts?: SourceReadOptions & { - input?: SourceInputMessage[] - catalog?: ConfiguredCatalog - rowIndex?: RowIndex - } - ): Promise<{ count: number; state: import('@stripe/sync-engine').SourceState }> { - if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets workflow') - - const pipeline = await context.pipelineStore.get(pipelineId) - const { id: _, ...config } = pipeline - const { input: inputArr, catalog, rowIndex, ...readOpts } = opts ?? {} - const input = inputArr?.length ? asIterable(inputArr) : undefined - - const queued: Message[] = [] - const state: import('@stripe/sync-engine').SourceState = { - streams: { ...readOpts.state?.streams }, - global: { ...readOpts.state?.global }, - } - const errors: RunResult['errors'] = [] - let seen = 0 - - for await (const raw of context.engine.pipeline_read(config, readOpts, input)) { - seen++ - const error = collectError(raw) - if (error) { - errors.push(error) - } else if (raw.type === 'record') { - const withKey = withRowKey(raw, catalog) - queued.push(rowIndex ? withRowNumber(withKey, rowIndex) : withKey) - } else if (raw.type === 'source_state') { - if (raw.source_state.state_type === 'global') { - state.global = raw.source_state.data as Record - } else { - state.streams[raw.source_state.stream] = raw.source_state.data - } - queued.push(raw) - } - if (seen % 50 === 0) heartbeat({ messages: seen }) - } - if (seen % 50 !== 0) heartbeat({ messages: seen }) - - if (errors.length > 0) { - throw new Error(errors.map((error) => error.message).join('; ')) - } - - if (queued.length > 0) { - const producer = await context.getProducer() - await producer.send({ - topic: `pipeline.${pipelineId}`, - messages: queued.map((message) => ({ value: JSON.stringify(message) })), - }) - } - - return { count: queued.length, state } - } -} diff --git a/apps/service/src/temporal/activities/read-into-queue.ts b/apps/service/src/temporal/activities/read-into-queue.ts new file mode 100644 index 000000000..947315fe1 --- /dev/null +++ b/apps/service/src/temporal/activities/read-into-queue.ts @@ -0,0 +1,65 @@ +import { heartbeat } from '@temporalio/activity' +import type { Message, SourceInputMessage, SourceReadOptions } from '@stripe/sync-engine' + +import type { ActivitiesContext } from './_shared.js' +import { mergeStateMessage, asIterable, collectError, type RunResult } from './_shared.js' + +export function createReadIntoQueueActivity(context: ActivitiesContext) { + return async function readIntoQueue( + pipelineId: string, + opts?: SourceReadOptions & { input?: SourceInputMessage[] } + ): Promise<{ + count: number + state: import('@stripe/sync-engine').SourceState + eof?: { reason: string } + }> { + if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets workflow') + + const pipeline = await context.pipelineStore.get(pipelineId) + const { id: _, ...config } = pipeline + const { input: inputArr, ...readOpts } = opts ?? {} + const input = inputArr?.length ? asIterable(inputArr) : undefined + + const queued: Message[] = [] + let state: import('@stripe/sync-engine').SourceState = readOpts.state ?? { + streams: {}, + global: {}, + } + const errors: RunResult['errors'] = [] + let eof: { reason: string } | undefined + let seen = 0 + + for await (const raw of context.engine.pipeline_read(config, readOpts, input)) { + seen++ + if (raw.type === 'eof') { + eof = { reason: raw.eof.reason } + } else { + const error = collectError(raw) + if (error) { + errors.push(error) + } else if (raw.type === 'record') { + queued.push(raw) + } else if (raw.type === 'source_state') { + state = mergeStateMessage(state, raw) + queued.push(raw) + } + } + if (seen % 50 === 0) heartbeat({ messages: seen }) + } + if (seen % 50 !== 0) heartbeat({ messages: seen }) + + if (errors.length > 0) { + throw new Error(errors.map((error) => error.message).join('; ')) + } + + if (queued.length > 0) { + const producer = await context.getProducer() + await producer.send({ + topic: `pipeline.${pipelineId}`, + messages: queued.map((message) => ({ value: JSON.stringify(message) })), + }) + } + + return { count: queued.length, state, eof } + } +} diff --git a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts index b3df412fc..b5811e1ec 100644 --- a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts +++ b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts @@ -11,10 +11,45 @@ import { parseGoogleSheetsMetaLog, ROW_KEY_FIELD, ROW_NUMBER_FIELD, + serializeRowKey, } from '@stripe/sync-destination-google-sheets' import type { ActivitiesContext } from './_shared.js' -import { asIterable, collectError, type RunResult } from './_shared.js' +import { asIterable, collectError, mergeStateMessage, type RunResult } from './_shared.js' + +type RowIndexMap = Record> + +function withRowKey(record: RecordMessage, catalog?: ConfiguredCatalog): RecordMessage { + const primaryKey = catalog?.streams.find((stream) => stream.stream.name === record.record.stream) + ?.stream.primary_key + if (!primaryKey) return record + return { + ...record, + record: { + ...record.record, + data: { + ...record.record.data, + [ROW_KEY_FIELD]: serializeRowKey(primaryKey, record.record.data), + }, + }, + } +} + +function withRowNumber(record: RecordMessage, rowIndexMap: RowIndexMap): RecordMessage { + const rowKey = + typeof record.record.data[ROW_KEY_FIELD] === 'string' + ? record.record.data[ROW_KEY_FIELD] + : undefined + const rowNumber = rowKey ? rowIndexMap[record.record.stream]?.[rowKey] : undefined + if (rowNumber === undefined) return record + return { + ...record, + record: { + ...record.record, + data: { ...record.record.data, [ROW_NUMBER_FIELD]: rowNumber }, + }, + } +} function compactGoogleSheetsMessages(messages: Message[]): Message[] { const compacted: Message[] = [] @@ -89,12 +124,13 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont opts?: { maxBatch?: number catalog?: ConfiguredCatalog - state?: import('@stripe/sync-engine').SourceState + rowIndexMap?: RowIndexMap + sourceState?: import('@stripe/sync-engine').SourceState } ): Promise< RunResult & { written: number - rowAssignments: Record> + rowIndexMap: Record> } > { if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets workflow') @@ -102,20 +138,25 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont const maxBatch = opts?.maxBatch ?? 50 const queued = await context.consumeQueueBatch(pipelineId, maxBatch) - const initialState: import('@stripe/sync-engine').SourceState = { - streams: { ...opts?.state?.streams }, - global: { ...opts?.state?.global }, + let sourceState: import('@stripe/sync-engine').SourceState = opts?.sourceState ?? { + streams: {}, + global: {}, } if (queued.length === 0) { - return { errors: [], state: initialState, written: 0, rowAssignments: {} } + return { errors: [], state: sourceState, written: 0, rowIndexMap: {} } } const pipeline = await context.pipelineStore.get(pipelineId) const { id: _, ...config } = pipeline - const writeBatch = compactGoogleSheetsMessages(queued) - if (config.destination.type !== 'google-sheets') { - throw new Error('writeGoogleSheetsFromQueue requires a google-sheets destination') + const augmented = queued.map((message) => { + if (message.type !== 'record') return message + const keyed = withRowKey(message, opts?.catalog) + return opts?.rowIndexMap ? withRowNumber(keyed, opts.rowIndexMap) : keyed + }) + const writeBatch = compactGoogleSheetsMessages(augmented) + if (config.destination.type !== 'google_sheets') { + throw new Error('writeGoogleSheetsFromQueue requires a google_sheets destination') } if (!opts?.catalog) { throw new Error('catalog is required for Google Sheets workflow writes') @@ -125,11 +166,7 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont const filteredCatalog = augmentGoogleSheetsCatalog(opts.catalog) const destination = createGoogleSheetsDestination() const errors: RunResult['errors'] = [] - const state: import('@stripe/sync-engine').SourceState = { - streams: { ...initialState.streams }, - global: { ...initialState.global }, - } - const rowAssignments: Record> = {} + const rowIndexMap: Record> = {} const input = enforceCatalog(filteredCatalog)( asIterable(writeBatch) ) as AsyncIterable @@ -145,22 +182,18 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont if (error) { errors.push(error) } else if (raw.type === 'source_state') { - if (raw.source_state.state_type === 'global') { - state.global = raw.source_state.data as Record - } else { - state.streams[raw.source_state.stream] = raw.source_state.data - } + sourceState = mergeStateMessage(sourceState, raw) } else if (raw.type === 'log') { const meta = parseGoogleSheetsMetaLog(raw.log.message) if (meta?.type === 'row_assignments') { for (const [stream, assignments] of Object.entries(meta.assignments)) { - rowAssignments[stream] ??= {} - Object.assign(rowAssignments[stream], assignments) + rowIndexMap[stream] ??= {} + Object.assign(rowIndexMap[stream], assignments) } } } } - return { errors, state, written: queued.length, rowAssignments } + return { errors, state: sourceState, written: queued.length, rowIndexMap } } } diff --git a/apps/service/src/temporal/workflows/_shared.ts b/apps/service/src/temporal/workflows/_shared.ts index 80def6bb2..b33094750 100644 --- a/apps/service/src/temporal/workflows/_shared.ts +++ b/apps/service/src/temporal/workflows/_shared.ts @@ -5,7 +5,7 @@ import { retryPolicy } from '../../lib/utils.js' import { DesiredStatus } from '../../lib/createSchemas.js' import { SourceInputMessage } from '@stripe/sync-protocol' -export type RowIndex = Record> +export type RowIndexMap = Record> export const sourceInputSignal = defineSignal<[SourceInputMessage]>('source_input') /** Carries the new desired_status value — workflow updates its local state directly. */ @@ -22,7 +22,7 @@ export const { pipelineSync } = proxyActivities({ retry: retryPolicy, }) -export const { discoverCatalog, readGoogleSheetsIntoQueue, writeGoogleSheetsFromQueue } = +export const { discoverCatalog, readIntoQueue, writeGoogleSheetsFromQueue } = proxyActivities({ startToCloseTimeout: '10m', heartbeatTimeout: '2m', diff --git a/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts b/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts deleted file mode 100644 index cbfa0ea8a..000000000 --- a/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts +++ /dev/null @@ -1,66 +0,0 @@ -import { condition, continueAsNew, setHandler } from '@temporalio/workflow' - -import { desiredStatusSignal, pipelineSync, updatePipelineStatus } from './_shared.js' -import type { SourceState as SyncState } from '@stripe/sync-protocol' -import { CONTINUE_AS_NEW_THRESHOLD } from '../../lib/utils.js' - -const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 - -export interface BackfillPipelineWorkflowOpts { - desiredStatus?: string - state?: SyncState -} - -export async function backfillPipelineWorkflow( - pipelineId: string, - opts?: BackfillPipelineWorkflowOpts -): Promise { - let desiredStatus = opts?.desiredStatus ?? 'active' - let iteration = 0 - let syncState: SyncState = opts?.state ?? { streams: {}, global: {} } - let backfillComplete = false - - setHandler(desiredStatusSignal, (status: string) => { - desiredStatus = status - }) - - async function maybeContinueAsNew() { - if (++iteration >= CONTINUE_AS_NEW_THRESHOLD) { - await continueAsNew(pipelineId, { - desiredStatus, - state: syncState, - }) - } - } - - await updatePipelineStatus(pipelineId, 'backfill') - - while (desiredStatus !== 'deleted') { - if (desiredStatus === 'paused') { - await updatePipelineStatus(pipelineId, 'paused') - await condition(() => desiredStatus !== 'paused') - continue - } - - if (backfillComplete) { - await updatePipelineStatus(pipelineId, 'ready') - const timedOut = !(await condition(() => desiredStatus !== 'active', ONE_WEEK_MS)) - if (timedOut) backfillComplete = false - continue - } - - const result = await pipelineSync(pipelineId, { - state: syncState, - state_limit: 100, - time_limit: 10, - }) - syncState = { - streams: { ...syncState.streams, ...result.state.streams }, - global: { ...syncState.global, ...result.state.global }, - } - backfillComplete = result.eof?.reason === 'complete' - await maybeContinueAsNew() - } - - await updatePipelineStatus(pipelineId, 'teardown') -} diff --git a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts index eb5a81335..645ea29fd 100644 --- a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts @@ -1,33 +1,41 @@ import { condition, continueAsNew, setHandler, sleep } from '@temporalio/workflow' -import type { - ConfiguredCatalog, - SourceInputMessage, - SourceState as SyncState, -} from '@stripe/sync-engine' - +import type { ConfiguredCatalog, SourceInputMessage, SourceState } from '@stripe/sync-protocol' +import type { DesiredStatus, PipelineStatus } from '../../lib/createSchemas.js' +import { CONTINUE_AS_NEW_THRESHOLD, EVENT_BATCH_SIZE } from '../../lib/utils.js' import { desiredStatusSignal, discoverCatalog, pipelineSetup, pipelineTeardown, - readGoogleSheetsIntoQueue, - RowIndex, + readIntoQueue, + RowIndexMap, sourceInputSignal, updatePipelineStatus, writeGoogleSheetsFromQueue, } from './_shared.js' -import { CONTINUE_AS_NEW_THRESHOLD, deepEqual, EVENT_BATCH_SIZE } from '../../lib/utils.js' + +const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 + +export type ReconcilePhase = 'backfilling' | 'reconciling' | 'ready' +export type SetupState = 'started' | 'completed' +export type TeardownState = 'started' | 'completed' + +export interface GoogleSheetWorkflowState { + phase?: ReconcilePhase + paused?: boolean + setup?: SetupState + teardown?: TeardownState + pendingWrites?: boolean +} export interface GoogleSheetPipelineWorkflowOpts { - desiredStatus?: string - setupDone?: boolean - state?: SyncState - readState?: SyncState - rowIndex?: RowIndex + desiredStatus?: DesiredStatus + syncState?: SourceState + readState?: SourceState + rowIndexMap?: RowIndexMap catalog?: ConfiguredCatalog - pendingWrites?: boolean inputQueue?: SourceInputMessage[] - readComplete?: boolean + state?: GoogleSheetWorkflowState writeRps?: number } @@ -35,140 +43,167 @@ export async function googleSheetPipelineWorkflow( pipelineId: string, opts?: GoogleSheetPipelineWorkflowOpts ): Promise { - let desiredStatus = opts?.desiredStatus ?? 'active' - const inputQueue: SourceInputMessage[] = [...(opts?.inputQueue ?? [])] - let iteration = 0 - let setupDone = opts?.setupDone ?? false - let syncState: SyncState = opts?.state ?? { streams: {}, global: {} } - let readState: SyncState = opts?.readState ?? { - streams: { ...syncState.streams }, - global: { ...syncState.global }, - } - let rowIndex: RowIndex = opts?.rowIndex ?? {} + // Persisted through continue-as-new. + const inputQueue: SourceInputMessage[] = opts?.inputQueue ? [...opts.inputQueue] : [] + let desiredStatus: DesiredStatus = opts?.desiredStatus ?? 'active' + let syncState: SourceState = opts?.syncState ?? { streams: {}, global: {} } + let readState: SourceState = opts?.readState ?? syncState + let rowIndexMap: RowIndexMap = opts?.rowIndexMap ?? {} let catalog: ConfiguredCatalog | undefined = opts?.catalog - let readComplete = opts?.readComplete ?? false - let pendingWrites = opts?.pendingWrites ?? false + let state: GoogleSheetWorkflowState = { ...opts?.state } + const writeRps = opts?.writeRps + + // Transient workflow-local state. + let operationCount = 0 setHandler(sourceInputSignal, (event: SourceInputMessage) => { inputQueue.push(event) }) - setHandler(desiredStatusSignal, (status: string) => { + setHandler(desiredStatusSignal, (status: DesiredStatus) => { desiredStatus = status }) - async function maybeContinueAsNew() { - if (++iteration >= CONTINUE_AS_NEW_THRESHOLD) { - await continueAsNew(pipelineId, { - desiredStatus, - setupDone: true, - state: syncState, - readState, - rowIndex, - catalog, - pendingWrites, - inputQueue: inputQueue.length > 0 ? [...inputQueue] : undefined, - readComplete, - writeRps: opts?.writeRps, - }) + // MARK: - State + + function derivePipelineStatus(): PipelineStatus { + if (state.teardown) return 'teardown' + if (state.paused) return 'paused' + if (state.setup !== 'completed') return 'setup' + return state.phase === 'ready' ? 'ready' : 'backfill' + } + + async function setState(next: Partial) { + const previousStatus = derivePipelineStatus() + state = { ...state, ...next } + const nextStatus = derivePipelineStatus() + if (previousStatus !== nextStatus) { + await updatePipelineStatus(pipelineId, nextStatus) } } - function shouldStop() { - return desiredStatus === 'deleted' || desiredStatus === 'paused' + function runInterrupted() { + return desiredStatus !== 'active' || operationCount >= CONTINUE_AS_NEW_THRESHOLD } - // Setup - if (!setupDone) { - await pipelineSetup(pipelineId) - catalog = await discoverCatalog(pipelineId) - setupDone = true - if (desiredStatus === 'deleted') { - await updatePipelineStatus(pipelineId, 'teardown') - await pipelineTeardown(pipelineId) - return + // MARK: - Live event loop + + async function waitForLiveEvents(): Promise { + await condition(() => inputQueue.length > 0 || runInterrupted()) + if (runInterrupted()) return null + return inputQueue.splice(0, EVENT_BATCH_SIZE) + } + + async function liveEventLoop(): Promise { + while (true) { + const events = await waitForLiveEvents() + if (!events) return + + const { count } = await readIntoQueue(pipelineId, { + input: events, + }) + if (count > 0) await setState({ pendingWrites: true }) + operationCount++ } } - await updatePipelineStatus(pipelineId, readComplete ? 'ready' : 'backfill') + // MARK: - Reconcile loop - async function readLoop(): Promise { - while (!shouldStop()) { - if (!catalog) catalog = await discoverCatalog(pipelineId) + async function waitForReconcileTurn(): Promise { + await condition(() => runInterrupted() || state.phase !== 'ready', ONE_WEEK_MS) + if (runInterrupted()) return false + return true + } - if (inputQueue.length > 0) { - const batch = inputQueue.splice(0, EVENT_BATCH_SIZE) - const { count } = await readGoogleSheetsIntoQueue(pipelineId, { - input: batch, - catalog, - rowIndex, - }) - if (count > 0) pendingWrites = true - await maybeContinueAsNew() - continue + async function reconcileLoop(): Promise { + while (await waitForReconcileTurn()) { + if (!state.phase) { + await setState({ phase: 'backfilling' }) + } else if (state.phase === 'ready') { + await setState({ phase: 'reconciling' }) } - if (!readComplete) { - const before = readState - const { count, state: nextReadState } = await readGoogleSheetsIntoQueue(pipelineId, { - state: readState, - state_limit: 1, - catalog, - rowIndex, - }) - if (count > 0) pendingWrites = true - readState = { - streams: { ...readState.streams, ...nextReadState.streams }, - global: { ...readState.global, ...nextReadState.global }, - } - if (count === 0 || deepEqual(readState, before)) { - readComplete = true - await updatePipelineStatus(pipelineId, 'ready') - } - await maybeContinueAsNew() - continue - } + const result = await readIntoQueue(pipelineId, { + state: readState, + state_limit: 1, + }) + + readState = result.state + if (result.count > 0) await setState({ pendingWrites: true }) + if (result.eof?.reason === 'complete') await setState({ phase: 'ready' }) - await condition(() => inputQueue.length > 0 || shouldStop()) + operationCount++ } } + // MARK: - Write loop + + async function waitForPendingWrites(): Promise { + await condition(() => state.pendingWrites || runInterrupted()) + if (runInterrupted()) return false + return true + } + async function writeLoop(): Promise { - while (!shouldStop()) { - if (pendingWrites) { - if (!catalog) catalog = await discoverCatalog(pipelineId) - const result = await writeGoogleSheetsFromQueue(pipelineId, { - maxBatch: 50, - catalog, - }) - pendingWrites = result.written > 0 - if (result.written > 0) syncState = result.state - for (const [stream, assignments] of Object.entries(result.rowAssignments)) { - rowIndex[stream] ??= {} - Object.assign(rowIndex[stream], assignments) - } - if (opts?.writeRps) await sleep(Math.ceil(1000 / opts.writeRps)) - await maybeContinueAsNew() + while (await waitForPendingWrites()) { + if (!catalog) catalog = await discoverCatalog(pipelineId) + + const result = await writeGoogleSheetsFromQueue(pipelineId, { + maxBatch: 50, + catalog, + rowIndexMap, + sourceState: syncState, + }) + + for (const [stream, assignments] of Object.entries(result.rowIndexMap)) { + rowIndexMap[stream] ??= {} + Object.assign(rowIndexMap[stream], assignments) + } + + if (result.written > 0) { + syncState = result.state } else { - await condition(() => pendingWrites || shouldStop()) + await setState({ pendingWrites: false }) } + + if (writeRps) await sleep(Math.ceil(1000 / writeRps)) + operationCount++ } } - // Main loop: handle pause/delete/active cycling - while (desiredStatus !== 'deleted') { - if (desiredStatus === 'deleted') break + // MARK: - Main logic + + if (state.setup !== 'completed') { + await setState({ setup: 'started' }) + if (!catalog) catalog = await discoverCatalog(pipelineId) + await pipelineSetup(pipelineId) + await setState({ setup: 'completed' }) + } + while (desiredStatus !== 'deleted') { if (desiredStatus === 'paused') { - await updatePipelineStatus(pipelineId, 'paused') + await setState({ paused: true }) await condition(() => desiredStatus !== 'paused') + await setState({ paused: false }) continue } - // Active — run read/write loops until paused or deleted - await updatePipelineStatus(pipelineId, readComplete ? 'ready' : 'backfill') - await Promise.all([readLoop(), writeLoop()]) + await Promise.all([liveEventLoop(), reconcileLoop(), writeLoop()]) + + if (operationCount >= CONTINUE_AS_NEW_THRESHOLD) { + return await continueAsNew(pipelineId, { + desiredStatus, + syncState, + readState, + rowIndexMap, + catalog, + inputQueue, + state, + writeRps, + }) + } } - await updatePipelineStatus(pipelineId, 'teardown') + await setState({ teardown: 'started' }) await pipelineTeardown(pipelineId) + await setState({ teardown: 'completed' }) } diff --git a/apps/service/src/temporal/workflows/index.ts b/apps/service/src/temporal/workflows/index.ts index a031f4db8..855541204 100644 --- a/apps/service/src/temporal/workflows/index.ts +++ b/apps/service/src/temporal/workflows/index.ts @@ -1,3 +1,2 @@ export { pipelineWorkflow } from './pipeline-workflow.js' export { googleSheetPipelineWorkflow } from './google-sheet-pipeline-workflow.js' -export { backfillPipelineWorkflow } from './backfill-pipeline-workflow.js' diff --git a/demo/stripe-to-google-sheets.sh b/demo/stripe-to-google-sheets.sh index ea8417071..487a3dcbd 100755 --- a/demo/stripe-to-google-sheets.sh +++ b/demo/stripe-to-google-sheets.sh @@ -15,7 +15,7 @@ echo "Sheet: https://docs.google.com/spreadsheets/d/$GOOGLE_SPREADSHEET_ID" >&2 PIPELINE=$(node -e "console.log(JSON.stringify({ source: { name: 'stripe', api_key: process.env.STRIPE_API_KEY, backfill_limit: 10 }, destination: { - name: 'google-sheets', + name: 'google_sheets', client_id: process.env.GOOGLE_CLIENT_ID, client_secret: process.env.GOOGLE_CLIENT_SECRET, access_token: 'unused', diff --git a/docs/service/entities.puml b/docs/service/entities.puml index dbe3f928f..9e52c025f 100644 --- a/docs/service/entities.puml +++ b/docs/service/entities.puml @@ -40,7 +40,7 @@ json " Sync " as SC { "schema_name": "string", "credential_id": "→ Credential (postgres)" }, - "type = google-sheets": { + "type = google_sheets": { "google_sheet_id": "string", "credential_id": "→ Credential (google)" } diff --git a/docs/service/scenarios.md b/docs/service/scenarios.md index 54f2ac657..a72f9a321 100644 --- a/docs/service/scenarios.md +++ b/docs/service/scenarios.md @@ -64,7 +64,7 @@ POST /syncs { source: { type: "stripe", credential_id: "cred_stripe_..." ``` POST /syncs { source: { type: "stripe", credential_id: "cred_stripe_..." }, - destination: { type: "google-sheets", google_sheet_id: "1abc...", credential_id: "cred_goog_..." } } + destination: { type: "google_sheets", google_sheet_id: "1abc...", credential_id: "cred_goog_..." } } ``` | Test | Validates | diff --git a/docs/service/sync-examples.ts b/docs/service/sync-examples.ts index d92794d8d..8f21d350e 100644 --- a/docs/service/sync-examples.ts +++ b/docs/service/sync-examples.ts @@ -59,7 +59,7 @@ export const sync_stripe_to_sheets = { credential_id: 'cred_stripe_prod', }, destination: { - type: 'google-sheets', + type: 'google_sheets', google_sheet_id: '1ABCdef_spreadsheet_id', credential_id: 'cred_google_finance', }, diff --git a/docs/service/sync-types.ts b/docs/service/sync-types.ts index 565e2b822..a50480a98 100644 --- a/docs/service/sync-types.ts +++ b/docs/service/sync-types.ts @@ -57,7 +57,7 @@ export type DestinationConfig = credential_id: string } | { - type: 'google-sheets' + type: 'google_sheets' google_sheet_id: string /** Credential type: `google` */ credential_id: string diff --git a/e2e/docker.test.sh b/e2e/docker.test.sh index 89ecc3611..e1b2459fa 100755 --- a/e2e/docker.test.sh +++ b/e2e/docker.test.sh @@ -89,7 +89,7 @@ echo "$STRIPE_OUTPUT" | head -3 || true # --- 2) Write to Google Sheets --- if [ -n "${GOOGLE_CLIENT_ID:-}" ]; then echo "==> Writing to Google Sheets (/pipeline_write)" - SHEETS_PARAMS=$(printf '{"source":{"type":"stripe","stripe":{"api_key":"%s"}},"destination":{"type":"google-sheets","google-sheets":{"client_id":"%s","client_secret":"%s","access_token":"unused","refresh_token":"%s","spreadsheet_id":"%s"}}}' \ + SHEETS_PARAMS=$(printf '{"source":{"type":"stripe","stripe":{"api_key":"%s"}},"destination":{"type":"google_sheets","google_sheets":{"client_id":"%s","client_secret":"%s","access_token":"unused","refresh_token":"%s","spreadsheet_id":"%s"}}}' \ "$STRIPE_API_KEY" "$GOOGLE_CLIENT_ID" "$GOOGLE_CLIENT_SECRET" "$GOOGLE_REFRESH_TOKEN" "$GOOGLE_SPREADSHEET_ID") SHEETS_OUTPUT=$(echo "$STRIPE_OUTPUT" | curl -s --max-time 60 -X POST "http://localhost:$PORT/pipeline_write" \ diff --git a/e2e/temporal.test.ts b/e2e/temporal.test.ts index e6f646717..da8d3a2f3 100644 --- a/e2e/temporal.test.ts +++ b/e2e/temporal.test.ts @@ -23,6 +23,34 @@ import { createActivities } from '@stripe/sync-service' // Helpers // --------------------------------------------------------------------------- +/** In-memory pipeline store for e2e tests. */ +function memoryPipelineStore() { + const data = new Map>() + return { + async get(id: string) { + const p = data.get(id) + if (!p) throw new Error(`Pipeline not found: ${id}`) + return p as any + }, + async set(id: string, pipeline: Record) { + data.set(id, pipeline) + }, + async update(id: string, patch: Record) { + const existing = data.get(id) + if (!existing) throw new Error(`Pipeline not found: ${id}`) + const updated = { ...existing, ...patch, id } + data.set(id, updated) + return updated as any + }, + async delete(id: string) { + data.delete(id) + }, + async list() { + return [...data.values()] as any[] + }, + } +} + const POSTGRES_URL = process.env.POSTGRES_URL ?? 'postgresql://postgres:postgres@localhost:5432/postgres' @@ -109,7 +137,7 @@ function createTestInfra() { const connectors = createConnectorResolver({ sources: { stripe: source }, - destinations: { postgres: pgDestination, 'google-sheets': sheetsDestination }, + destinations: { postgres: pgDestination, 'google_sheets': sheetsDestination }, }) // Start engine API (stateless sync execution) @@ -144,10 +172,12 @@ describe.skip('temporal e2e: stripe → postgres', () => { const infra = createTestInfra() const schema = schemaName() let stripe: Stripe + let pipelineStore: ReturnType beforeAll(async () => { await infra.setup() stripe = new Stripe(STRIPE_API_KEY) + pipelineStore = memoryPipelineStore() console.log(` Schema: ${schema}`) }, 60_000) @@ -159,17 +189,20 @@ describe.skip('temporal e2e: stripe → postgres', () => { }) it('backfills products then processes a live event via signal', async () => { + const pipelineId = `pipe_e2e_${Date.now()}` const pipeline = { - id: `pipe_e2e_${Date.now()}`, + id: pipelineId, source: { type: 'stripe', stripe: { api_key: STRIPE_API_KEY, backfill_limit: 5 } }, destination: { type: 'postgres', postgres: { connection_string: POSTGRES_URL, schema } }, streams: [{ name: 'products' }], } - console.log(` Pipeline: ${pipeline.id}`) + + await pipelineStore.set(pipelineId, pipeline as any) + console.log(` Pipeline: ${pipelineId}`) const handle = await infra.client.workflow.start('pipelineWorkflow', { - args: [pipeline], - workflowId: `pipe_${pipeline.id}`, + args: [pipelineId], + workflowId: `pipe_${pipelineId}`, taskQueue: 'pg-queue', }) @@ -179,6 +212,7 @@ describe.skip('temporal e2e: stripe → postgres', () => { workflowsPath: infra.workflowsPath, activities: createActivities({ engineUrl: infra.engineUrl, + pipelineStore, }), }) @@ -207,7 +241,7 @@ describe.skip('temporal e2e: stripe → postgres', () => { expect(sampleRows[0].id).toMatch(/^prod_/) console.log(` Sample: ${sampleRows[0].id} → ${sampleRows[0].name}`) - // --- Live event via stripe_event signal --- + // --- Live event via source_input signal --- const products = await stripe.products.list({ limit: 1 }) const product = products.data[0] const newName = `temporal-e2e-${Date.now()}` @@ -218,7 +252,7 @@ describe.skip('temporal e2e: stripe → postgres', () => { const events = await stripe.events.list({ limit: 5, type: 'product.updated' }) const event = events.data[0] console.log(` Fetched event ${event.id} (${event.type})`) - await handle.signal('stripe_event', event) + await handle.signal('source_input', event) await new Promise((r) => setTimeout(r, 5000)) @@ -231,11 +265,11 @@ describe.skip('temporal e2e: stripe → postgres', () => { console.log(` Live update verified: ${product.id} → "${updatedRows[0].name}"`) // --- Teardown --- - await handle.signal('delete') + await handle.signal('desired_status', 'deleted') try { await handle.result() } catch { - // Expected + // Expected — workflow terminates after teardown } }) @@ -252,10 +286,10 @@ describe.skip('temporal e2e: stripe → postgres', () => { }) // =========================================================================== -// 2. Stripe → Google Sheets (backfill) +// 2. Stripe → Google Sheets (backfill via googleSheetPipelineWorkflow) // =========================================================================== -describe.skip('temporal e2e: stripe → google-sheets', () => { +describe.skip('temporal e2e: stripe → google_sheets', () => { const STRIPE_API_KEY = process.env.STRIPE_API_KEY ?? '' const GOOGLE_CLIENT_ID = process.env.GOOGLE_CLIENT_ID ?? '' const GOOGLE_CLIENT_SECRET = process.env.GOOGLE_CLIENT_SECRET ?? '' @@ -263,6 +297,7 @@ describe.skip('temporal e2e: stripe → google-sheets', () => { const GOOGLE_SPREADSHEET_ID = process.env.GOOGLE_SPREADSHEET_ID ?? '' const infra = createTestInfra() let sheetsClient: ReturnType + let pipelineStore: ReturnType const streamName = 'products' @@ -273,6 +308,8 @@ describe.skip('temporal e2e: stripe → google-sheets', () => { auth.setCredentials({ refresh_token: GOOGLE_REFRESH_TOKEN }) sheetsClient = google.sheets({ version: 'v4', auth }) + pipelineStore = memoryPipelineStore() + console.log(` Spreadsheet: ${GOOGLE_SPREADSHEET_ID}`) console.log(` Tab: ${streamName}`) }, 60_000) @@ -301,24 +338,30 @@ describe.skip('temporal e2e: stripe → google-sheets', () => { }) it('backfills products from Stripe into a Google Sheet tab', async () => { + const pipelineId = `pipe_sheets_${Date.now()}` const pipeline = { - id: `pipe_sheets_${Date.now()}`, + id: pipelineId, source: { type: 'stripe', stripe: { api_key: STRIPE_API_KEY, backfill_limit: 3 } }, destination: { - name: 'google-sheets', - client_id: GOOGLE_CLIENT_ID, - client_secret: GOOGLE_CLIENT_SECRET, - refresh_token: GOOGLE_REFRESH_TOKEN, - access_token: 'placeholder', - spreadsheet_id: GOOGLE_SPREADSHEET_ID, + type: 'google_sheets', + 'google_sheets': { + client_id: GOOGLE_CLIENT_ID, + client_secret: GOOGLE_CLIENT_SECRET, + refresh_token: GOOGLE_REFRESH_TOKEN, + access_token: 'placeholder', + spreadsheet_id: GOOGLE_SPREADSHEET_ID, + }, }, streams: [{ name: streamName }], } - console.log(` Pipeline: ${pipeline.id}`) - const handle = await infra.client.workflow.start('pipelineWorkflow', { - args: [pipeline], - workflowId: `pipe_${pipeline.id}`, + // Pre-populate the pipeline store (workflow reads config from store, not args) + await pipelineStore.set(pipelineId, pipeline as any) + console.log(` Pipeline: ${pipelineId}`) + + const handle = await infra.client.workflow.start('googleSheetPipelineWorkflow', { + args: [pipelineId], + workflowId: `pipe_${pipelineId}`, taskQueue: 'sheets-queue', }) @@ -328,6 +371,7 @@ describe.skip('temporal e2e: stripe → google-sheets', () => { workflowsPath: infra.workflowsPath, activities: createActivities({ engineUrl: infra.engineUrl, + pipelineStore, }), }) @@ -359,11 +403,11 @@ describe.skip('temporal e2e: stripe → google-sheets', () => { } console.log(` Sample: ${dataRows[0][idCol]}`) - await handle.signal('delete') + await handle.signal('desired_status', 'deleted') try { await handle.result() } catch { - // Expected + // Expected — workflow terminates after teardown } }) }) diff --git a/package.json b/package.json index f504533ce..6554d4fb1 100644 --- a/package.json +++ b/package.json @@ -15,8 +15,9 @@ "lint": "pnpm -r run lint", "format": "prettier --write .", "format:check": "prettier --check .", + "down": "docker compose down && (lsof -ti:4010,4020,5173 | xargs kill -9 2>/dev/null; true)", "dev:engine": "pnpm --filter @stripe/sync-engine dev", - "dev": "concurrently -n engine,service,worker,dashboard -c cyan,green,yellow,magenta \"pnpm --filter @stripe/sync-engine dev\" \"pnpm --filter @stripe/sync-service dev:serve\" \"pnpm --filter @stripe/sync-service dev:worker\" \"pnpm --filter @stripe/sync-dashboard dev\"", + "dev": "docker compose up -d; concurrently -n engine,service,worker,dashboard -c cyan,green,yellow,magenta \"pnpm --filter @stripe/sync-engine dev\" \"pnpm --filter @stripe/sync-service dev:serve\" \"pnpm --filter @stripe/sync-service dev:worker\" \"pnpm --filter @stripe/sync-dashboard dev\"", "visualizer": "pnpm --filter @stripe/sync-visualizer dev" }, "dependencies": { diff --git a/scripts/generate-openapi-specs.ts b/scripts/generate-openapi-specs.ts index f2ca6dcce..56969715f 100644 --- a/scripts/generate-openapi-specs.ts +++ b/scripts/generate-openapi-specs.ts @@ -21,7 +21,7 @@ const resolver = await createConnectorResolver({ sources: { stripe: (sourceStripe as any).default ?? sourceStripe }, destinations: { postgres: (destinationPostgres as any).default ?? destinationPostgres, - 'google-sheets': (destinationGoogleSheets as any).default ?? destinationGoogleSheets, + 'google_sheets': (destinationGoogleSheets as any).default ?? destinationGoogleSheets, }, })