Skip to content

Commit 24b9441

Browse files
tonyxiaoclaude
andcommitted
Refactor Temporal support into src/temporal/; add worker CLI command
- Move temporal.ts → src/temporal/{bridge,activities,types,worker,workflows}.ts - Add `worker` subcommand to CLI for running a Temporal worker process - Fix StateStore mocks in pipeline.test.ts to include get() - Add @temporalio/{activity,worker,workflow,testing} as optional deps Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
1 parent 8f34cb4 commit 24b9441

12 files changed

Lines changed: 1750 additions & 129 deletions

File tree

apps/engine/src/lib/pipeline.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ describe('persistState()', () => {
233233
it('calls store.set for state messages', async () => {
234234
const calls: Array<{ stream: string; data: unknown }> = []
235235
const store: StateStore = {
236+
get: async () => undefined,
236237
set: async (stream, data) => {
237238
calls.push({ stream, data })
238239
},
@@ -245,7 +246,7 @@ describe('persistState()', () => {
245246
})
246247

247248
it('yields all messages through unchanged', async () => {
248-
const store: StateStore = { set: async () => {} }
249+
const store: StateStore = { get: async () => undefined, set: async () => {} }
249250
const msgs: DestinationOutput[] = [
250251
{ type: 'state', stream: 'customers', data: { cursor: 'abc' } },
251252
{ type: 'log', level: 'info', message: 'done' },
@@ -259,6 +260,7 @@ describe('persistState()', () => {
259260
it('does not call store.set for non-state messages', async () => {
260261
const calls: Array<unknown> = []
261262
const store: StateStore = {
263+
get: async () => undefined,
262264
set: async (...args) => {
263265
calls.push(args)
264266
},
@@ -274,6 +276,7 @@ describe('persistState()', () => {
274276
it('persists multiple state messages in order', async () => {
275277
const calls: Array<{ stream: string; data: unknown }> = []
276278
const store: StateStore = {
279+
get: async () => undefined,
277280
set: async (stream, data) => {
278281
calls.push({ stream, data })
279282
},

apps/service/package.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@
3535
"zod": "^4.3.6"
3636
},
3737
"optionalDependencies": {
38-
"@temporalio/client": "^1"
38+
"@temporalio/client": "^1",
39+
"@temporalio/activity": "^1",
40+
"@temporalio/worker": "^1",
41+
"@temporalio/workflow": "^1"
3942
},
4043
"devDependencies": {
44+
"@temporalio/testing": "^1",
4145
"@types/node": "^24.10.1",
4246
"vitest": "^3.2.4"
4347
},
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
import { describe, it, expect, beforeAll, afterAll } from 'vitest'
2+
import { TestWorkflowEnvironment } from '@temporalio/testing'
3+
import { Worker } from '@temporalio/worker'
4+
import path from 'node:path'
5+
import type { SyncActivities, RunResult } from '../temporal/types.js'
6+
7+
// workflowsPath must point to compiled JS (Temporal bundles it for V8 sandbox)
8+
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js')
9+
10+
const noErrors: RunResult = { errors: [] }
11+
12+
function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities {
13+
return {
14+
setup: async () => {},
15+
run: async () => noErrors,
16+
teardown: async () => {},
17+
...overrides,
18+
}
19+
}
20+
21+
let testEnv: TestWorkflowEnvironment
22+
23+
beforeAll(async () => {
24+
testEnv = await TestWorkflowEnvironment.createLocal()
25+
}, 120_000)
26+
27+
afterAll(async () => {
28+
await testEnv?.teardown()
29+
})
30+
31+
describe('syncWorkflow (unit — stubbed activities)', () => {
32+
it('runs setup then continuous reconciliation until delete', async () => {
33+
let setupCalled = false
34+
let runCallCount = 0
35+
36+
const worker = await Worker.create({
37+
connection: testEnv.nativeConnection,
38+
taskQueue: 'test-queue-1',
39+
workflowsPath,
40+
activities: stubActivities({
41+
setup: async () => {
42+
setupCalled = true
43+
},
44+
run: async () => {
45+
runCallCount++
46+
return noErrors
47+
},
48+
}),
49+
})
50+
51+
await worker.runUntil(async () => {
52+
const handle = await testEnv.client.workflow.start('syncWorkflow', {
53+
args: ['sync_test_1'],
54+
workflowId: 'test-sync-1',
55+
taskQueue: 'test-queue-1',
56+
})
57+
58+
// Let it run several reconciliation pages
59+
await new Promise((r) => setTimeout(r, 2000))
60+
61+
const status = await handle.query('status')
62+
expect(status.iteration).toBeGreaterThan(0)
63+
64+
await handle.signal('delete')
65+
await handle.result()
66+
67+
expect(setupCalled).toBe(true)
68+
expect(runCallCount).toBeGreaterThan(1)
69+
})
70+
})
71+
72+
it('processes stripe_event signals as optimistic updates', async () => {
73+
const runCalls: { syncId: string; input?: unknown[] }[] = []
74+
75+
const worker = await Worker.create({
76+
connection: testEnv.nativeConnection,
77+
taskQueue: 'test-queue-2',
78+
workflowsPath,
79+
activities: stubActivities({
80+
run: async (syncId: string, input?: unknown[]) => {
81+
runCalls.push({ syncId, input: input ?? undefined })
82+
return noErrors
83+
},
84+
}),
85+
})
86+
87+
await worker.runUntil(async () => {
88+
const handle = await testEnv.client.workflow.start('syncWorkflow', {
89+
args: ['sync_test_2'],
90+
workflowId: 'test-sync-2',
91+
taskQueue: 'test-queue-2',
92+
})
93+
94+
// Let reconciliation start
95+
await new Promise((r) => setTimeout(r, 1500))
96+
97+
// Send events
98+
await handle.signal('stripe_event', {
99+
id: 'evt_1',
100+
type: 'customer.created',
101+
})
102+
await handle.signal('stripe_event', {
103+
id: 'evt_2',
104+
type: 'product.updated',
105+
})
106+
107+
await new Promise((r) => setTimeout(r, 2000))
108+
await handle.signal('delete')
109+
await handle.result()
110+
111+
// Find event-bearing run calls (input is defined)
112+
const eventCalls = runCalls.filter((c) => c.input)
113+
expect(eventCalls.length).toBeGreaterThanOrEqual(1)
114+
115+
const allEvents = eventCalls.flatMap((c) => c.input!)
116+
expect(allEvents).toEqual(
117+
expect.arrayContaining([
118+
expect.objectContaining({ id: 'evt_1' }),
119+
expect.objectContaining({ id: 'evt_2' }),
120+
])
121+
)
122+
123+
// All calls should use the same syncId
124+
for (const call of runCalls) {
125+
expect(call.syncId).toBe('sync_test_2')
126+
}
127+
})
128+
})
129+
130+
it('pauses and resumes processing', async () => {
131+
const worker = await Worker.create({
132+
connection: testEnv.nativeConnection,
133+
taskQueue: 'test-queue-3',
134+
workflowsPath,
135+
activities: stubActivities(),
136+
})
137+
138+
await worker.runUntil(async () => {
139+
const handle = await testEnv.client.workflow.start('syncWorkflow', {
140+
args: ['sync_test_3'],
141+
workflowId: 'test-sync-3',
142+
taskQueue: 'test-queue-3',
143+
})
144+
145+
await new Promise((r) => setTimeout(r, 1000))
146+
await handle.signal('pause')
147+
await new Promise((r) => setTimeout(r, 500))
148+
149+
const status = await handle.query('status')
150+
expect(status.paused).toBe(true)
151+
152+
await handle.signal('resume')
153+
await new Promise((r) => setTimeout(r, 500))
154+
await handle.signal('delete')
155+
await handle.result()
156+
})
157+
})
158+
159+
it('triggers teardown on delete', async () => {
160+
let teardownCalled = false
161+
let teardownSyncId: string | undefined
162+
163+
const worker = await Worker.create({
164+
connection: testEnv.nativeConnection,
165+
taskQueue: 'test-queue-4',
166+
workflowsPath,
167+
activities: stubActivities({
168+
run: async () => {
169+
// Slow run so delete arrives mid-reconciliation
170+
await new Promise((r) => setTimeout(r, 500))
171+
return noErrors
172+
},
173+
teardown: async (syncId: string) => {
174+
teardownCalled = true
175+
teardownSyncId = syncId
176+
},
177+
}),
178+
})
179+
180+
await worker.runUntil(async () => {
181+
const handle = await testEnv.client.workflow.start('syncWorkflow', {
182+
args: ['sync_test_4'],
183+
workflowId: 'test-sync-4',
184+
taskQueue: 'test-queue-4',
185+
})
186+
187+
await new Promise((r) => setTimeout(r, 300))
188+
await handle.signal('delete')
189+
await handle.result()
190+
191+
expect(teardownCalled).toBe(true)
192+
expect(teardownSyncId).toBe('sync_test_4')
193+
})
194+
})
195+
196+
it('skips setup when phase is running (continueAsNew case)', async () => {
197+
let setupCalled = false
198+
199+
const worker = await Worker.create({
200+
connection: testEnv.nativeConnection,
201+
taskQueue: 'test-queue-5',
202+
workflowsPath,
203+
activities: stubActivities({
204+
setup: async () => {
205+
setupCalled = true
206+
},
207+
}),
208+
})
209+
210+
await worker.runUntil(async () => {
211+
const handle = await testEnv.client.workflow.start('syncWorkflow', {
212+
args: ['sync_test_5', { phase: 'running' }],
213+
workflowId: 'test-sync-5',
214+
taskQueue: 'test-queue-5',
215+
})
216+
217+
await new Promise((r) => setTimeout(r, 1000))
218+
await handle.signal('delete')
219+
await handle.result()
220+
221+
expect(setupCalled).toBe(false)
222+
})
223+
})
224+
})

apps/service/src/cli/main.ts

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import path from 'node:path'
12
import { Readable } from 'node:stream'
23
import { defineCommand } from 'citty'
34
import { createCliFromSpec } from '@stripe/sync-ts-cli/openapi'
45
import { serve } from '@hono/node-server'
56
import { createApp } from '../api/app.js'
6-
import type { TemporalOptions } from '../lib/temporal.js'
7+
import type { TemporalOptions } from '../temporal/bridge.js'
78

89
async function createTemporalClient(address: string, taskQueue: string): Promise<TemporalOptions> {
910
// Dynamic import — @temporalio/client is an optional dependency
@@ -63,6 +64,59 @@ const serveCmd = defineCommand({
6364
},
6465
})
6566

67+
// Temporal worker command
68+
const workerCmd = defineCommand({
69+
meta: { name: 'worker', description: 'Start a Temporal worker for sync workflows' },
70+
args: {
71+
'temporal-address': {
72+
type: 'string',
73+
required: true,
74+
description: 'Temporal server address (e.g. localhost:7233)',
75+
},
76+
'temporal-namespace': {
77+
type: 'string',
78+
default: 'default',
79+
description: 'Temporal namespace (default: default)',
80+
},
81+
'temporal-task-queue': {
82+
type: 'string',
83+
default: 'sync-engine',
84+
description: 'Temporal task queue name (default: sync-engine)',
85+
},
86+
'service-url': {
87+
type: 'string',
88+
default: 'http://localhost:4020',
89+
description: 'Sync service HTTP URL (default: http://localhost:4020)',
90+
},
91+
},
92+
async run({ args }) {
93+
const { createWorker } = await import('../temporal/worker.js')
94+
const taskQueue = args['temporal-task-queue'] || 'sync-engine'
95+
const namespace = args['temporal-namespace'] || 'default'
96+
const serviceUrl = args['service-url'] || 'http://localhost:4020'
97+
const temporalAddress = args['temporal-address']
98+
99+
// workflowsPath: resolve relative to the package dist directory
100+
const pkgDir = path.resolve(import.meta.dirname ?? process.cwd(), '../..')
101+
const workflowsPath = path.resolve(pkgDir, 'dist/temporal/workflows.js')
102+
103+
const worker = await createWorker({
104+
temporalAddress,
105+
namespace,
106+
taskQueue,
107+
serviceUrl,
108+
workflowsPath,
109+
})
110+
111+
console.log('Starting Temporal worker...')
112+
console.log(` Temporal: ${temporalAddress} (${namespace})`)
113+
console.log(` Task queue: ${taskQueue}`)
114+
console.log(` Service URL: ${serviceUrl}`)
115+
116+
await worker.run()
117+
},
118+
})
119+
66120
export async function createProgram(opts?: { dataDir?: string }) {
67121
const app = createApp({ dataDir: opts?.dataDir })
68122
const res = await app.request('/openapi.json')
@@ -84,6 +138,6 @@ export async function createProgram(opts?: { dataDir?: string }) {
84138

85139
return defineCommand({
86140
...specCli,
87-
subCommands: { serve: serveCmd, ...specCli.subCommands },
141+
subCommands: { serve: serveCmd, worker: workerCmd, ...specCli.subCommands },
88142
})
89143
}

apps/service/src/index.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,15 @@ export { SyncService } from './lib/service.js'
3434
export type { SyncServiceOptions } from './lib/service.js'
3535

3636
// Temporal bridge
37-
export { TemporalBridge } from './lib/temporal.js'
38-
export type { TemporalOptions } from './lib/temporal.js'
37+
export { TemporalBridge } from './temporal/bridge.js'
38+
export type { TemporalOptions } from './temporal/bridge.js'
3939

4040
// API app factory
4141
export { createApp } from './api/app.js'
4242
export type { AppOptions } from './api/app.js'
43+
44+
// Temporal workflow types (for consumers that need to reference them)
45+
export type { RunResult, SyncActivities, WorkflowStatus } from './temporal/types.js'
46+
export { createActivities } from './temporal/activities.js'
47+
export { createWorker } from './temporal/worker.js'
48+
export type { WorkerOptions } from './temporal/worker.js'

apps/service/src/lib/service.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import type {
99
import type { CredentialStore, ConfigStore, LogSink, StateStore } from './stores.js'
1010
import type { SyncConfig } from './schemas.js'
1111
import { resolve } from './resolve.js'
12-
import { TemporalBridge } from './temporal.js'
13-
import type { TemporalOptions } from './temporal.js'
12+
import { TemporalBridge } from '../temporal/bridge.js'
13+
import type { TemporalOptions } from '../temporal/bridge.js'
1414

15-
export type { TemporalOptions } from './temporal.js'
15+
export type { TemporalOptions } from '../temporal/bridge.js'
1616

1717
// MARK: - Async queue
1818

0 commit comments

Comments
 (0)