Skip to content

Commit 22595e2

Browse files
sgresh-stripetonyxiao
authored andcommitted
sheets
Committed-By-Agent: claude
1 parent 51cfcbb commit 22595e2

File tree

17 files changed

+939
-48
lines changed

17 files changed

+939
-48
lines changed

apps/engine/src/__generated__/openapi.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/engine/src/api/app.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import {
1919
import { takeStateCheckpoints } from '../lib/pipeline.js'
2020
import { ndjsonResponse } from '@stripe/sync-ts-cli/ndjson'
2121
import { logger } from '../logger.js'
22+
import { fetchById } from '@stripe/sync-source-stripe'
23+
import type { Config as StripeConfig } from '@stripe/sync-source-stripe'
2224
import {
2325
sslConfigFromConnectionString,
2426
stripSslParams,
@@ -407,6 +409,93 @@ export function createApp(resolver: ConnectorResolver) {
407409
}) as any
408410
)
409411

412+
const ThinEventSchema = z.array(
413+
z.object({
414+
stream: z.string(),
415+
id: z.string(),
416+
row_number: z.number().int().positive().optional(),
417+
})
418+
)
419+
420+
app.openapi(
421+
createRoute({
422+
operationId: 'writeEvents',
423+
method: 'post',
424+
path: '/write-events',
425+
tags: ['Stateless Sync API'],
426+
summary: 'Write thin events to destination',
427+
description:
428+
'Accepts an array of {stream, id, row_number?} objects, fetches full Stripe objects, and writes them to the destination.',
429+
requestParams: { header: pipelineHeaders },
430+
requestBody: {
431+
required: true,
432+
content: { 'application/json': { schema: ThinEventSchema } },
433+
},
434+
responses: {
435+
200: {
436+
description: 'NDJSON stream of write result messages',
437+
content: { 'application/x-ndjson': { schema: DestinationOutputSchema } },
438+
},
439+
400: errorResponse,
440+
},
441+
}),
442+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
443+
(async (c: any) => {
444+
const params = parseSyncParams(c)
445+
const context = { path: '/write-events', ...syncRequestContext(params) }
446+
const startedAt = Date.now()
447+
448+
let events: z.infer<typeof ThinEventSchema>
449+
try {
450+
events = ThinEventSchema.parse(await c.req.json())
451+
} catch {
452+
return c.json({ error: 'Body must be an array of {stream, id, row_number?}' }, 400)
453+
}
454+
if (events.length === 0) {
455+
return c.json({ error: 'Events array must not be empty' }, 400)
456+
}
457+
458+
logger.info({ ...context, eventCount: events.length }, 'Engine API /write-events started')
459+
460+
const sourceConfig = params.pipeline.source as unknown as StripeConfig
461+
const now = Date.now()
462+
const fetched = await Promise.all(
463+
events.map(async (event) => {
464+
const data = await fetchById(sourceConfig, event.stream, event.id)
465+
return { event, data }
466+
})
467+
)
468+
469+
for (const { event, data } of fetched) {
470+
if (!data) {
471+
logger.warn(
472+
{ stream: event.stream, id: event.id },
473+
'Engine API /write-events: fetchById returned null — skipping'
474+
)
475+
}
476+
}
477+
478+
const records = fetched.filter(({ data }) => data != null)
479+
480+
async function* recordMessages(): AsyncIterable<import('@stripe/sync-protocol').DestinationInput> {
481+
for (const { event, data } of records) {
482+
yield {
483+
type: 'record' as const,
484+
stream: event.stream,
485+
data: data!,
486+
emitted_at: now,
487+
row_number: event.row_number,
488+
}
489+
}
490+
}
491+
492+
const engine = await createEngineFromParams(params.pipeline, resolver, readonlyStateStore())
493+
return ndjsonResponse(
494+
logApiStream('Engine API /write-events', engine.write(recordMessages()), context, startedAt)
495+
)
496+
}) as any
497+
)
498+
410499
app.openapi(
411500
createRoute({
412501
operationId: 'sync',

apps/engine/src/lib/engine.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,25 @@ export function createEngine(
167167
return _catalog
168168
}
169169

170+
async function setupSource(catalog: ConfiguredCatalog): Promise<void> {
171+
if (connectors.source.setup) {
172+
await withLoggedStep('Engine source setup', baseContext, () =>
173+
connectors.source.setup!({ config: sourceConfig, catalog })
174+
)
175+
}
176+
}
177+
178+
async function setupDestination(
179+
catalog: ConfiguredCatalog
180+
): Promise<Record<string, unknown> | undefined> {
181+
if (connectors.destination.setup) {
182+
const result = await withLoggedStep('Engine destination setup', baseContext, () =>
183+
connectors.destination.setup!({ config: destConfig, catalog })
184+
)
185+
return result ?? undefined
186+
}
187+
}
188+
170189
return {
171190
async setup() {
172191
const catalog = await getCatalog()
@@ -252,7 +271,14 @@ export function createEngine(
252271
},
253272

254273
async *sync(input?: AsyncIterable<unknown>) {
255-
await this.setup()
274+
if (connectors.destination.skipAutoSetup) {
275+
// Destination manages its own setup externally (e.g. via /setup endpoint).
276+
// Only set up the source here.
277+
const catalog = await getCatalog()
278+
await setupSource(catalog)
279+
} else {
280+
await this.setup()
281+
}
256282
yield* pipe(this.read(input), this.write, persistState(stateStore))
257283
},
258284
}

packages/dashboard/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.next/
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
scripts/.state.json

packages/destination-google-sheets/__tests__/memory-sheets.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,19 @@ export function createMemorySheets() {
7676
const ss = getSpreadsheet(params.spreadsheetId)
7777
const requests = (params.requestBody?.requests ?? []) as Record<string, unknown>[]
7878

79+
const replies: unknown[] = []
80+
7981
for (const req of requests) {
8082
if (req.addSheet) {
8183
const props = (req.addSheet as { properties?: { title?: string } }).properties
8284
const name = props?.title ?? `Sheet${ss.sheets.size + 1}`
8385
if (ss.sheets.has(name)) {
8486
throw Object.assign(new Error(`Sheet already exists: ${name}`), { code: 400 })
8587
}
86-
ss.sheets.set(name, { sheetId: nextSheetId++, values: [] })
87-
}
88-
89-
if (req.updateSheetProperties) {
88+
const sheetId = nextSheetId++
89+
ss.sheets.set(name, { sheetId, values: [] })
90+
replies.push({ addSheet: { properties: { sheetId, title: name } } })
91+
} else if (req.updateSheetProperties) {
9092
const update = req.updateSheetProperties as {
9193
properties: { sheetId: number; title: string }
9294
fields: string
@@ -99,10 +101,13 @@ export function createMemorySheets() {
99101
break
100102
}
101103
}
104+
replies.push({})
105+
} else {
106+
replies.push({})
102107
}
103108
}
104109

105-
return { data: {} }
110+
return { data: { replies } }
106111
},
107112

108113
values: {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Shared helpers for the destination-google-sheets scripts.
2+
// Loads .env and manages a local .state.json that acts as a fake DB for the sheet ID.
3+
4+
import { readFileSync, writeFileSync, unlinkSync } from 'node:fs'
5+
import { resolve, dirname } from 'node:path'
6+
import { fileURLToPath } from 'node:url'
7+
8+
const __dirname = dirname(fileURLToPath(import.meta.url))
9+
const STATE_FILE = resolve(__dirname, '.state.json')
10+
11+
// ── Env loading ──────────────────────────────────────────────────────────────
12+
13+
export function loadEnv(): void {
14+
const envPath = resolve(__dirname, '../.env')
15+
try {
16+
const content = readFileSync(envPath, 'utf-8')
17+
for (const line of content.split('\n')) {
18+
const trimmed = line.trim()
19+
if (!trimmed || trimmed.startsWith('#')) continue
20+
const eqIdx = trimmed.indexOf('=')
21+
if (eqIdx === -1) continue
22+
const key = trimmed.slice(0, eqIdx).trim()
23+
const value = trimmed.slice(eqIdx + 1).trim()
24+
if (!(key in process.env)) process.env[key] = value
25+
}
26+
} catch {
27+
// .env is optional
28+
}
29+
}
30+
31+
// ── Sheet state ───────────────────────────────────────────────────────────────
32+
33+
export interface SheetState {
34+
spreadsheet_id: string
35+
/** Per-stream cursor state, persisted across sync calls for resumable pagination. */
36+
sync_state?: Record<string, unknown>
37+
}
38+
39+
export function loadState(): SheetState | null {
40+
try {
41+
return JSON.parse(readFileSync(STATE_FILE, 'utf-8')) as SheetState
42+
} catch {
43+
return null
44+
}
45+
}
46+
47+
export function saveState(state: SheetState): void {
48+
writeFileSync(STATE_FILE, JSON.stringify(state, null, 2) + '\n')
49+
console.error(`Saved state → ${STATE_FILE}`)
50+
}
51+
52+
export function clearState(): void {
53+
try {
54+
unlinkSync(STATE_FILE)
55+
console.error(`Cleared state (${STATE_FILE})`)
56+
} catch {
57+
// already gone
58+
}
59+
}
60+
61+
// ── Pipeline builder ──────────────────────────────────────────────────────────
62+
63+
export function buildDestinationConfig(spreadsheetId?: string): Record<string, unknown> {
64+
return {
65+
name: 'google-sheets',
66+
client_id: process.env['GOOGLE_CLIENT_ID'],
67+
client_secret: process.env['GOOGLE_CLIENT_SECRET'],
68+
access_token: 'unused',
69+
refresh_token: process.env['GOOGLE_REFRESH_TOKEN'],
70+
...(spreadsheetId ? { spreadsheet_id: spreadsheetId } : {}),
71+
}
72+
}
73+
74+
export const STREAMS = ['products', 'customers', 'prices', 'subscriptions'] as const
75+
76+
export function buildPipeline(spreadsheetId?: string): Record<string, unknown> {
77+
return {
78+
source: { name: 'stripe', api_key: process.env['STRIPE_API_KEY'], backfill_limit: 10 },
79+
destination: buildDestinationConfig(spreadsheetId),
80+
streams: STREAMS.map((name) => ({ name })),
81+
}
82+
}
83+
84+
export function requireEnv(...keys: string[]): void {
85+
const missing = keys.filter((k) => !process.env[k])
86+
if (missing.length > 0) {
87+
console.error(`Error: missing required env vars: ${missing.join(', ')}`)
88+
process.exit(1)
89+
}
90+
}
91+
92+
export function getPort(): string {
93+
const idx = process.argv.indexOf('--port')
94+
return idx !== -1 ? process.argv[idx + 1] : '3000'
95+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env node
2+
// GET /check — validates credentials and sheet accessibility
3+
// Usage: npx tsx scripts/check-via-server.ts [--port 3000]
4+
5+
import { loadEnv, buildPipeline, requireEnv, loadState, getPort } from './_state.js'
6+
7+
loadEnv()
8+
requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN')
9+
10+
const state = loadState()
11+
if (!state) {
12+
console.error('No sheet state found — run setup-via-server.ts first')
13+
process.exit(1)
14+
}
15+
16+
const serverUrl = `http://localhost:${getPort()}`
17+
const pipeline = buildPipeline(state.spreadsheet_id)
18+
19+
console.error(`Hitting ${serverUrl}/check ...`)
20+
console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`)
21+
22+
const res = await fetch(`${serverUrl}/check`, {
23+
headers: { 'X-Pipeline': JSON.stringify(pipeline) },
24+
})
25+
26+
const result = await res.json()
27+
console.log(JSON.stringify(result, null, 2))
28+
29+
if (res.status !== 200) process.exit(1)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#!/usr/bin/env node
2+
// POST /setup — creates a new Google Sheet, saves its ID to .state.json
3+
// Usage: npx tsx scripts/setup-via-server.ts [--port 3000]
4+
5+
import { loadEnv, buildPipeline, requireEnv, saveState, getPort } from './_state.js'
6+
7+
loadEnv()
8+
requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN')
9+
10+
const serverUrl = `http://localhost:${getPort()}`
11+
12+
// No spreadsheet_id — setup always creates a new sheet
13+
const pipeline = buildPipeline()
14+
15+
console.error(`Hitting ${serverUrl}/setup ...`)
16+
17+
const res = await fetch(`${serverUrl}/setup`, {
18+
method: 'POST',
19+
headers: { 'X-Pipeline': JSON.stringify(pipeline) },
20+
})
21+
22+
if (res.status === 200) {
23+
const result = (await res.json()) as { spreadsheet_id: string }
24+
saveState({ spreadsheet_id: result.spreadsheet_id })
25+
console.log(JSON.stringify(result, null, 2))
26+
} else {
27+
const body = await res.text()
28+
console.error(`Error: ${res.status} ${res.statusText}`)
29+
if (body) console.error(body)
30+
process.exit(1)
31+
}

0 commit comments

Comments
 (0)