Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/engine/src/__generated__/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 89 additions & 0 deletions apps/engine/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import {
import { takeStateCheckpoints } from '../lib/pipeline.js'
import { ndjsonResponse } from '@stripe/sync-ts-cli/ndjson'
import { logger } from '../logger.js'
import { fetchById } from '@stripe/sync-source-stripe'
import type { Config as StripeConfig } from '@stripe/sync-source-stripe'
import {
sslConfigFromConnectionString,
stripSslParams,
Expand Down Expand Up @@ -407,6 +409,93 @@ export function createApp(resolver: ConnectorResolver) {
}) as any
)

const ThinEventSchema = z.array(
z.object({
stream: z.string(),
id: z.string(),
row_number: z.number().int().positive().optional(),
})
)

app.openapi(
createRoute({
operationId: 'writeEvents',
method: 'post',
path: '/write-events',
tags: ['Stateless Sync API'],
summary: 'Write thin events to destination',
description:
'Accepts an array of {stream, id, row_number?} objects, fetches full Stripe objects, and writes them to the destination.',
requestParams: { header: pipelineHeaders },
requestBody: {
required: true,
content: { 'application/json': { schema: ThinEventSchema } },
},
responses: {
200: {
description: 'NDJSON stream of write result messages',
content: { 'application/x-ndjson': { schema: DestinationOutputSchema } },
},
400: errorResponse,
},
}),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(async (c: any) => {
const params = parseSyncParams(c)
const context = { path: '/write-events', ...syncRequestContext(params) }
const startedAt = Date.now()

let events: z.infer<typeof ThinEventSchema>
try {
events = ThinEventSchema.parse(await c.req.json())
} catch {
return c.json({ error: 'Body must be an array of {stream, id, row_number?}' }, 400)
}
if (events.length === 0) {
return c.json({ error: 'Events array must not be empty' }, 400)
}

logger.info({ ...context, eventCount: events.length }, 'Engine API /write-events started')

const sourceConfig = params.pipeline.source as unknown as StripeConfig
const now = Date.now()
const fetched = await Promise.all(
events.map(async (event) => {
const data = await fetchById(sourceConfig, event.stream, event.id)
return { event, data }
})
)

for (const { event, data } of fetched) {
if (!data) {
logger.warn(
{ stream: event.stream, id: event.id },
'Engine API /write-events: fetchById returned null — skipping'
)
}
}

const records = fetched.filter(({ data }) => data != null)

async function* recordMessages(): AsyncIterable<import('@stripe/sync-protocol').DestinationInput> {
for (const { event, data } of records) {
yield {
type: 'record' as const,
stream: event.stream,
data: data!,
emitted_at: now,
row_number: event.row_number,
}
}
}

const engine = await createEngineFromParams(params.pipeline, resolver, readonlyStateStore())
return ndjsonResponse(
logApiStream('Engine API /write-events', engine.write(recordMessages()), context, startedAt)
)
}) as any
)

app.openapi(
createRoute({
operationId: 'sync',
Expand Down
28 changes: 27 additions & 1 deletion apps/engine/src/lib/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,25 @@ export function createEngine(
return _catalog
}

async function setupSource(catalog: ConfiguredCatalog): Promise<void> {
if (connectors.source.setup) {
await withLoggedStep('Engine source setup', baseContext, () =>
connectors.source.setup!({ config: sourceConfig, catalog })
)
}
}

async function setupDestination(
catalog: ConfiguredCatalog
): Promise<Record<string, unknown> | undefined> {
if (connectors.destination.setup) {
const result = await withLoggedStep('Engine destination setup', baseContext, () =>
connectors.destination.setup!({ config: destConfig, catalog })
)
return result ?? undefined
}
}

return {
async setup() {
const catalog = await getCatalog()
Expand Down Expand Up @@ -252,7 +271,14 @@ export function createEngine(
},

async *sync(input?: AsyncIterable<unknown>) {
await this.setup()
if (connectors.destination.skipAutoSetup) {
// Destination manages its own setup externally (e.g. via /setup endpoint).
// Only set up the source here.
const catalog = await getCatalog()
await setupSource(catalog)
} else {
await this.setup()
}
yield* pipe(this.read(input), this.write, persistState(stateStore))
},
}
Expand Down
1 change: 1 addition & 0 deletions packages/dashboard/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.next/
1 change: 1 addition & 0 deletions packages/destination-google-sheets/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
scripts/.state.json
15 changes: 10 additions & 5 deletions packages/destination-google-sheets/__tests__/memory-sheets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,19 @@ export function createMemorySheets() {
const ss = getSpreadsheet(params.spreadsheetId)
const requests = (params.requestBody?.requests ?? []) as Record<string, unknown>[]

const replies: unknown[] = []

for (const req of requests) {
if (req.addSheet) {
const props = (req.addSheet as { properties?: { title?: string } }).properties
const name = props?.title ?? `Sheet${ss.sheets.size + 1}`
if (ss.sheets.has(name)) {
throw Object.assign(new Error(`Sheet already exists: ${name}`), { code: 400 })
}
ss.sheets.set(name, { sheetId: nextSheetId++, values: [] })
}

if (req.updateSheetProperties) {
const sheetId = nextSheetId++
ss.sheets.set(name, { sheetId, values: [] })
replies.push({ addSheet: { properties: { sheetId, title: name } } })
} else if (req.updateSheetProperties) {
const update = req.updateSheetProperties as {
properties: { sheetId: number; title: string }
fields: string
Expand All @@ -99,10 +101,13 @@ export function createMemorySheets() {
break
}
}
replies.push({})
} else {
replies.push({})
}
}

return { data: {} }
return { data: { replies } }
},

values: {
Expand Down
1 change: 1 addition & 0 deletions packages/destination-google-sheets/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"zod": "^4.3.6"
},
"devDependencies": {
"@types/node": "^25.5.0",
"vitest": "^3.2.4"
}
}
95 changes: 95 additions & 0 deletions packages/destination-google-sheets/scripts/_state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Shared helpers for the destination-google-sheets scripts.
// Loads .env and manages a local .state.json that acts as a fake DB for the sheet ID.

import { readFileSync, writeFileSync, unlinkSync } from 'node:fs'
import { resolve, dirname } from 'node:path'
import { fileURLToPath } from 'node:url'

const __dirname = dirname(fileURLToPath(import.meta.url))
const STATE_FILE = resolve(__dirname, '.state.json')

// ── Env loading ──────────────────────────────────────────────────────────────

export function loadEnv(): void {
const envPath = resolve(__dirname, '../.env')
try {
const content = readFileSync(envPath, 'utf-8')
for (const line of content.split('\n')) {
const trimmed = line.trim()
if (!trimmed || trimmed.startsWith('#')) continue
const eqIdx = trimmed.indexOf('=')
if (eqIdx === -1) continue
const key = trimmed.slice(0, eqIdx).trim()
const value = trimmed.slice(eqIdx + 1).trim()
if (!(key in process.env)) process.env[key] = value
}
} catch {
// .env is optional
}
}

// ── Sheet state ───────────────────────────────────────────────────────────────

export interface SheetState {
spreadsheet_id: string
/** Per-stream cursor state, persisted across sync calls for resumable pagination. */
sync_state?: Record<string, unknown>
}

export function loadState(): SheetState | null {
try {
return JSON.parse(readFileSync(STATE_FILE, 'utf-8')) as SheetState
} catch {
return null
}
}

export function saveState(state: SheetState): void {
writeFileSync(STATE_FILE, JSON.stringify(state, null, 2) + '\n')
console.error(`Saved state → ${STATE_FILE}`)
}

export function clearState(): void {
try {
unlinkSync(STATE_FILE)
console.error(`Cleared state (${STATE_FILE})`)
} catch {
// already gone
}
}

// ── Pipeline builder ──────────────────────────────────────────────────────────

export function buildDestinationConfig(spreadsheetId?: string): Record<string, unknown> {
return {
name: 'google-sheets',
client_id: process.env['GOOGLE_CLIENT_ID'],
client_secret: process.env['GOOGLE_CLIENT_SECRET'],
access_token: 'unused',
refresh_token: process.env['GOOGLE_REFRESH_TOKEN'],
...(spreadsheetId ? { spreadsheet_id: spreadsheetId } : {}),
}
}

export const STREAMS = ['products', 'customers', 'prices', 'subscriptions'] as const

export function buildPipeline(spreadsheetId?: string): Record<string, unknown> {
return {
source: { name: 'stripe', api_key: process.env['STRIPE_API_KEY'], backfill_limit: 10 },
destination: buildDestinationConfig(spreadsheetId),
streams: STREAMS.map((name) => ({ name })),
}
}

export function requireEnv(...keys: string[]): void {
const missing = keys.filter((k) => !process.env[k])
if (missing.length > 0) {
console.error(`Error: missing required env vars: ${missing.join(', ')}`)
process.exit(1)
}
}

export function getPort(): string {
const idx = process.argv.indexOf('--port')
return idx !== -1 ? process.argv[idx + 1] : '3000'
}
29 changes: 29 additions & 0 deletions packages/destination-google-sheets/scripts/check-via-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env node
// GET /check — validates credentials and sheet accessibility
// Usage: npx tsx scripts/check-via-server.ts [--port 3000]

import { loadEnv, buildPipeline, requireEnv, loadState, getPort } from './_state.js'

loadEnv()
requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN')

const state = loadState()
if (!state) {
console.error('No sheet state found — run setup-via-server.ts first')
process.exit(1)
}

const serverUrl = `http://localhost:${getPort()}`
const pipeline = buildPipeline(state.spreadsheet_id)

console.error(`Hitting ${serverUrl}/check ...`)
console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`)

const res = await fetch(`${serverUrl}/check`, {
headers: { 'X-Pipeline': JSON.stringify(pipeline) },
})

const result = await res.json()
console.log(JSON.stringify(result, null, 2))

if (res.status !== 200) process.exit(1)
31 changes: 31 additions & 0 deletions packages/destination-google-sheets/scripts/setup-via-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env node
// POST /setup — creates a new Google Sheet, saves its ID to .state.json
// Usage: npx tsx scripts/setup-via-server.ts [--port 3000]

import { loadEnv, buildPipeline, requireEnv, saveState, getPort } from './_state.js'

loadEnv()
requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN')

const serverUrl = `http://localhost:${getPort()}`

// No spreadsheet_id — setup always creates a new sheet
const pipeline = buildPipeline()

console.error(`Hitting ${serverUrl}/setup ...`)

const res = await fetch(`${serverUrl}/setup`, {
method: 'POST',
headers: { 'X-Pipeline': JSON.stringify(pipeline) },
})

if (res.status === 200) {
const result = (await res.json()) as { spreadsheet_id: string }
saveState({ spreadsheet_id: result.spreadsheet_id })
console.log(JSON.stringify(result, null, 2))
} else {
const body = await res.text()
console.error(`Error: ${res.status} ${res.statusText}`)
if (body) console.error(body)
process.exit(1)
}
Loading
Loading