Skip to content

Commit 93526ea

Browse files
authored
Stripe api test server & Tests & Bug Fixes (#245)
* test server * use hono, clean up api, add tests * add docker-backed test server harness * add all-api test server coverage, stripe failure regression tests * retry apis * disruption issue tests * fix false ready status on sync errors * clean up e2e docker containers and bound buildkit cache * format * derive list capabilities from OpenAPI spec instead of hardcoding & tests * typo * cleanup * exlcude new tests as well * don't run unrelated tests * don't test older than 2020 * Fix classifySyncErrors, remvoe backfill-pipeline-workflow.ts , cleanup, Revert unrelated dest-postgres changes * Replace stub object generation with OpenAPI schema-based generator and optimize test server * cleanup * move retry to stripe-source * don't exclude the tests * Fix error recovery race across continueAsNew * Emit auth_error for 401/403, keep system_error transient * Log transient errors suppressed by permanent failures * JSDoc, allow hyphens, add missing api_version * separate job for test server * start docker * reduce object size, pin version that support v2 in v2test * consolidate error
1 parent bec886c commit 93526ea

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+5186
-77
lines changed

.github/workflows/ci.yml

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,13 @@ jobs:
446446
echo "::warning::E2E tests skipped — STRIPE_API_KEY not available (fork PR?)"
447447
exit 0
448448
fi
449-
pnpm --filter @stripe/sync-e2e exec vitest run --exclude 'service-docker.test.ts'
449+
pnpm --filter @stripe/sync-e2e exec vitest run \
450+
--exclude 'service-docker.test.ts' \
451+
--exclude 'test-server-all-api.test.ts' \
452+
--exclude 'test-server-sync.test.ts' \
453+
--exclude 'test-sync-e2e.test.ts' \
454+
--exclude 'test-sync-engine.test.ts' \
455+
--exclude 'test-e2e-network.test.ts' # ↑ run in e2e_test_server job
450456
env:
451457
STRIPE_API_KEY: ${{ secrets.STRIPE_API_KEY }}
452458
POSTGRES_URL: 'postgres://postgres:postgres@localhost:55432/postgres'
@@ -456,6 +462,54 @@ jobs:
456462
if: always()
457463
run: '[ -f /tmp/vitest-skip-warnings.txt ] && cat /tmp/vitest-skip-warnings.txt || true'
458464

465+
# ---------------------------------------------------------------------------
466+
# E2E Test Server — test-server suites that need Docker Compose infrastructure
467+
# ---------------------------------------------------------------------------
468+
e2e_test_server:
469+
name: E2E Test Server
470+
runs-on: ubuntu-24.04-arm
471+
472+
steps:
473+
- uses: actions/checkout@v5
474+
475+
- name: Install pnpm
476+
uses: pnpm/action-setup@v5
477+
478+
- name: Set up Node
479+
uses: actions/setup-node@v6
480+
with:
481+
node-version-file: ./.nvmrc
482+
cache: pnpm
483+
484+
- name: Install dependencies & build
485+
run: pnpm install --frozen-lockfile && pnpm build
486+
487+
- name: Start Docker Compose stack
488+
run: |
489+
docker compose -f compose.yml -f compose.dev.yml -f e2e/compose.e2e.yml \
490+
up --build -d --wait temporal engine service worker
491+
492+
- name: Test server suites (parallel)
493+
run: |
494+
pnpm --filter @stripe/sync-e2e exec vitest run \
495+
test-server-all-api.test.ts \
496+
test-server-sync.test.ts \
497+
test-sync-e2e.test.ts \
498+
test-sync-engine.test.ts
499+
env:
500+
SKIP_SETUP: '1'
501+
502+
- name: Network interruption tests (pauses containers)
503+
run: |
504+
pnpm --filter @stripe/sync-e2e exec vitest run \
505+
test-e2e-network.test.ts
506+
env:
507+
SKIP_SETUP: '1'
508+
509+
- name: Skipped test warnings
510+
if: always()
511+
run: '[ -f /tmp/vitest-skip-warnings.txt ] && cat /tmp/vitest-skip-warnings.txt || true'
512+
459513
# ---------------------------------------------------------------------------
460514
# E2E Service — service + worker Docker containers end-to-end (every push/PR)
461515
# ---------------------------------------------------------------------------
@@ -755,7 +809,7 @@ jobs:
755809
run: pnpm install --frozen-lockfile && pnpm build
756810

757811
- name: CDN e2e tests
758-
run: pnpm --filter @stripe/sync-e2e run test -- openapi-cdn.test.ts
812+
run: pnpm --filter @stripe/sync-e2e exec vitest run openapi-cdn.test.ts
759813
env:
760814
STRIPE_SPEC_CDN_BASE_URL: ${{ needs.docs.outputs.deployment_url }}/stripe-api-specs
761815

apps/service/src/__tests__/workflow.test.ts

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,23 @@ import { CONTINUE_AS_NEW_THRESHOLD } from '../lib/utils.js'
88

99
type SourceInput = unknown
1010

11-
// workflowsPath points to the compiled workflow directory.
12-
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows')
11+
// Point directly at the workflow index to avoid resolving the legacy dist/temporal/workflows.js file.
12+
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows/index.js')
1313

1414
const emptyState = { streams: {}, global: {} }
1515
const noErrors: RunResult = { errors: [], state: emptyState }
16+
const permanentSyncError: RunResult = {
17+
errors: [{ message: 'permanent sync failure', failure_type: 'auth_error', stream: 'customers' }],
18+
state: emptyState,
19+
}
1620

1721
// Workflows now receive only the pipelineId string
1822
const testPipelineId = 'test_pipe'
1923

2024
function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities {
21-
return {
25+
const activities = {
2226
discoverCatalog: async () => ({ streams: [] }),
23-
pipelineSetup: async () => ({}),
27+
pipelineSetup: async () => {},
2428
pipelineSync: async () => noErrors,
2529
readIntoQueue: async () => ({ count: 0, state: emptyState }),
2630
writeGoogleSheetsFromQueue: async () => ({
@@ -33,6 +37,13 @@ function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities
3337
updatePipelineStatus: async () => {},
3438
...overrides,
3539
}
40+
41+
return {
42+
...activities,
43+
setup: activities.pipelineSetup,
44+
sync: activities.pipelineSync,
45+
teardown: activities.pipelineTeardown,
46+
} as SyncActivities
3647
}
3748

3849
/** Signal the workflow to delete. */
@@ -69,7 +80,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
6980
activities: stubActivities({
7081
pipelineSetup: async () => {
7182
setupCalled = true
72-
return {}
7383
},
7484
pipelineSync: async () => {
7585
runCallCount++
@@ -89,7 +99,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
8999
await new Promise((r) => setTimeout(r, 2000))
90100

91101
const status = await handle.query('status')
92-
expect(status.iteration).toBeGreaterThan(0)
102+
expect((status as { iteration: number }).iteration).toBeGreaterThan(0)
93103

94104
await signalDelete(handle)
95105
await handle.result()
@@ -334,6 +344,82 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
334344
})
335345
})
336346

347+
it('transitions to error instead of ready when reconcile returns permanent sync errors', async () => {
348+
const statusWrites: string[] = []
349+
350+
const worker = await Worker.create({
351+
connection: testEnv.nativeConnection,
352+
taskQueue: 'test-queue-3b-error',
353+
workflowsPath,
354+
activities: stubActivities({
355+
updatePipelineStatus: async (_id: string, status: string) => {
356+
statusWrites.push(status)
357+
},
358+
pipelineSync: async (_pipelineId: string, opts?) => {
359+
if (opts?.input) return noErrors
360+
return { ...permanentSyncError, eof: { reason: 'complete' as const } }
361+
},
362+
}),
363+
})
364+
365+
await worker.runUntil(async () => {
366+
const handle = await testEnv.client.workflow.start('pipelineWorkflow', {
367+
args: [testPipelineId],
368+
workflowId: 'test-sync-3b-error',
369+
taskQueue: 'test-queue-3b-error',
370+
})
371+
372+
await new Promise((r) => setTimeout(r, 500))
373+
await signalDelete(handle)
374+
await handle.result()
375+
376+
expect(statusWrites).toContain('error')
377+
expect(statusWrites).not.toContain('ready')
378+
})
379+
})
380+
381+
it('retries transient sync activity failures and still reaches ready', async () => {
382+
const statusWrites: string[] = []
383+
let reconcileCalls = 0
384+
385+
const worker = await Worker.create({
386+
connection: testEnv.nativeConnection,
387+
taskQueue: 'test-queue-3b-retry',
388+
workflowsPath,
389+
activities: stubActivities({
390+
updatePipelineStatus: async (_id: string, status: string) => {
391+
statusWrites.push(status)
392+
},
393+
pipelineSync: async (_pipelineId: string, opts?) => {
394+
if (opts?.input) return noErrors
395+
396+
reconcileCalls++
397+
if (reconcileCalls === 1) {
398+
throw new Error('transient sync failure')
399+
}
400+
401+
return { ...noErrors, eof: { reason: 'complete' as const } }
402+
},
403+
}),
404+
})
405+
406+
await worker.runUntil(async () => {
407+
const handle = await testEnv.client.workflow.start('pipelineWorkflow', {
408+
args: [testPipelineId],
409+
workflowId: 'test-sync-3b-retry',
410+
taskQueue: 'test-queue-3b-retry',
411+
})
412+
413+
await new Promise((r) => setTimeout(r, 2500))
414+
await signalDelete(handle)
415+
await handle.result()
416+
417+
expect(reconcileCalls).toBeGreaterThanOrEqual(2)
418+
expect(statusWrites).toContain('ready')
419+
expect(statusWrites).not.toContain('error')
420+
})
421+
})
422+
337423
it('queues live events while paused and drains them after resume', async () => {
338424
const syncCalls: { input?: SourceInput[] }[] = []
339425

@@ -464,7 +550,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
464550
activities: stubActivities({
465551
pipelineSetup: async () => {
466552
setupCalls++
467-
return {}
468553
},
469554
pipelineSync: async () => {
470555
syncCallCount++

apps/service/src/temporal/activities/_shared.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createRemoteEngine } from '@stripe/sync-engine'
44
import { Kafka } from 'kafkajs'
55
import type { Producer } from 'kafkajs'
66
import type { PipelineStore } from '../../lib/stores.js'
7+
import type { SyncRunError } from '../sync-errors.js'
78

89
export interface ActivitiesContext {
910
/** Remote engine client — satisfies the {@link Engine} interface over HTTP. Drop-in replacement for a local engine. */
@@ -104,7 +105,7 @@ export function createActivitiesContext(opts: {
104105
}
105106

106107
export interface RunResult {
107-
errors: Array<{ message: string; failure_type?: string; stream?: string }>
108+
errors: SyncRunError[]
108109
state: SourceState
109110
}
110111

apps/service/src/temporal/activities/pipeline-sync.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import { ApplicationFailure } from '@temporalio/activity'
12
import type { SourceInputMessage, SourceReadOptions } from '@stripe/sync-engine'
23
import type { ActivitiesContext } from './_shared.js'
34
import { asIterable, drainMessages, type RunResult } from './_shared.js'
5+
import { classifySyncErrors, summarizeSyncErrors } from '../sync-errors.js'
46

57
export function createPipelineSyncActivity(context: ActivitiesContext) {
68
return async function pipelineSync(
@@ -28,6 +30,18 @@ export function createPipelineSyncActivity(context: ActivitiesContext) {
2830
destination: { type, [type]: destConfig },
2931
})
3032
}
33+
const { transient, permanent } = classifySyncErrors(errors)
34+
if (permanent.length > 0) {
35+
if (transient.length > 0) {
36+
console.warn(
37+
`Transient errors suppressed by permanent failures: ${summarizeSyncErrors(transient)}`
38+
)
39+
}
40+
return { errors, state, eof }
41+
}
42+
if (transient.length > 0) {
43+
throw ApplicationFailure.retryable(summarizeSyncErrors(transient), 'TransientSyncError')
44+
}
3145
return { errors, state, eof }
3246
}
3347
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
export type SyncRunError = {
2+
message: string
3+
failure_type?: string
4+
stream?: string
5+
}
6+
7+
export type ClassifiedSyncErrors = {
8+
transient: SyncRunError[]
9+
permanent: SyncRunError[]
10+
}
11+
12+
const PERMANENT_FAILURE_TYPES = new Set(['config_error', 'auth_error'])
13+
14+
export function classifySyncErrors(errors: SyncRunError[]): ClassifiedSyncErrors {
15+
const transient: SyncRunError[] = []
16+
const permanent: SyncRunError[] = []
17+
18+
for (const error of errors) {
19+
if (PERMANENT_FAILURE_TYPES.has(error.failure_type ?? '')) {
20+
permanent.push(error)
21+
} else {
22+
transient.push(error)
23+
}
24+
}
25+
26+
return { transient, permanent }
27+
}
28+
29+
export function summarizeSyncErrors(errors: SyncRunError[]): string {
30+
return errors
31+
.map((error) => {
32+
const failureType = error.failure_type ?? 'unknown_error'
33+
const stream = error.stream ? `/${error.stream}` : ''
34+
return `[${failureType}${stream}] ${error.message}`
35+
})
36+
.join('; ')
37+
}

0 commit comments

Comments
 (0)