Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions apps/engine/src/lib/pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,41 @@ describe('enforceCatalog()', () => {
expect((result[0] as { data: unknown }).data).toEqual({ id: 'sub_1', status: 'active' })
})

it('drops unknown internal fields that are not present in the catalog schema', async () => {
const msgs: Message[] = [
{
type: 'record',
stream: 'subscriptions',
data: {
id: 'sub_1',
status: 'active',
customer: 'cus_1',
_row_key: '["sub_1"]',
_row_number: 12,
},
emitted_at: 1,
},
]
const result = await drain(
enforceCatalog(
catalog([
{
name: 'subscriptions',
json_schema: {
type: 'object',
properties: { id: { type: 'string' }, status: { type: 'string' } },
},
},
])
)(toAsync(msgs))
)
expect(result).toHaveLength(1)
expect((result[0] as { data: unknown }).data).toEqual({
id: 'sub_1',
status: 'active',
})
})

it('passes records through unchanged when json_schema is absent', async () => {
const msgs: Message[] = [
{
Expand Down
2 changes: 1 addition & 1 deletion apps/engine/src/lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export function enforceCatalog(
if (props) {
yield {
...msg,
data: Object.fromEntries(Object.entries(msg.data).filter(([k]) => k in props)),
data: Object.fromEntries(Object.entries(msg.data).filter(([key]) => key in props)),
}
} else {
yield msg
Expand Down
137 changes: 133 additions & 4 deletions apps/service/src/__tests__/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import { TestWorkflowEnvironment } from '@temporalio/testing'
import { Worker } from '@temporalio/worker'
import path from 'node:path'
import type { PipelineConfig } from '@stripe/sync-engine'
import type { SyncActivities } from '../temporal/activities.js'
import type { RunResult } from '../temporal/activities.js'
import type { SyncActivities } from '../temporal/activities/index.js'
import type { RunResult } from '../temporal/activities/index.js'

// workflowsPath must point to compiled JS (Temporal bundles it for V8 sandbox)
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js')
// workflowsPath points to the compiled workflow directory.
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows')

const noErrors: RunResult = { errors: [], state: {} }

Expand All @@ -19,9 +19,17 @@ const testPipeline = {

function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities {
return {
discoverCatalog: async () => ({ streams: [] }),
setup: async () => ({}),
syncImmediate: async () => noErrors,
readIntoQueueWithState: async () => ({ count: 0, state: {} }),
readIntoQueue: async () => ({ count: 0, state: {} }),
writeGoogleSheetsFromQueue: async () => ({
errors: [],
state: {},
written: 0,
rowAssignments: {},
}),
writeFromQueue: async () => ({ errors: [], state: {}, written: 0 }),
teardown: async () => {},
...overrides,
Expand Down Expand Up @@ -284,3 +292,124 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
})
})
})

describe('pipelineGoogleSheetsWorkflow (unit — stubbed activities)', () => {
it('uses the Sheets-specific read path and catalog discovery', async () => {
let discoverCalls = 0
let readCalls = 0
let syncCalls = 0

const worker = await Worker.create({
connection: testEnv.nativeConnection,
taskQueue: 'test-queue-gs-1',
workflowsPath,
activities: stubActivities({
discoverCatalog: async () => {
discoverCalls++
return { streams: [] }
},
readIntoQueueWithState: async () => {
readCalls++
return { count: 0, state: {} }
},
syncImmediate: async () => {
syncCalls++
return noErrors
},
}),
})

await worker.runUntil(async () => {
const handle = await testEnv.client.workflow.start('pipelineGoogleSheetsWorkflow', {
args: [
{
...testPipeline,
destination: {
type: 'google-sheets',
spreadsheet_id: 'sheet_123',
},
},
],
workflowId: 'test-gs-sync-1',
taskQueue: 'test-queue-gs-1',
})

await new Promise((r) => setTimeout(r, 1500))
await handle.signal('delete')
await handle.result()

expect(discoverCalls).toBeGreaterThanOrEqual(1)
expect(readCalls).toBeGreaterThanOrEqual(1)
expect(syncCalls).toBe(0)
})
})

it('passes the discovered catalog into the Sheets write activity', async () => {
const discoveredCatalog = {
streams: [
{
stream: {
name: 'customers',
primary_key: [['id']],
json_schema: {
type: 'object',
properties: {
id: { type: 'string' },
},
},
},
sync_mode: 'full_refresh' as const,
destination_sync_mode: 'append' as const,
},
],
}
let readCalls = 0
let writeCatalog: unknown

const worker = await Worker.create({
connection: testEnv.nativeConnection,
taskQueue: 'test-queue-gs-2',
workflowsPath,
activities: stubActivities({
discoverCatalog: async () => discoveredCatalog,
readIntoQueueWithState: async () => {
readCalls++
return readCalls === 1
? { count: 1, state: { customers: { cursor: 'cus_1' } } }
: { count: 0, state: { customers: { cursor: 'cus_1' } } }
},
writeGoogleSheetsFromQueue: async (_config, _pipelineId, opts) => {
writeCatalog = opts?.catalog
return {
errors: [],
state: { customers: { cursor: 'cus_1' } },
written: 0,
rowAssignments: {},
}
},
}),
})

await worker.runUntil(async () => {
const handle = await testEnv.client.workflow.start('pipelineGoogleSheetsWorkflow', {
args: [
{
...testPipeline,
destination: {
type: 'google-sheets',
spreadsheet_id: 'sheet_456',
},
},
],
workflowId: 'test-gs-sync-2',
taskQueue: 'test-queue-gs-2',
})

await new Promise((r) => setTimeout(r, 1500))
await handle.signal('delete')
await handle.result()

expect(writeCatalog).toEqual(discoveredCatalog)
})
})
})
4 changes: 2 additions & 2 deletions apps/service/src/api/app.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Stripe from 'stripe'
import sourceStripe from '@stripe/sync-source-stripe'
import destinationPostgres from '@stripe/sync-destination-postgres'
import { createApp as createEngineApp, createConnectorResolver } from '@stripe/sync-engine'
import { createActivities } from '../temporal/activities.js'
import { createActivities } from '../temporal/activities/index.js'
import { createApp } from './app.js'
import type { paths } from '../__generated__/openapi.js'

Expand All @@ -24,7 +24,7 @@ const STRIPE_API_KEY = process.env['STRIPE_API_KEY']!
const POSTGRES_URL = process.env['POSTGRES_URL'] ?? process.env['DATABASE_URL']!
const TASK_QUEUE = `test-app-${Date.now()}`
const SCHEMA = `integration_${Date.now()}`
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js')
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows')

const SKIP_CLEANUP = process.env['SKIP_CLEANUP'] === '1'

Expand Down
Loading