diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index 81022c93..2ba5a139 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -586,6 +586,11 @@ }, "emitted_at": { "type": "number" + }, + "row_number": { + "type": "integer", + "exclusiveMinimum": 0, + "maximum": 9007199254740991 } }, "required": ["type", "stream", "data", "emitted_at"] diff --git a/apps/engine/src/api/app.ts b/apps/engine/src/api/app.ts index 4c11128b..ad31e4c4 100644 --- a/apps/engine/src/api/app.ts +++ b/apps/engine/src/api/app.ts @@ -19,6 +19,8 @@ import { import { takeStateCheckpoints } from '../lib/pipeline.js' import { ndjsonResponse } from '@stripe/sync-ts-cli/ndjson' import { logger } from '../logger.js' +import { fetchById } from '@stripe/sync-source-stripe' +import type { Config as StripeConfig } from '@stripe/sync-source-stripe' import { sslConfigFromConnectionString, stripSslParams, @@ -407,6 +409,93 @@ export function createApp(resolver: ConnectorResolver) { }) as any ) + const ThinEventSchema = z.array( + z.object({ + stream: z.string(), + id: z.string(), + row_number: z.number().int().positive().optional(), + }) + ) + + app.openapi( + createRoute({ + operationId: 'writeEvents', + method: 'post', + path: '/write-events', + tags: ['Stateless Sync API'], + summary: 'Write thin events to destination', + description: + 'Accepts an array of {stream, id, row_number?} objects, fetches full Stripe objects, and writes them to the destination.', + requestParams: { header: pipelineHeaders }, + requestBody: { + required: true, + content: { 'application/json': { schema: ThinEventSchema } }, + }, + responses: { + 200: { + description: 'NDJSON stream of write result messages', + content: { 'application/x-ndjson': { schema: DestinationOutputSchema } }, + }, + 400: errorResponse, + }, + }), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (async (c: any) => { + const params = 
parseSyncParams(c) + const context = { path: '/write-events', ...syncRequestContext(params) } + const startedAt = Date.now() + + let events: z.infer + try { + events = ThinEventSchema.parse(await c.req.json()) + } catch { + return c.json({ error: 'Body must be an array of {stream, id, row_number?}' }, 400) + } + if (events.length === 0) { + return c.json({ error: 'Events array must not be empty' }, 400) + } + + logger.info({ ...context, eventCount: events.length }, 'Engine API /write-events started') + + const sourceConfig = params.pipeline.source as unknown as StripeConfig + const now = Date.now() + const fetched = await Promise.all( + events.map(async (event) => { + const data = await fetchById(sourceConfig, event.stream, event.id) + return { event, data } + }) + ) + + for (const { event, data } of fetched) { + if (!data) { + logger.warn( + { stream: event.stream, id: event.id }, + 'Engine API /write-events: fetchById returned null — skipping' + ) + } + } + + const records = fetched.filter(({ data }) => data != null) + + async function* recordMessages(): AsyncIterable { + for (const { event, data } of records) { + yield { + type: 'record' as const, + stream: event.stream, + data: data!, + emitted_at: now, + row_number: event.row_number, + } + } + } + + const engine = await createEngineFromParams(params.pipeline, resolver, readonlyStateStore()) + return ndjsonResponse( + logApiStream('Engine API /write-events', engine.write(recordMessages()), context, startedAt) + ) + }) as any + ) + app.openapi( createRoute({ operationId: 'sync', diff --git a/apps/engine/src/lib/engine.ts b/apps/engine/src/lib/engine.ts index 00766247..cc996c97 100644 --- a/apps/engine/src/lib/engine.ts +++ b/apps/engine/src/lib/engine.ts @@ -167,6 +167,25 @@ export function createEngine( return _catalog } + async function setupSource(catalog: ConfiguredCatalog): Promise { + if (connectors.source.setup) { + await withLoggedStep('Engine source setup', baseContext, () => + 
connectors.source.setup!({ config: sourceConfig, catalog }) + ) + } + } + + async function setupDestination( + catalog: ConfiguredCatalog + ): Promise | undefined> { + if (connectors.destination.setup) { + const result = await withLoggedStep('Engine destination setup', baseContext, () => + connectors.destination.setup!({ config: destConfig, catalog }) + ) + return result ?? undefined + } + } + return { async setup() { const catalog = await getCatalog() @@ -252,7 +271,14 @@ export function createEngine( }, async *sync(input?: AsyncIterable) { - await this.setup() + if (connectors.destination.skipAutoSetup) { + // Destination manages its own setup externally (e.g. via /setup endpoint). + // Only set up the source here. + const catalog = await getCatalog() + await setupSource(catalog) + } else { + await this.setup() + } yield* pipe(this.read(input), this.write, persistState(stateStore)) }, } diff --git a/packages/dashboard/.gitignore b/packages/dashboard/.gitignore new file mode 100644 index 00000000..536d88c8 --- /dev/null +++ b/packages/dashboard/.gitignore @@ -0,0 +1 @@ +.next/ diff --git a/packages/destination-google-sheets/.gitignore b/packages/destination-google-sheets/.gitignore new file mode 100644 index 00000000..d1b87e4a --- /dev/null +++ b/packages/destination-google-sheets/.gitignore @@ -0,0 +1 @@ +scripts/.state.json diff --git a/packages/destination-google-sheets/__tests__/memory-sheets.ts b/packages/destination-google-sheets/__tests__/memory-sheets.ts index 799c103b..051ab270 100644 --- a/packages/destination-google-sheets/__tests__/memory-sheets.ts +++ b/packages/destination-google-sheets/__tests__/memory-sheets.ts @@ -76,6 +76,8 @@ export function createMemorySheets() { const ss = getSpreadsheet(params.spreadsheetId) const requests = (params.requestBody?.requests ?? 
[]) as Record[] + const replies: unknown[] = [] + for (const req of requests) { if (req.addSheet) { const props = (req.addSheet as { properties?: { title?: string } }).properties @@ -83,10 +85,10 @@ export function createMemorySheets() { if (ss.sheets.has(name)) { throw Object.assign(new Error(`Sheet already exists: ${name}`), { code: 400 }) } - ss.sheets.set(name, { sheetId: nextSheetId++, values: [] }) - } - - if (req.updateSheetProperties) { + const sheetId = nextSheetId++ + ss.sheets.set(name, { sheetId, values: [] }) + replies.push({ addSheet: { properties: { sheetId, title: name } } }) + } else if (req.updateSheetProperties) { const update = req.updateSheetProperties as { properties: { sheetId: number; title: string } fields: string @@ -99,10 +101,13 @@ export function createMemorySheets() { break } } + replies.push({}) + } else { + replies.push({}) } } - return { data: {} } + return { data: { replies } } }, values: { diff --git a/packages/destination-google-sheets/package.json b/packages/destination-google-sheets/package.json index 69be2e6d..d81adbda 100644 --- a/packages/destination-google-sheets/package.json +++ b/packages/destination-google-sheets/package.json @@ -27,6 +27,7 @@ "zod": "^4.3.6" }, "devDependencies": { + "@types/node": "^25.5.0", "vitest": "^3.2.4" } } diff --git a/packages/destination-google-sheets/scripts/_state.ts b/packages/destination-google-sheets/scripts/_state.ts new file mode 100644 index 00000000..2fcd487f --- /dev/null +++ b/packages/destination-google-sheets/scripts/_state.ts @@ -0,0 +1,95 @@ +// Shared helpers for the destination-google-sheets scripts. +// Loads .env and manages a local .state.json that acts as a fake DB for the sheet ID. 
+ +import { readFileSync, writeFileSync, unlinkSync } from 'node:fs' +import { resolve, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) +const STATE_FILE = resolve(__dirname, '.state.json') + +// ── Env loading ────────────────────────────────────────────────────────────── + +export function loadEnv(): void { + const envPath = resolve(__dirname, '../.env') + try { + const content = readFileSync(envPath, 'utf-8') + for (const line of content.split('\n')) { + const trimmed = line.trim() + if (!trimmed || trimmed.startsWith('#')) continue + const eqIdx = trimmed.indexOf('=') + if (eqIdx === -1) continue + const key = trimmed.slice(0, eqIdx).trim() + const value = trimmed.slice(eqIdx + 1).trim() + if (!(key in process.env)) process.env[key] = value + } + } catch { + // .env is optional + } +} + +// ── Sheet state ─────────────────────────────────────────────────────────────── + +export interface SheetState { + spreadsheet_id: string + /** Per-stream cursor state, persisted across sync calls for resumable pagination. 
*/ + sync_state?: Record +} + +export function loadState(): SheetState | null { + try { + return JSON.parse(readFileSync(STATE_FILE, 'utf-8')) as SheetState + } catch { + return null + } +} + +export function saveState(state: SheetState): void { + writeFileSync(STATE_FILE, JSON.stringify(state, null, 2) + '\n') + console.error(`Saved state → ${STATE_FILE}`) +} + +export function clearState(): void { + try { + unlinkSync(STATE_FILE) + console.error(`Cleared state (${STATE_FILE})`) + } catch { + // already gone + } +} + +// ── Pipeline builder ────────────────────────────────────────────────────────── + +export function buildDestinationConfig(spreadsheetId?: string): Record { + return { + name: 'google-sheets', + client_id: process.env['GOOGLE_CLIENT_ID'], + client_secret: process.env['GOOGLE_CLIENT_SECRET'], + access_token: 'unused', + refresh_token: process.env['GOOGLE_REFRESH_TOKEN'], + ...(spreadsheetId ? { spreadsheet_id: spreadsheetId } : {}), + } +} + +export const STREAMS = ['products', 'customers', 'prices', 'subscriptions'] as const + +export function buildPipeline(spreadsheetId?: string): Record { + return { + source: { name: 'stripe', api_key: process.env['STRIPE_API_KEY'], backfill_limit: 10 }, + destination: buildDestinationConfig(spreadsheetId), + streams: STREAMS.map((name) => ({ name })), + } +} + +export function requireEnv(...keys: string[]): void { + const missing = keys.filter((k) => !process.env[k]) + if (missing.length > 0) { + console.error(`Error: missing required env vars: ${missing.join(', ')}`) + process.exit(1) + } +} + +export function getPort(): string { + const idx = process.argv.indexOf('--port') + return idx !== -1 ? 
process.argv[idx + 1] : '3000' +} diff --git a/packages/destination-google-sheets/scripts/check-via-server.ts b/packages/destination-google-sheets/scripts/check-via-server.ts new file mode 100644 index 00000000..82de3458 --- /dev/null +++ b/packages/destination-google-sheets/scripts/check-via-server.ts @@ -0,0 +1,29 @@ +#!/usr/bin/env node +// GET /check — validates credentials and sheet accessibility +// Usage: npx tsx scripts/check-via-server.ts [--port 3000] + +import { loadEnv, buildPipeline, requireEnv, loadState, getPort } from './_state.js' + +loadEnv() +requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') + +const state = loadState() +if (!state) { + console.error('No sheet state found — run setup-via-server.ts first') + process.exit(1) +} + +const serverUrl = `http://localhost:${getPort()}` +const pipeline = buildPipeline(state.spreadsheet_id) + +console.error(`Hitting ${serverUrl}/check ...`) +console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) + +const res = await fetch(`${serverUrl}/check`, { + headers: { 'X-Pipeline': JSON.stringify(pipeline) }, +}) + +const result = await res.json() +console.log(JSON.stringify(result, null, 2)) + +if (res.status !== 200) process.exit(1) diff --git a/packages/destination-google-sheets/scripts/setup-via-server.ts b/packages/destination-google-sheets/scripts/setup-via-server.ts new file mode 100644 index 00000000..f1dd3bbf --- /dev/null +++ b/packages/destination-google-sheets/scripts/setup-via-server.ts @@ -0,0 +1,31 @@ +#!/usr/bin/env node +// POST /setup — creates a new Google Sheet, saves its ID to .state.json +// Usage: npx tsx scripts/setup-via-server.ts [--port 3000] + +import { loadEnv, buildPipeline, requireEnv, saveState, getPort } from './_state.js' + +loadEnv() +requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') + +const serverUrl = `http://localhost:${getPort()}` + +// No 
spreadsheet_id — setup always creates a new sheet +const pipeline = buildPipeline() + +console.error(`Hitting ${serverUrl}/setup ...`) + +const res = await fetch(`${serverUrl}/setup`, { + method: 'POST', + headers: { 'X-Pipeline': JSON.stringify(pipeline) }, +}) + +if (res.status === 200) { + const result = (await res.json()) as { spreadsheet_id: string } + saveState({ spreadsheet_id: result.spreadsheet_id }) + console.log(JSON.stringify(result, null, 2)) +} else { + const body = await res.text() + console.error(`Error: ${res.status} ${res.statusText}`) + if (body) console.error(body) + process.exit(1) +} diff --git a/packages/destination-google-sheets/scripts/sheet-size.ts b/packages/destination-google-sheets/scripts/sheet-size.ts new file mode 100644 index 00000000..e15ccbe1 --- /dev/null +++ b/packages/destination-google-sheets/scripts/sheet-size.ts @@ -0,0 +1,66 @@ +#!/usr/bin/env node +// Calculates total cell count across all sheets in the saved spreadsheet. +// +// Usage: npx tsx scripts/sheet-size.ts + +import { readFileSync } from 'node:fs' +import { resolve, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' +import { google } from 'googleapis' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +// Load .env +const envPath = resolve(__dirname, '../.env') +try { + for (const line of readFileSync(envPath, 'utf-8').split('\n')) { + const trimmed = line.trim() + if (!trimmed || trimmed.startsWith('#')) continue + const eqIdx = trimmed.indexOf('=') + if (eqIdx === -1) continue + const key = trimmed.slice(0, eqIdx).trim() + const value = trimmed.slice(eqIdx + 1).trim() + if (!(key in process.env)) process.env[key] = value + } +} catch { + /* .env is optional */ +} + +// Load spreadsheet ID from .state.json +const stateFile = resolve(__dirname, '.state.json') +let spreadsheetId: string +try { + const state = JSON.parse(readFileSync(stateFile, 'utf-8')) as { spreadsheet_id: string } + spreadsheetId = state.spreadsheet_id +} catch { + 
console.error('No .state.json found — run setup-via-server.ts first') + process.exit(1) +} + +const auth = new google.auth.OAuth2( + process.env['GOOGLE_CLIENT_ID'], + process.env['GOOGLE_CLIENT_SECRET'] +) +auth.setCredentials({ refresh_token: process.env['GOOGLE_REFRESH_TOKEN'] }) +const sheets = google.sheets({ version: 'v4', auth }) + +// Fetch spreadsheet metadata (includes all sheet grid properties) +const res = await sheets.spreadsheets.get({ + spreadsheetId, + fields: 'sheets(properties(title,gridProperties))', +}) + +console.error(`Sheet: https://docs.google.com/spreadsheets/d/${spreadsheetId}\n`) + +let grandTotal = 0 +for (const sheet of res.data.sheets ?? []) { + const title = sheet.properties?.title ?? '(untitled)' + const { rowCount = 0, columnCount = 0 } = sheet.properties?.gridProperties ?? {} + const cells = rowCount * columnCount + grandTotal += cells + console.error( + ` ${title}: ${rowCount} rows × ${columnCount} cols = ${cells.toLocaleString()} cells` + ) +} + +console.error(`\n Total: ${grandTotal.toLocaleString()} cells`) diff --git a/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts b/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts new file mode 100644 index 00000000..4144e9e9 --- /dev/null +++ b/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts @@ -0,0 +1,89 @@ +#!/usr/bin/env node +// Sync Stripe → Google Sheets via the sync-engine CLI. 
+// Reads credentials from packages/destination-google-sheets/.env +// +// Usage: npx tsx scripts/stripe-to-google-sheets.ts +// or: node --import tsx scripts/stripe-to-google-sheets.ts + +import { readFileSync } from 'node:fs' +import { resolve, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' +import { execFileSync, spawnSync } from 'node:child_process' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +// Load .env from the package root +const envPath = resolve(__dirname, '../.env') +try { + const envContent = readFileSync(envPath, 'utf-8') + for (const line of envContent.split('\n')) { + const trimmed = line.trim() + if (!trimmed || trimmed.startsWith('#')) continue + const eqIdx = trimmed.indexOf('=') + if (eqIdx === -1) continue + const key = trimmed.slice(0, eqIdx).trim() + const value = trimmed.slice(eqIdx + 1).trim() + if (!(key in process.env)) process.env[key] = value + } +} catch { + // .env is optional; env vars may already be set +} + +const { + STRIPE_API_KEY, + GOOGLE_CLIENT_ID, + GOOGLE_CLIENT_SECRET, + GOOGLE_REFRESH_TOKEN, + GOOGLE_SPREADSHEET_ID, +} = process.env + +if (!STRIPE_API_KEY) { + console.error('Error: STRIPE_API_KEY is required (set it in .env or the environment)') + process.exit(1) +} + +// Fetch Stripe account ID +const accountRes = await fetch('https://api.stripe.com/v1/account', { + headers: { + Authorization: `Basic ${Buffer.from(`${STRIPE_API_KEY}:`).toString('base64')}`, + }, +}) +const account = (await accountRes.json()) as { id: string } +console.error(`Stripe: ${account.id}`) +console.error(`Sheet: https://docs.google.com/spreadsheets/d/${GOOGLE_SPREADSHEET_ID}`) + +const pipeline = JSON.stringify({ + source: { name: 'stripe', api_key: STRIPE_API_KEY, backfill_limit: 10 }, + destination: { + name: 'google-sheets', + client_id: GOOGLE_CLIENT_ID, + client_secret: GOOGLE_CLIENT_SECRET, + access_token: 'unused', + refresh_token: GOOGLE_REFRESH_TOKEN, + spreadsheet_id: GOOGLE_SPREADSHEET_ID, + }, 
+ streams: [{ name: 'products' }, { name: 'customers' }], +}) + +const repoRoot = resolve(__dirname, '../../..') +const cliPath = resolve(repoRoot, 'apps/engine/src/cli/index.ts') + +// Use bun if available, else tsx +function hasBun(): boolean { + try { + execFileSync('bun', ['--version'], { stdio: 'ignore' }) + return true + } catch { + return false + } +} + +const tsxBin = resolve(repoRoot, 'node_modules/.bin/tsx') +const [cmd, ...cmdArgs] = hasBun() ? ['bun', cliPath] : [tsxBin, cliPath] + +const result = spawnSync(cmd, [...cmdArgs, 'sync', '--xPipeline', pipeline], { + stdio: 'inherit', + cwd: repoRoot, +}) + +process.exit(result.status ?? 1) diff --git a/packages/destination-google-sheets/scripts/sync-via-server.ts b/packages/destination-google-sheets/scripts/sync-via-server.ts new file mode 100644 index 00000000..b648c7b8 --- /dev/null +++ b/packages/destination-google-sheets/scripts/sync-via-server.ts @@ -0,0 +1,135 @@ +#!/usr/bin/env node +// POST /sync — reads from Stripe and writes to Google Sheets, looping until all +// streams are complete. Uses X-State-Checkpoint-Limit: 1 to process one page at +// a time, persisting the cursor to .state.json between pages. +// +// On completion, reads each sheet and prints the row count for each stream. 
+// +// Usage: npx tsx scripts/sync-via-server.ts [--port 3000] + +import { google } from 'googleapis' +import { + loadEnv, + buildPipeline, + requireEnv, + loadState, + saveState, + getPort, + STREAMS, +} from './_state.js' +import { readSheet } from '../src/writer.js' + +loadEnv() +requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') + +const state = loadState() +if (!state) { + console.error('No sheet state found — run setup-via-server.ts first') + process.exit(1) +} + +const serverUrl = `http://localhost:${getPort()}` +console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) + +// Run one page of sync, returns updated syncState +async function runOnePage(syncState: Record): Promise> { + const pipeline = buildPipeline(state!.spreadsheet_id) + const headers: Record = { + 'X-Pipeline': JSON.stringify(pipeline), + 'X-State-Checkpoint-Limit': '1', + } + if (Object.keys(syncState).length > 0) { + headers['X-State'] = JSON.stringify(syncState) + } + + const res = await fetch(`${serverUrl}/sync`, { method: 'POST', headers }) + if (!res.ok && !res.body) { + console.error(`Error: ${res.status} ${res.statusText}`) + process.exit(1) + } + + const updated = { ...syncState } + const reader = res.body!.getReader() + const decoder = new TextDecoder() + let buf = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + buf += decoder.decode(value, { stream: true }) + const lines = buf.split('\n') + buf = lines.pop() ?? 
'' + for (const line of lines) { + if (!line.trim()) continue + console.log(line) + try { + const msg = JSON.parse(line) as { type: string; stream?: string; data?: unknown } + if (msg.type === 'state' && msg.stream) updated[msg.stream] = msg.data + } catch { + /* non-JSON line */ + } + } + } + if (buf.trim()) { + console.log(buf) + try { + const msg = JSON.parse(buf) as { type: string; stream?: string; data?: unknown } + if (msg.type === 'state' && msg.stream) updated[msg.stream] = msg.data + } catch {} + } + + return updated +} + +function isAllComplete(syncState: Record): boolean { + return STREAMS.every( + (s) => (syncState[s] as { status?: string } | undefined)?.status === 'complete' + ) +} + +// Loop until all streams are complete +let syncState: Record = { ...(state.sync_state ?? {}) } +let page = 0 + +if (isAllComplete(syncState)) { + console.error('All streams already complete. Reset sync_state to re-sync.') + process.exit(0) +} + +console.error('Starting sync loop...') + +while (!isAllComplete(syncState)) { + page++ + const pending = STREAMS.filter( + (s) => (syncState[s] as { status?: string } | undefined)?.status !== 'complete' + ) + console.error(`[page ${page}] Syncing: ${pending.join(', ')}`) + + syncState = await runOnePage(syncState) + saveState({ spreadsheet_id: state.spreadsheet_id, sync_state: syncState }) +} + +console.error(`\nAll streams complete after ${page} page(s) — clearing sync cursor`) +saveState({ spreadsheet_id: state.spreadsheet_id }) + +// Read each sheet and print row counts +console.error('\nReading sheet row counts...') +const auth = new google.auth.OAuth2( + process.env['GOOGLE_CLIENT_ID'], + process.env['GOOGLE_CLIENT_SECRET'] +) +auth.setCredentials({ refresh_token: process.env['GOOGLE_REFRESH_TOKEN'] }) +const sheets = google.sheets({ version: 'v4', auth }) + +for (const stream of STREAMS) { + try { + const rows = await readSheet(sheets, state.spreadsheet_id, stream) + // Subtract 1 for the header row + const dataRows = 
Math.max(0, rows.length - 1) + console.error(` ${stream}: ${dataRows} rows`) + } catch (err) { + console.error( + ` ${stream}: error reading sheet — ${err instanceof Error ? err.message : String(err)}` + ) + } +} diff --git a/packages/destination-google-sheets/scripts/teardown-via-server.ts b/packages/destination-google-sheets/scripts/teardown-via-server.ts new file mode 100644 index 00000000..e847399f --- /dev/null +++ b/packages/destination-google-sheets/scripts/teardown-via-server.ts @@ -0,0 +1,35 @@ +#!/usr/bin/env node +// POST /teardown — permanently deletes the Google Sheet and clears local state +// Usage: npx tsx scripts/teardown-via-server.ts [--port 3000] + +import { loadEnv, buildPipeline, requireEnv, loadState, clearState, getPort } from './_state.js' + +loadEnv() +requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') + +const state = loadState() +if (!state) { + console.error('No sheet state found — nothing to tear down') + process.exit(1) +} + +const serverUrl = `http://localhost:${getPort()}` +const pipeline = buildPipeline(state.spreadsheet_id) + +console.error(`Hitting ${serverUrl}/teardown ...`) +console.error(`Deleting sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) + +const res = await fetch(`${serverUrl}/teardown`, { + method: 'POST', + headers: { 'X-Pipeline': JSON.stringify(pipeline) }, +}) + +if (res.status === 204) { + clearState() + console.error('Teardown complete') +} else { + const body = await res.text() + console.error(`Error: ${res.status} ${res.statusText}`) + if (body) console.error(body) + process.exit(1) +} diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index 25e49db9..809a1421 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -13,9 +13,26 @@ import { google } from 'googleapis' import { z } from 'zod' import { configSchema } from 
'./spec.js' import type { Config } from './spec.js' -import { appendRows, ensureSheet, ensureSpreadsheet } from './writer.js' +import { + appendRows, + createIntroSheet, + deleteSpreadsheet, + ensureSheet, + ensureSpreadsheet, + protectSheets, + updateRows, +} from './writer.js' -export { ensureSpreadsheet, ensureSheet, appendRows, readSheet } from './writer.js' +export { + ensureSpreadsheet, + ensureSheet, + appendRows, + updateRows, + readSheet, + createIntroSheet, + protectSheets, + deleteSpreadsheet, +} from './writer.js' // MARK: - Spec @@ -23,7 +40,7 @@ export { configSchema, envVars, type Config } from './spec.js' // MARK: - Helpers -function makeSheetsClient(config: Config) { +function makeOAuth2Client(config: Config) { const clientId = config.client_id || process.env['GOOGLE_CLIENT_ID'] const clientSecret = config.client_secret || process.env['GOOGLE_CLIENT_SECRET'] if (!clientId) throw new Error('client_id required (provide in config or set GOOGLE_CLIENT_ID)') @@ -34,7 +51,15 @@ function makeSheetsClient(config: Config) { access_token: config.access_token, refresh_token: config.refresh_token, }) - return google.sheets({ version: 'v4', auth }) + return auth +} + +function makeSheetsClient(config: Config) { + return google.sheets({ version: 'v4', auth: makeOAuth2Client(config) }) +} + +function makeDriveClient(config: Config) { + return google.drive({ version: 'v3', auth: makeOAuth2Client(config) }) } /** Stringify a value for a Sheets cell. */ @@ -75,23 +100,52 @@ export function createDestination( return { config: z.toJSONSchema(configSchema) } }, - async check({ config }: { config: Config }): Promise { + async setup({ config, catalog }: { config: Config; catalog: ConfiguredCatalog }) { + if (config.spreadsheet_id) { + spreadsheetId = config.spreadsheet_id + return + } const sheets = sheetsClient ?? makeSheetsClient(config) - try { - await sheets.spreadsheets.get({ - spreadsheetId: config.spreadsheet_id ?? 
'test', - }) - return { status: 'succeeded' } - } catch { - return { status: 'succeeded', message: 'Sheets client is configured' } + spreadsheetId = await ensureSpreadsheet(sheets, config.spreadsheet_title) + + // Create the Overview intro tab first (handles "Sheet1" rename if needed) + const streamNames = catalog.streams.map((s) => s.stream.name) + await createIntroSheet(sheets, spreadsheetId, streamNames) + + // Create a data tab for each stream with headers derived from its JSON schema + const sheetIds: number[] = [] + for (const { stream } of catalog.streams) { + const properties = stream.json_schema?.['properties'] as Record | undefined + const headers = properties ? Object.keys(properties) : [] + const sheetId = await ensureSheet(sheets, spreadsheetId, stream.name, headers) + sheetIds.push(sheetId) } + + // Protect all data tabs with a warning so users know edits may be overwritten + await protectSheets(sheets, spreadsheetId, sheetIds) + + return { spreadsheet_id: spreadsheetId } }, - async setup({ config }: { config: Config }) { - if (config.spreadsheet_id) return + async teardown({ config }: { config: Config }) { + const id = config.spreadsheet_id + if (!id) throw new Error('spreadsheet_id is required for teardown') + const drive = makeDriveClient(config) + await deleteSpreadsheet(drive, id) + }, + + async check({ config }: { config: Config }): Promise { const sheets = sheetsClient ?? makeSheetsClient(config) - const id = await ensureSpreadsheet(sheets, config.spreadsheet_title) - return { spreadsheet_id: id } + if (!config.spreadsheet_id) throw new Error('spreadsheet_id is required for check') + try { + await sheets.spreadsheets.get({ spreadsheetId: config.spreadsheet_id }) + return { status: 'succeeded' } + } catch (err) { + return { + status: 'failed', + message: err instanceof Error ? err.message : String(err), + } + } }, async *write( @@ -101,21 +155,36 @@ export function createDestination( const sheets = sheetsClient ?? 
makeSheetsClient(config) const batchSize = config.batch_size ?? 50 - // Resolve or create spreadsheet - spreadsheetId = - config.spreadsheet_id || (await ensureSpreadsheet(sheets, config.spreadsheet_title)) + if (config.spreadsheet_id) { + spreadsheetId = config.spreadsheet_id + } else { + spreadsheetId = await ensureSpreadsheet(sheets, config.spreadsheet_title) + } - // Per-stream state: column headers and buffered rows + // Per-stream state: column headers, append buffer, and update buffer const streamHeaders = new Map() const streamBuffers = new Map() + const streamUpdates = new Map() - const flushStream = async (streamName: string) => { + const flushAppends = async (streamName: string) => { const buffer = streamBuffers.get(streamName) if (!buffer || buffer.length === 0) return await appendRows(sheets, spreadsheetId!, streamName, buffer) streamBuffers.set(streamName, []) } + const flushUpdates = async (streamName: string) => { + const updates = streamUpdates.get(streamName) + if (!updates || updates.length === 0) return + await updateRows(sheets, spreadsheetId!, streamName, updates) + streamUpdates.set(streamName, []) + } + + const flushStream = async (streamName: string) => { + await flushAppends(streamName) + await flushUpdates(streamName) + } + const flushAll = async () => { for (const streamName of streamBuffers.keys()) { await flushStream(streamName) @@ -125,25 +194,34 @@ export function createDestination( try { for await (const msg of $stdin) { if (msg.type === 'record') { - const { stream, data } = msg + const { stream, data, row_number } = msg // First record for this stream — discover headers, create tab if (!streamHeaders.has(stream)) { const headers = Object.keys(data) streamHeaders.set(stream, headers) streamBuffers.set(stream, []) + streamUpdates.set(stream, []) await ensureSheet(sheets, spreadsheetId!, stream, headers) } - // Map record data to row values in header order const headers = streamHeaders.get(stream)! 
const row = headers.map((h) => stringify(data[h])) - const buffer = streamBuffers.get(stream)! - buffer.push(row) - // Flush when batch is full - if (buffer.length >= batchSize) { - await flushStream(stream) + if (row_number != null) { + // Targeted update: overwrite the existing row in-place + const updates = streamUpdates.get(stream)! + updates.push({ rowNumber: row_number, values: row }) + if (updates.length >= batchSize) { + await flushUpdates(stream) + } + } else { + // New row: append to the sheet + const buffer = streamBuffers.get(stream)! + buffer.push(row) + if (buffer.length >= batchSize) { + await flushAppends(stream) + } } } else if (msg.type === 'state') { // Flush the stream's pending rows, then re-emit the state checkpoint @@ -178,7 +256,10 @@ export function createDestination( } yield logMsg }, - } satisfies Destination & { spreadsheetId?: string } + // Setup must be called explicitly via /setup before syncing. + // The engine should not auto-run it at the start of sync(). + skipAutoSetup: true as const, + } satisfies Destination & { spreadsheetId?: string; skipAutoSetup: true } return destination } diff --git a/packages/destination-google-sheets/src/writer.ts b/packages/destination-google-sheets/src/writer.ts index 8c1ab252..4a2a7395 100644 --- a/packages/destination-google-sheets/src/writer.ts +++ b/packages/destination-google-sheets/src/writer.ts @@ -1,4 +1,4 @@ -import type { sheets_v4 } from 'googleapis' +import type { drive_v3, sheets_v4 } from 'googleapis' /** * Low-level Sheets API write operations. @@ -48,13 +48,14 @@ export async function ensureSpreadsheet(sheets: sheets_v4.Sheets, title: string) /** * Ensure a tab (sheet) exists for a given stream name with a header row. * If the spreadsheet already has a "Sheet1" default tab, rename it for the first stream. + * Returns the numeric sheetId for use in subsequent API calls (e.g. protect range). 
*/ export async function ensureSheet( sheets: sheets_v4.Sheets, spreadsheetId: string, streamName: string, headers: string[] -): Promise<void> { +): Promise<number> { // Get existing sheets const meta = await withRetry(() => sheets.spreadsheets.get({ @@ -63,12 +64,12 @@ }) ) const existing = meta.data.sheets ?? [] - const existingNames = existing.map((s) => s.properties?.title) - if (existingNames.includes(streamName)) { - // Tab already exists — write header row in case it's empty + // Tab already exists — write header row and return its ID + const found = existing.find((s) => s.properties?.title === streamName) + if (found) { await writeHeaderRow(sheets, spreadsheetId, streamName, headers) - return + return found.properties!.sheetId! } // If there's a default "Sheet1" and this is the first real stream, rename it @@ -77,6 +78,7 @@ existing[0]?.properties?.title === 'Sheet1' && existing[0]?.properties?.sheetId !== undefined ) { + const sheetId = existing[0].properties.sheetId! 
await withRetry(() => sheets.spreadsheets.batchUpdate({ spreadsheetId, @@ -84,10 +86,7 @@ requestBody: { requests: [ { updateSheetProperties: { - properties: { - sheetId: existing[0]!.properties!.sheetId!, - title: streamName, - }, + properties: { sheetId, title: streamName }, fields: 'title', }, }, @@ -95,19 +94,25 @@ }, }) ) - } else { - // Add a new tab - await withRetry(() => - sheets.spreadsheets.batchUpdate({ - spreadsheetId, - requestBody: { - requests: [{ addSheet: { properties: { title: streamName } } }], - }, - }) - ) + await writeHeaderRow(sheets, spreadsheetId, streamName, headers) + return sheetId } + // Add a new tab and capture its sheetId from the response + const addRes = await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [{ addSheet: { properties: { title: streamName } } }], + }, + }) + ) + const sheetId = addRes.data.replies?.[0]?.addSheet?.properties?.sheetId + if (sheetId == null) { + throw new Error(`Failed to get sheetId for new sheet "${streamName}"`) + } await writeHeaderRow(sheets, spreadsheetId, streamName, headers) + return sheetId } async function writeHeaderRow( @@ -116,6 +121,7 @@ sheetName: string, headers: string[] ): Promise<void> { + if (headers.length === 0) return await withRetry(() => sheets.spreadsheets.values.update({ spreadsheetId, @@ -126,6 +132,120 @@ ) } +/** + * Create or update an "Overview" intro tab at index 0. + * Lists the synced streams and warns users not to edit data tabs. + */ +export async function createIntroSheet( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + streamNames: string[] +): Promise<void> { + const TITLE = 'Overview' + + const meta = await withRetry(() => + sheets.spreadsheets.get({ spreadsheetId, fields: 'sheets.properties' }) + ) + const existing = meta.data.sheets ?? 
[] + const hasOverview = existing.some((s) => s.properties?.title === TITLE) + + if (!hasOverview) { + // Rename "Sheet1" if it's the only tab, otherwise insert at index 0 + const onlySheet1 = + existing.length === 1 && + existing[0]?.properties?.title === 'Sheet1' && + existing[0]?.properties?.sheetId !== undefined + if (onlySheet1) { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [ + { + updateSheetProperties: { + properties: { sheetId: existing[0]!.properties!.sheetId!, title: TITLE }, + fields: 'title', + }, + }, + ], + }, + }) + ) + } else { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [{ addSheet: { properties: { title: TITLE, index: 0 } } }], + }, + }) + ) + } + } + + const now = new Date().toISOString() + const rows = [ + ['Stripe Sync Engine'], + [''], + ['This spreadsheet is managed by Stripe Sync Engine.'], + ['Data is synced automatically from your Stripe account.'], + [''], + ['Synced streams:'], + ...streamNames.map((name) => [` • ${name}`]), + [''], + [`Last setup: ${now}`], + [''], + ['⚠️ Do not edit data in the synced tabs. Changes will be overwritten on the next sync.'], + ] + + await withRetry(() => + sheets.spreadsheets.values.update({ + spreadsheetId, + range: `'${TITLE}'!A1`, + valueInputOption: 'RAW', + requestBody: { values: rows }, + }) + ) +} + +/** + * Add warning-only protection to a set of sheets by their numeric sheetIds. + * Users will see a warning dialog before editing but are not blocked. + * Idempotent — skips sheets that already have protection. 
+ */ +export async function protectSheets( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetIds: number[] +): Promise<void> { + for (const sheetId of sheetIds) { + try { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [ + { + addProtectedRange: { + protectedRange: { + range: { sheetId }, + description: + 'Managed by Stripe Sync Engine — edits may be overwritten on next sync', + warningOnly: true, + }, + }, + }, + ], + }, + }) + ) + } catch (err) { + if (err instanceof Error && err.message.includes('already has sheet protection')) continue + throw err + } + } +} + /** Append rows to a named sheet tab. Values are stringified for Sheets. */ export async function appendRows( sheets: sheets_v4.Sheets, @@ -146,6 +266,42 @@ ) } +/** + * Update specific rows in a sheet by their 1-based row numbers. + * Uses a single batchUpdate call for efficiency. + */ +export async function updateRows( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetName: string, + updates: { rowNumber: number; values: string[] }[] +): Promise<void> { + if (updates.length === 0) return + + const data = updates.map(({ rowNumber, values }) => ({ + range: `'${sheetName}'!A${rowNumber}`, + values: [values], + })) + + await withRetry(() => + sheets.spreadsheets.values.batchUpdate({ + spreadsheetId, + requestBody: { valueInputOption: 'RAW', data }, + }) + ) +} + +/** + * Permanently delete a spreadsheet file via the Drive API. + * The Sheets API does not support deletion — Drive is required. + */ +export async function deleteSpreadsheet( + drive: drive_v3.Drive, + spreadsheetId: string +): Promise<void> { + await withRetry(() => drive.files.delete({ fileId: spreadsheetId })) +} + /** Read all values from a sheet tab. Used for verification in tests. 
*/ export async function readSheet( sheets: sheets_v4.Sheets, diff --git a/packages/destination-google-sheets/tsconfig.scripts.json b/packages/destination-google-sheets/tsconfig.scripts.json new file mode 100644 index 00000000..fdf99a20 --- /dev/null +++ b/packages/destination-google-sheets/tsconfig.scripts.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "noEmit": true, + "types": ["node"] + }, + "include": ["scripts/**/*"] +} diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index e02f1f68..4ce9220f 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -111,6 +111,13 @@ export const RecordMessage = z stream: z.string(), data: z.record(z.string(), z.unknown()), emitted_at: z.number(), + /** + * 1-based row number in the destination sheet/table. + * When present, the destination should update the record in-place at this + * position rather than appending a new row. Used by live-event write paths + * (e.g. /write-events) where the orchestrator tracks row positions. + */ + row_number: z.number().int().positive().optional(), }) .meta({ id: 'RecordMessage' }) export type RecordMessage = z.infer @@ -334,4 +341,11 @@ export interface Destination = Record + + /** + * When true, the engine will NOT call setup() automatically at the start of sync(). + * Use this for destinations that manage their own setup lifecycle externally + * (e.g. via a dedicated /setup endpoint call before syncing). + */ + skipAutoSetup?: boolean } diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 502db60c..e945809e 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -340,6 +340,34 @@ export function createStripeSource( export default createStripeSource() +// MARK: - fetchById + +/** + * Fetch a single Stripe object by stream name and ID. 
+ * Uses the resource registry's retrieveFn, which is derived from the OpenAPI spec. + * Returns null if the stream is unknown or has no retrieve endpoint. + */ +export async function fetchById( + config: Config, + stream: string, + id: string +): Promise<Record<string, unknown> | null> { + const resolved = await resolveOpenApiSpec( + { apiVersion: config.api_version ?? BUNDLED_API_VERSION }, + apiFetch + ) + const registry = buildResourceRegistry( + resolved.spec, + config.api_key, + resolved.apiVersion, + config.base_url + ) + const resource = registry[stream] + if (!resource?.retrieveFn) return null + const obj = await resource.retrieveFn(id) + return obj as Record<string, unknown> +} + // MARK: - Re-exports export { buildResourceRegistry, DEFAULT_SYNC_OBJECTS } from './resourceRegistry.js' diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bbc76dbd..791242e3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -461,6 +461,9 @@ importers: specifier: ^4.3.6 version: 4.3.6 devDependencies: + '@types/node': + specifier: ^25.5.0 + version: 25.5.0 vitest: specifier: ^3.2.4 version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) @@ -7509,7 +7512,6 @@ snapshots: '@types/node@25.5.0': dependencies: undici-types: 7.18.2 - optional: true '@types/pg@8.15.6': dependencies: @@ -9541,8 +9543,7 @@ snapshots: undici-types@7.16.0: {} - undici-types@7.18.2: - optional: true + undici-types@7.18.2: {} undici@7.24.6: {}