Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,13 @@ jobs:
echo "::warning::E2E tests skipped — STRIPE_API_KEY not available (fork PR?)"
exit 0
fi
pnpm --filter @stripe/sync-e2e exec vitest run --exclude 'service-docker.test.ts'
pnpm --filter @stripe/sync-e2e exec vitest run \
--exclude 'service-docker.test.ts' \
--exclude 'test-e2e-network.test.ts' \
--exclude 'test-server-all-api.test.ts' \
--exclude 'test-server-sync.test.ts' \
--exclude 'test-sync-e2e.test.ts' \
--exclude 'test-sync-engine.test.ts'
env:
STRIPE_API_KEY: ${{ secrets.STRIPE_API_KEY }}
POSTGRES_URL: 'postgres://postgres:postgres@localhost:55432/postgres'
Expand Down Expand Up @@ -755,7 +761,7 @@ jobs:
run: pnpm install --frozen-lockfile && pnpm build

- name: CDN e2e tests
run: pnpm --filter @stripe/sync-e2e run test -- openapi-cdn.test.ts
run: pnpm --filter @stripe/sync-e2e exec vitest run openapi-cdn.test.ts
env:
STRIPE_SPEC_CDN_BASE_URL: ${{ needs.docs.outputs.deployment_url }}/stripe-api-specs

Expand Down
101 changes: 94 additions & 7 deletions apps/service/src/__tests__/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,25 @@ import { CONTINUE_AS_NEW_THRESHOLD } from '../lib/utils.js'

type SourceInput = unknown

// workflowsPath points to the compiled workflow directory.
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows')
// Point directly at the workflow index to avoid resolving the legacy dist/temporal/workflows.js file.
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows/index.js')

const emptyState = { streams: {}, global: {} }
const noErrors: RunResult = { errors: [], state: emptyState }
const permanentSyncError: RunResult = {
errors: [
{ message: 'permanent sync failure', failure_type: 'system_error', stream: 'customers' },
],
state: emptyState,
}

// Workflows now receive only the pipelineId string
const testPipelineId = 'test_pipe'

function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities {
return {
const activities = {
discoverCatalog: async () => ({ streams: [] }),
pipelineSetup: async () => ({}),
pipelineSetup: async () => {},
pipelineSync: async () => noErrors,
readIntoQueue: async () => ({ count: 0, state: emptyState }),
writeGoogleSheetsFromQueue: async () => ({
Expand All @@ -33,6 +39,13 @@ function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities
updatePipelineStatus: async () => {},
...overrides,
}

return {
...activities,
setup: activities.pipelineSetup,
sync: activities.pipelineSync,
teardown: activities.pipelineTeardown,
} as SyncActivities
}

/** Signal the workflow to delete. */
Expand Down Expand Up @@ -69,7 +82,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
activities: stubActivities({
pipelineSetup: async () => {
setupCalled = true
return {}
},
pipelineSync: async () => {
runCallCount++
Expand All @@ -89,7 +101,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
await new Promise((r) => setTimeout(r, 2000))

const status = await handle.query('status')
expect(status.iteration).toBeGreaterThan(0)
expect((status as { iteration: number }).iteration).toBeGreaterThan(0)

await signalDelete(handle)
await handle.result()
Expand Down Expand Up @@ -334,6 +346,82 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
})
})

it('transitions to error instead of ready when reconcile returns permanent sync errors', async () => {
const statusWrites: string[] = []

const worker = await Worker.create({
connection: testEnv.nativeConnection,
taskQueue: 'test-queue-3b-error',
workflowsPath,
activities: stubActivities({
updatePipelineStatus: async (_id: string, status: string) => {
statusWrites.push(status)
},
pipelineSync: async (_pipelineId: string, opts?) => {
if (opts?.input) return noErrors
return { ...permanentSyncError, eof: { reason: 'complete' as const } }
},
}),
})

await worker.runUntil(async () => {
const handle = await testEnv.client.workflow.start('pipelineWorkflow', {
args: [testPipelineId],
workflowId: 'test-sync-3b-error',
taskQueue: 'test-queue-3b-error',
})

await new Promise((r) => setTimeout(r, 500))
await signalDelete(handle)
await handle.result()

expect(statusWrites).toContain('error')
expect(statusWrites).not.toContain('ready')
})
})

it('retries transient sync activity failures and still reaches ready', async () => {
const statusWrites: string[] = []
let reconcileCalls = 0

const worker = await Worker.create({
connection: testEnv.nativeConnection,
taskQueue: 'test-queue-3b-retry',
workflowsPath,
activities: stubActivities({
updatePipelineStatus: async (_id: string, status: string) => {
statusWrites.push(status)
},
pipelineSync: async (_pipelineId: string, opts?) => {
if (opts?.input) return noErrors

reconcileCalls++
if (reconcileCalls === 1) {
throw new Error('transient sync failure')
}

return { ...noErrors, eof: { reason: 'complete' as const } }
},
}),
})

await worker.runUntil(async () => {
const handle = await testEnv.client.workflow.start('pipelineWorkflow', {
args: [testPipelineId],
workflowId: 'test-sync-3b-retry',
taskQueue: 'test-queue-3b-retry',
})

await new Promise((r) => setTimeout(r, 2500))
await signalDelete(handle)
await handle.result()

expect(reconcileCalls).toBeGreaterThanOrEqual(2)
expect(statusWrites).toContain('ready')
expect(statusWrites).not.toContain('error')
})
})

it('queues live events while paused and drains them after resume', async () => {
const syncCalls: { input?: SourceInput[] }[] = []

Expand Down Expand Up @@ -464,7 +552,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
activities: stubActivities({
pipelineSetup: async () => {
setupCalls++
return {}
},
pipelineSync: async () => {
syncCallCount++
Expand Down
3 changes: 2 additions & 1 deletion apps/service/src/temporal/activities/_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createRemoteEngine } from '@stripe/sync-engine'
import { Kafka } from 'kafkajs'
import type { Producer } from 'kafkajs'
import type { PipelineStore } from '../../lib/stores.js'
import type { SyncRunError } from '../sync-errors.js'

export interface ActivitiesContext {
/** Remote engine client — satisfies the {@link Engine} interface over HTTP. Drop-in replacement for a local engine. */
Expand Down Expand Up @@ -104,7 +105,7 @@ export function createActivitiesContext(opts: {
}

export interface RunResult {
errors: Array<{ message: string; failure_type?: string; stream?: string }>
errors: SyncRunError[]
state: SourceState
}

Expand Down
9 changes: 9 additions & 0 deletions apps/service/src/temporal/activities/pipeline-sync.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { ApplicationFailure } from '@temporalio/activity'
import type { SourceInputMessage, SourceReadOptions } from '@stripe/sync-engine'
import type { ActivitiesContext } from './_shared.js'
import { asIterable, drainMessages, type RunResult } from './_shared.js'
import { classifySyncErrors, summarizeSyncErrors } from '../sync-errors.js'

export function createPipelineSyncActivity(context: ActivitiesContext) {
return async function pipelineSync(
Expand Down Expand Up @@ -28,6 +30,13 @@ export function createPipelineSyncActivity(context: ActivitiesContext) {
destination: { type, [type]: destConfig },
})
}
const { transient, permanent } = classifySyncErrors(errors)
if (permanent.length > 0) {
return { errors, state, eof }
}
if (transient.length > 0) {
throw ApplicationFailure.retryable(summarizeSyncErrors(transient), 'TransientSyncError')
}
return { errors, state, eof }
}
}
37 changes: 37 additions & 0 deletions apps/service/src/temporal/sync-errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
export type SyncRunError = {
message: string
failure_type?: string
stream?: string
}

export type ClassifiedSyncErrors = {
transient: SyncRunError[]
permanent: SyncRunError[]
}

const PERMANENT_FAILURE_TYPES = new Set(['system_error', 'config_error'])

export function classifySyncErrors(errors: SyncRunError[]): ClassifiedSyncErrors {
const transient: SyncRunError[] = []
const permanent: SyncRunError[] = []

for (const error of errors) {
if (PERMANENT_FAILURE_TYPES.has(error.failure_type ?? '')) {
permanent.push(error)
} else {
transient.push(error)
}
}

return { transient, permanent }
}

export function summarizeSyncErrors(errors: SyncRunError[]): string {
return errors
.map((error) => {
const failureType = error.failure_type ?? 'unknown_error'
const stream = error.stream ? `/${error.stream}` : ''
return `[${failureType}${stream}] ${error.message}`
})
.join('; ')
}
40 changes: 36 additions & 4 deletions apps/service/src/temporal/workflows/pipeline-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { condition, continueAsNew, setHandler } from '@temporalio/workflow'
import type { SourceInputMessage, SourceState } from '@stripe/sync-protocol'
import type { DesiredStatus, PipelineStatus } from '../../lib/createSchemas.js'
import { CONTINUE_AS_NEW_THRESHOLD } from '../../lib/utils.js'
import { classifySyncErrors } from '../sync-errors.js'
import {
desiredStatusSignal,
pipelineSetup,
Expand All @@ -22,6 +23,7 @@ export type TeardownState = 'started' | 'completed'
export interface PipelineWorkflowState {
phase?: ReconcileState
paused?: boolean
errored?: boolean
setup?: SetupState
teardown?: TeardownState
}
Expand All @@ -42,6 +44,7 @@ export async function pipelineWorkflow(
let desiredStatus: DesiredStatus = opts?.desiredStatus ?? 'active'
let sourceState: SourceState = opts?.sourceState ?? { streams: {}, global: {} }
let state: PipelineWorkflowState = { ...opts?.state }
let desiredStatusSignalCount = 0

// Transient workflow-local state.
let operationCount = 0
Expand All @@ -51,12 +54,14 @@ export async function pipelineWorkflow(
})
setHandler(desiredStatusSignal, (status: DesiredStatus) => {
desiredStatus = status
desiredStatusSignalCount++
})

// MARK: - State

function derivePipelineStatus(): PipelineStatus {
if (state.teardown) return 'teardown'
if (state.errored) return 'error'
if (state.paused) return 'paused'
if (state.setup !== 'completed') return 'setup'
return state.phase === 'ready' ? 'ready' : 'backfill'
Expand All @@ -77,7 +82,21 @@ export async function pipelineWorkflow(
* no longer active or because the workflow should roll over into continue-as-new.
*/
function runInterrupted() {
return desiredStatus !== 'active' || operationCount >= CONTINUE_AS_NEW_THRESHOLD
return (
desiredStatus !== 'active' || operationCount >= CONTINUE_AS_NEW_THRESHOLD || !!state.errored
)
}

async function markPermanentError(): Promise<void> {
await setState({ errored: true, phase: 'backfilling' })
}

async function waitForErrorRecovery(): Promise<void> {
const signalCount = desiredStatusSignalCount
await condition(() => desiredStatus === 'deleted' || desiredStatusSignalCount > signalCount)
if (desiredStatus === 'active') {
await setState({ errored: false, phase: 'backfilling' })
}
}

// MARK: - Live loop
Expand All @@ -97,8 +116,12 @@ export async function pipelineWorkflow(
const events = await waitForLiveEvents()
if (!events) return

await pipelineSync(pipelineId, { input: events })
const result = await pipelineSync(pipelineId, { input: events })
operationCount++
if (classifySyncErrors(result.errors).permanent.length > 0) {
await markPermanentError()
return
}
}
}

Expand Down Expand Up @@ -127,11 +150,15 @@ export async function pipelineWorkflow(
state_limit: 100,
time_limit: 10,
})
operationCount++
sourceState = result.state
if (result.eof?.reason === 'complete') {
if (classifySyncErrors(result.errors).permanent.length > 0) {
await markPermanentError()
return
}
if (result.eof?.reason === 'complete' && !state.errored) {
await setState({ phase: 'ready' })
}
operationCount++
}
}

Expand All @@ -144,6 +171,11 @@ export async function pipelineWorkflow(
}

while (desiredStatus !== 'deleted') {
if (state.errored) {
await waitForErrorRecovery()
continue
}

if (desiredStatus === 'paused') {
await setState({ paused: true })
await condition(() => desiredStatus !== 'paused')
Expand Down
6 changes: 3 additions & 3 deletions compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

services:
engine:
image: sync-engine-engine:dev
build:
context: .
target: engine
Expand All @@ -21,6 +22,7 @@ services:
start_period: 10s

service:
image: sync-engine-service:dev
build:
context: .
target: service
Expand All @@ -47,9 +49,7 @@ services:
start_period: 10s

worker:
build:
context: .
target: service
image: sync-engine-service:dev
command:
- worker
- --temporal-address
Expand Down
Loading
Loading