feat: BigQuery Data Warehouse Connector — Phase 1 (Schema, Types, Validation)#391
feat: BigQuery Data Warehouse Connector — Phase 1 (Schema, Types, Validation)#391Shrotriya-lalit wants to merge 8 commits into
Conversation
Phase 1 of the BigQuery warehouse connector. Adds Prisma models for BigQueryConnection, BigQuerySync and BigQuerySyncRun with four supporting enums, two Prisma migrations, typed JSON column via PrismaJson namespace, Zod schemas (zBigQuerySyncConfig + column mapping variants) in @openpanel/validation, and the @google-cloud/bigquery dependency.
- Add zBqColRef validator for column references (supports dot-notation for STRUCT nested fields like user.profile.email) - Add mappingType discriminator to both mapping schemas so the union is discriminated and TypeScript can narrow the type cleanly - Add superRefine cross-validation: append mode events syncs must declare an insertTime column (the TIMESTAMP cursor) - Update plan with verified BigQuery Node.js client type mappings: INT64 needs wrapIntegers:true, TIMESTAMP/.value not .toISOString(), DATETIME has no timezone, BYTES→Buffer, Big for NUMERIC/BIGNUMERIC
Real-world orgs connect multiple data sources to one project (e.g. jm-ebg and jm-ebg-cdp on the same ROAS project). The original @unique on projectId wrongly enforced a single connection per project. - Remove @unique from BigQueryConnection.projectId - Add name String field (user label, e.g. "CDP Source") - Replace with @@unique([projectId, name]) — names unique within project - Change Project.bigQueryConnection? → bigQueryConnections[]
Schema additions: - BigQueryConnection: gcpRegion (GDPR region compliance), lastTestedAt/lastTestStatus (connection health) - BigQuerySync: lastSyncStatus typed as enum (was String?), errorRetryCount+isErrorPaused circuit breaker, partitionFilter for cost-safe full-refresh on partitioned tables - BigQuerySyncRun: rowCount BigInt (INT max ~2.1B insufficient), bytesProcessed for cost tracking Zod additions: - zGcpProjectId: GCP project ID format regex (rejects project numbers and display names) - zBqIdentifier: dataset/table name validator (no hyphens, per BQ naming rules) - zServiceAccountJson: SA JSON structure check (rejects authorized_user creds before encryption) - zBigQueryConnectionCreate: connection creation schema with name/region/SA JSON - zBigQuerySyncConfig: dataset/tableName now use zBqIdentifier, partitionFilter field added
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds BigQuery connector migrations, then generalizes them into a provider-agnostic Warehouse schema; updates Prisma models/types; adds Zod validation for warehouse/BigQuery configs and mappings; and adds the BigQuery runtime dependency. ChangesWarehouse & BigQuery connector
sequenceDiagram
participant Client
participant API
participant Database
participant WarehouseProvider
Client->>API: request create/trigger sync (projectId, connectionName)
API->>Database: read WarehouseConnection / WarehouseSync
API->>WarehouseProvider: authenticate using serviceAccountJson / run extract
WarehouseProvider->>API: return rows and bytesProcessed
API->>Database: insert WarehouseSyncRun (status, rowCount, bytesProcessed)
🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/db/package.json`:
- Line 17: Update the `@google-cloud/bigquery` dependency in
packages/db/package.json from ^7.9.1 to ^8.3.1 (look for the dependency key
"`@google-cloud/bigquery`") and run your test suite and any integration checks
that exercise BigQuery calls to catch breaking API changes; after updating,
re-run npm audit / GitHub advisory checks and address any newly surfaced
advisories or required code adjustments in functions that call BigQuery client
methods (e.g., places creating BigQuery clients or invoking methods like
dataset/table/query) to align with the v8 API.
In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`:
- Around line 50-53: The migration creates bigquery_sync_runs with a projectId
column but no FK, allowing runs to be assigned to the wrong project; add
referential integrity by adding a foreign key on bigquery_sync_runs.projectId
referencing the canonical projects table (projects.id) and also ensure
run-to-sync consistency by adding a composite foreign key (syncId, projectId)
referencing bigquery_syncs(id, projectId) (first add a unique constraint on
bigquery_syncs(id, projectId) if needed); update the CREATE TABLE for
bigquery_sync_runs to include these constraints (names like
fk_bigquery_sync_runs_project and fk_bigquery_sync_runs_sync_project) so every
run is tied to an existing project and the syncId matches that project.
- Around line 69-73: The two separate FKs allow mismatched project vs connection
tenants on bigquery_syncs; drop the existing bigquery_syncs_connectionId_fkey
and replace it with a composite foreign key (e.g.
bigquery_syncs_projectId_connectionId_fkey) on (projectId, connectionId) that
REFERENCES "public"."bigquery_connections"(projectId, id) WITH ON DELETE CASCADE
ON UPDATE CASCADE so the referenced connection must belong to the same project;
keep or leave the existing projectId -> projects FK
(bigquery_syncs_projectId_fkey) as-is.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 90092f45-30c5-4c2c-aaa2-84bd90a711d7
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (8)
packages/db/package.jsonpackages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sqlpackages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sqlpackages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sqlpackages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sqlpackages/db/prisma/schema.prismapackages/db/src/types.tspackages/validation/src/index.ts
| CREATE TABLE "public"."bigquery_sync_runs" ( | ||
| "id" UUID NOT NULL DEFAULT gen_random_uuid(), | ||
| "syncId" UUID NOT NULL, | ||
| "projectId" TEXT NOT NULL, |
There was a problem hiding this comment.
Constrain bigquery_sync_runs.projectId to prevent mismatched run attribution.
Line 53 stores projectId, but there is no FK for it, and Line 76 only validates syncId. A run row can carry a wrong project id, breaking run-history correctness and tenant-scoped queries.
Suggested migration direction
+-- Keep sync/project coupling enforceable
+ALTER TABLE "public"."bigquery_syncs"
+ ADD CONSTRAINT "bigquery_syncs_id_projectId_key" UNIQUE ("id", "projectId");
+
+-- Enforce run project consistency with parent sync
+ALTER TABLE "public"."bigquery_sync_runs"
+ ADD CONSTRAINT "bigquery_sync_runs_syncId_projectId_fkey"
+ FOREIGN KEY ("syncId", "projectId")
+ REFERENCES "public"."bigquery_syncs"("id", "projectId")
+ ON DELETE CASCADE ON UPDATE CASCADE;
+
+-- Optional explicit FK for direct project integrity
+ALTER TABLE "public"."bigquery_sync_runs"
+ ADD CONSTRAINT "bigquery_sync_runs_projectId_fkey"
+ FOREIGN KEY ("projectId")
+ REFERENCES "public"."projects"("id")
+ ON DELETE CASCADE ON UPDATE CASCADE;Also applies to: 75-76
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`
around lines 50 - 53, The migration creates bigquery_sync_runs with a projectId
column but no FK, allowing runs to be assigned to the wrong project; add
referential integrity by adding a foreign key on bigquery_sync_runs.projectId
referencing the canonical projects table (projects.id) and also ensure
run-to-sync consistency by adding a composite foreign key (syncId, projectId)
referencing bigquery_syncs(id, projectId) (first add a unique constraint on
bigquery_syncs(id, projectId) if needed); update the CREATE TABLE for
bigquery_sync_runs to include these constraints (names like
fk_bigquery_sync_runs_project and fk_bigquery_sync_runs_sync_project) so every
run is tied to an existing project and the syncId matches that project.
| -- AddForeignKey | ||
| ALTER TABLE "public"."bigquery_syncs" ADD CONSTRAINT "bigquery_syncs_connectionId_fkey" FOREIGN KEY ("connectionId") REFERENCES "public"."bigquery_connections"("id") ON DELETE CASCADE ON UPDATE CASCADE; | ||
|
|
||
| -- AddForeignKey | ||
| ALTER TABLE "public"."bigquery_syncs" ADD CONSTRAINT "bigquery_syncs_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "public"."projects"("id") ON DELETE CASCADE ON UPDATE CASCADE; |
There was a problem hiding this comment.
Enforce project/connection tenant consistency on bigquery_syncs.
Line 70 and Line 73 create independent foreign keys, so a sync can reference a connectionId from project B while storing projectId from project A. That breaks tenant isolation and can misroute data.
Suggested migration direction
+-- Ensure referenced columns are jointly unique
+ALTER TABLE "public"."bigquery_connections"
+ ADD CONSTRAINT "bigquery_connections_id_projectId_key" UNIQUE ("id", "projectId");
+
+-- Enforce sync belongs to the same project as its connection
+ALTER TABLE "public"."bigquery_syncs"
+ ADD CONSTRAINT "bigquery_syncs_connectionId_projectId_fkey"
+ FOREIGN KEY ("connectionId", "projectId")
+ REFERENCES "public"."bigquery_connections"("id", "projectId")
+ ON DELETE CASCADE ON UPDATE CASCADE;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql`
around lines 69 - 73, The two separate FKs allow mismatched project vs
connection tenants on bigquery_syncs; drop the existing
bigquery_syncs_connectionId_fkey and replace it with a composite foreign key
(e.g. bigquery_syncs_projectId_connectionId_fkey) on (projectId, connectionId)
that REFERENCES "public"."bigquery_connections"(projectId, id) WITH ON DELETE
CASCADE ON UPDATE CASCADE so the referenced connection must belong to the same
project; keep or leave the existing projectId -> projects FK
(bigquery_syncs_projectId_fkey) as-is.
154e5e2 to
7920219
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql`:
- Around line 6-9: The migration creates bigquery_connections.name with an
empty-string backfill which leaves existing rows with '' and still allows future
empty-string inserts; update the migration to backfill a meaningful non-empty
value for existing rows (e.g., update "bigquery_connections" set "name" =
concat('connection_', id) or another deterministic non-empty token for rows
where name = '') and then add a DB-level constraint to prevent empty names
(e.g., ALTER TABLE "bigquery_connections" ADD CONSTRAINT ... CHECK
(char_length(name) > 0) or name <> '') so future inserts cannot use ''. Ensure
the ALTER that drops the default comes after the backfill and the CHECK
constraint is added as part of the migration so both existing and new rows are
validated.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ce3b42d9-fd05-4483-8001-e2ff1812b45c
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (8)
packages/db/package.jsonpackages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sqlpackages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sqlpackages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sqlpackages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sqlpackages/db/prisma/schema.prismapackages/db/src/types.tspackages/validation/src/index.ts
✅ Files skipped from review due to trivial changes (1)
- packages/db/src/types.ts
🚧 Files skipped from review as they are similar to previous changes (6)
- packages/db/package.json
- packages/db/prisma/migrations/20260607120000_add_event_meta_description/migration.sql
- packages/db/prisma/schema.prisma
- packages/validation/src/index.ts
- packages/db/prisma/migrations/20260608090217_add_bigquery_connector/migration.sql
- packages/db/prisma/migrations/20260608120000_bigquery_harden/migration.sql
| ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT ''; | ||
|
|
||
| -- Remove the default now that column exists | ||
| ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT; |
There was a problem hiding this comment.
Backfill leaves invalid empty names and the DB still permits future empty-string inserts.
Line 6 seeds existing rows with '', and after Line 9 those rows remain empty. That conflicts with the non-empty connection-name contract in validation and can leak invalid records into future flows. Please backfill to a non-empty value and enforce non-empty at the database layer.
Suggested migration adjustment
-ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT '';
-
--- Remove the default now that column exists
-ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT;
+ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT;
+UPDATE "public"."bigquery_connections"
+SET "name" = 'default'
+WHERE "name" IS NULL OR btrim("name") = '';
+ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" SET NOT NULL;
+ALTER TABLE "public"."bigquery_connections"
+ADD CONSTRAINT "bigquery_connections_name_nonempty_chk"
+CHECK (btrim("name") <> '');📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT NOT NULL DEFAULT ''; | |
| -- Remove the default now that column exists | |
| ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" DROP DEFAULT; | |
| ALTER TABLE "public"."bigquery_connections" ADD COLUMN "name" TEXT; | |
| UPDATE "public"."bigquery_connections" | |
| SET "name" = 'default' | |
| WHERE "name" IS NULL OR btrim("name") = ''; | |
| ALTER TABLE "public"."bigquery_connections" ALTER COLUMN "name" SET NOT NULL; | |
| ALTER TABLE "public"."bigquery_connections" | |
| ADD CONSTRAINT "bigquery_connections_name_nonempty_chk" | |
| CHECK (btrim("name") <> ''); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@packages/db/prisma/migrations/20260608091000_bigquery_multi_connection/migration.sql`
around lines 6 - 9, The migration creates bigquery_connections.name with an
empty-string backfill which leaves existing rows with '' and still allows future
empty-string inserts; update the migration to backfill a meaningful non-empty
value for existing rows (e.g., update "bigquery_connections" set "name" =
concat('connection_', id) or another deterministic non-empty token for rows
where name = '') and then add a DB-level constraint to prevent empty names
(e.g., ALTER TABLE "bigquery_connections" ADD CONSTRAINT ... CHECK
(char_length(name) > 0) or name <> '') so future inserts cannot use ''. Ensure
the ALTER that drops the default comes after the backfill and the CHECK
constraint is added as part of the migration so both existing and new rows are
validated.
…d dependency upgrade
- Upgrade @google-cloud/bigquery from ^7.9.1 to ^8.3.1 (latest stable)
- Add FK bigquery_sync_runs.projectId -> projects(id) (was missing, allowing orphan runs)
- Add composite FK bigquery_syncs(projectId, connectionId) -> bigquery_connections(projectId, id)
to prevent cross-tenant data: a sync can no longer reference a connection from a different project
- Add UNIQUE INDEX bigquery_connections(projectId, id) to back the composite FK
- Add CHECK(char_length(name) > 0) on bigquery_connections to enforce non-empty names at DB level
- Backfill any dev rows with empty name using concat('connection_', id)
…Zod validators Adds the foundational schema for a multi-provider Data Warehouse Connector (BigQuery first, extensible to Snowflake/Redshift/Databricks/Postgres). Three shared Prisma tables: - warehouse_connections: one row per named connection, any provider - warehouse_syncs: one sync job per connection - warehouse_sync_runs: one run record per execution Key design decisions: - configEncrypted (AES-256-GCM) + displayIdentifier/displayEmail for UI display without decryption - Composite FK warehouse_syncs(projectId,connectionId) → warehouse_connections(projectId,id) blocks cross-tenant exploit at DB level - Three sync modes: append (cursor), full (reload + stale cleanup), onetime (backfill) - cursorOverlapMinutes (default 10) rewinds append cursor to catch late-arriving rows - syncDelayMinutes (default 0) delays cron execution to allow BQ pipelines to land - Performance indexes on all FK columns (PostgreSQL does not create these automatically) - jsonProperties column mapping flattens a JSON column into event/profile properties Validation (packages/validation/src/index.ts): - zBigQueryWarehouseConfig + zWarehouseConfig discriminated union - zBigQuerySyncConfig with superRefine cross-field rules: schedule required for append/full, insertTime required for append, eventName/eventNameStatic mutually exclusive Migrations applied: 20260610115042–20260610115046 21/21 Zod validator probes pass. Zero schema drift (prisma migrate diff).
There was a problem hiding this comment.
🧹 Nitpick comments (2)
packages/db/prisma/schema.prisma (1)
794-810: ⚖️ Poor tradeoffConsider adding composite FK for tenant isolation consistency.
WarehouseSyncuses a composite FK(projectId, connectionId) → WarehouseConnection(projectId, id)to prevent cross-tenant references.WarehouseSyncRunhas separate single-column FKs forsyncIdandprojectId, so the database does not enforce thatprojectIdmatches the parent sync'sprojectId.Since runs are created by backend workers (not user input), the risk is limited to application bugs, not direct exploitation. However, adding a composite FK would maintain consistency with the sync→connection pattern and prevent any future data integrity drift.
This would require:
- A unique index on
WarehouseSync(projectId, id)(similar to what exists onWarehouseConnection)- A composite FK on
WarehouseSyncRun(projectId, syncId) → WarehouseSync(projectId, id)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/db/prisma/schema.prisma` around lines 794 - 810, Add a composite foreign-key relation to enforce tenant isolation by making WarehouseSync expose a unique (or indexed+unique) key on (projectId, id) and updating WarehouseSyncRun to reference WarehouseSync via the pair (projectId, syncId); specifically, add a @@unique or @@index+@@unique for WarehouseSync on (projectId, id) (if not already present) and change the relation in WarehouseSyncRun to use relation(fields: [projectId, syncId], references: [projectId, id]) so the run row cannot reference a sync from a different project (update the relation name/attributes on the WarehouseSyncRun model accordingly and remove the single-column-only FK behavior).packages/db/src/types.ts (1)
31-31: ⚡ Quick winNaming inconsistency:
IWarehouseColumnMappinglacksIPrismaprefix.All other types in the
PrismaJsonnamespace follow the patternIPrisma<TypeName>(e.g.,IPrismaImportConfig,IPrismaNotificationRuleConfig), but this new type is namedIWarehouseColumnMappingwithout the prefix.While this works (because it matches the Prisma schema annotation
/// [IWarehouseColumnMapping]), the inconsistency may confuse maintainers. Consider renaming both the schema annotation and this type toIPrismaWarehouseColumnMappingto maintain consistency with the established pattern.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/db/src/types.ts` at line 31, Rename the inconsistent type and its Prisma schema annotation to use the established IPrisma prefix: change the alias type IWarehouseColumnMapping (currently pointing to IWarehouseColumnMappingType) to IPrismaWarehouseColumnMapping and update the underlying type name if needed (IWarehouseColumnMappingType → IPrismaWarehouseColumnMappingType or keep original type but rename the alias). Also update the Prisma schema annotation /// [IWarehouseColumnMapping] to /// [IPrismaWarehouseColumnMapping] and search/replace any references to IWarehouseColumnMapping (in the PrismaJson namespace and across the codebase) so all usages reference IPrismaWarehouseColumnMapping to maintain naming consistency.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@packages/db/prisma/schema.prisma`:
- Around line 794-810: Add a composite foreign-key relation to enforce tenant
isolation by making WarehouseSync expose a unique (or indexed+unique) key on
(projectId, id) and updating WarehouseSyncRun to reference WarehouseSync via the
pair (projectId, syncId); specifically, add a @@unique or @@index+@@unique for
WarehouseSync on (projectId, id) (if not already present) and change the
relation in WarehouseSyncRun to use relation(fields: [projectId, syncId],
references: [projectId, id]) so the run row cannot reference a sync from a
different project (update the relation name/attributes on the WarehouseSyncRun
model accordingly and remove the single-column-only FK behavior).
In `@packages/db/src/types.ts`:
- Line 31: Rename the inconsistent type and its Prisma schema annotation to use
the established IPrisma prefix: change the alias type IWarehouseColumnMapping
(currently pointing to IWarehouseColumnMappingType) to
IPrismaWarehouseColumnMapping and update the underlying type name if needed
(IWarehouseColumnMappingType → IPrismaWarehouseColumnMappingType or keep
original type but rename the alias). Also update the Prisma schema annotation
/// [IWarehouseColumnMapping] to /// [IPrismaWarehouseColumnMapping] and
search/replace any references to IWarehouseColumnMapping (in the PrismaJson
namespace and across the codebase) so all usages reference
IPrismaWarehouseColumnMapping to maintain naming consistency.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cdddf040-421e-4c69-afaf-2bcb03567596
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (8)
packages/db/prisma/migrations/20260610115042_warehouse_restructure/migration.sqlpackages/db/prisma/migrations/20260610115043_warehouse_security_fks/migration.sqlpackages/db/prisma/migrations/20260610115044_warehouse_phase1_finalize/migration.sqlpackages/db/prisma/migrations/20260610115045_warehouse_onetime_mode/migration.sqlpackages/db/prisma/migrations/20260610115046_warehouse_sync_overlap_delay/migration.sqlpackages/db/prisma/schema.prismapackages/db/src/types.tspackages/validation/src/index.ts
✅ Files skipped from review due to trivial changes (2)
- packages/db/prisma/migrations/20260610115046_warehouse_sync_overlap_delay/migration.sql
- packages/db/prisma/migrations/20260610115045_warehouse_onetime_mode/migration.sql
…v#391 - Rename PrismaJson type alias IWarehouseColumnMapping → IPrismaWarehouseColumnMapping to match the established IPrisma* prefix convention used by all other types in the namespace - Update schema.prisma annotation to match (/// [IPrismaWarehouseColumnMapping]) - Add @@unique([id, projectId]) on WarehouseSync to back a composite FK - Replace single-column syncId FK on WarehouseSyncRun with composite (syncId, projectId) → warehouse_syncs(id, projectId) to enforce that a run's projectId always matches its parent sync's projectId — closes the same cross-tenant gap that the existing composite FK on WarehouseSync already closes one level up
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sql`:
- Around line 14-18: Before adding the composite FK ("syncId","projectId") →
warehouse_syncs("id","projectId") in the migration, run a pre-check that counts
mismatched rows between warehouse_sync_runs and warehouse_syncs (use the
provided SELECT with IS DISTINCT FROM) and, if >0, perform a normalization
UPDATE to set warehouse_sync_runs.projectId = warehouse_syncs.projectId for rows
where they differ; place this normalization step after the DROP CONSTRAINT
"warehouse_sync_runs_syncId_fkey" and before ADD CONSTRAINT
"warehouse_sync_runs_syncId_projectId_fkey" so the new FK add will not fail
(keep the unique index creation as-is).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3de3952b-f359-4897-8bff-2f7f2775e8af
📒 Files selected for processing (3)
packages/db/prisma/migrations/20260611000000_warehouse_run_composite_fk/migration.sqlpackages/db/prisma/schema.prismapackages/db/src/types.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/db/src/types.ts
- packages/db/prisma/schema.prisma
Before adding the (syncId, projectId) composite FK, normalize any runs whose projectId doesn't match their parent sync's projectId. Without this the ADD CONSTRAINT fails if mismatched rows exist. Tables are dev-only now so this is a no-op, but makes the migration safe to apply to any environment.
BigQuery Data Warehouse Connector — Phase 1: Schema, Types, Validation
Adds the foundational schema for a multi-provider Data Warehouse Connector — BigQuery first, with Snowflake, Redshift, Databricks, and Postgres ready to be added without any new tables.
What's in this PR
Three shared Prisma tables
warehouse_connectionswarehouse_syncswarehouse_sync_runsEnums
WarehouseType:bigquery,snowflake,redshift,databricks,postgresWarehouseSyncMode:append,full,onetimeWarehouseSyncSchedule:hourly,daily,weeklyWarehouseSyncRunStatus:pending,running,completed,failedWarehouseSyncMappingType:events,profilesKey design decisions
Security
configEncrypted(AES-256-GCM) stores provider credentials — sameencrypt()/decrypt()as GSC connectordisplayIdentifier+displayEmailextracted at creation time as plain text — connection list never requires decryptionwarehouse_syncs(projectId, connectionId) → warehouse_connections(projectId, id)— blocks cross-tenant exploit at DB level (project A cannot reference project B's connection)name_nonempty_checkconstraint — DB-level guard against empty connection namesSync modes
append— cursor-based incremental viainsertTimecolumn;cursorOverlapMinutes(default 10) rewinds cursor to catch late-arriving rows; dedup handles re-ingested eventsfull— full reload every run + stale event cleanup via__op_sync_idstamp (handles BQ row deletions and updates)onetime— historical backfill, runs once then disables automaticallyOperational fields
cursorOverlapMinutes Int @default(10)— late-arriving row safety window for append modesyncDelayMinutes Int @default(0)— delay after cron fires before executing (allows upstream BQ pipelines to land)errorRetryCount+isErrorPaused— auto-pause circuit breakerfailureCount BigInt— tracks individually failed rows (bad data/type mismatch), separate fromrowCountbytesProcessed BigInt— tracks GCP billing units per runcreatedBy String?— userId of sync creator for avatar display in UIPerformance
warehouse_syncs(projectId),warehouse_syncs(connectionId),warehouse_sync_runs(syncId)Validation (
packages/validation/src/index.ts)zBigQueryWarehouseConfig— GCP project ID + region + SA JSON (validated for required fields)zWarehouseConfig— discriminated union ontype, ready for Snowflake armzBigQueryColumnMappingEvents— all event column fields:profileId,deviceId,eventId,eventName,eventNameStatic,timestamp,insertTime,revenue,jsonPropertieszBigQueryColumnMappingProfiles—profileIdColumn,firstName,lastName,email,avatar,createdAt,jsonPropertieszBigQuerySyncConfigwithsuperRefinecross-field rules:schedulerequired forappendandfullmodes, not foronetimeinsertTimerequired whensyncMode = 'append'with events mappingeventNameandeventNameStaticare mutually exclusivejsonPropertieson both mappings — maps a BQ JSON column whose keys are flattened into event/profile properties at ingest timeMigrations applied
20260610115042_warehouse_restructure20260610115043_warehouse_security_fksname_nonempty_check20260610115044_warehouse_phase1_finalizefailureCount+createdBy20260610115045_warehouse_onetime_modeonetimeenum value +schedulemade nullable20260610115046_warehouse_sync_overlap_delaycursorOverlapMinutes+syncDelayMinutesVerification
prisma migrate diff— no schema driftname_nonempty_checkblocks empty names at DB levelWhat comes next
isSyncDue(),syncDelayMinutesSummary by CodeRabbit
New Features
Bug Fixes