From 946823c7e959a21e2be8838c717c828625824cb9 Mon Sep 17 00:00:00 2001 From: Matin Gathani Date: Sun, 29 Mar 2026 17:43:03 -0700 Subject: [PATCH 1/2] fix(migration): update set_updated_at() trigger to write _updated_at Closes #91 --- .../sync-engine/src/database/migrations-embedded.ts | 2 +- .../src/database/migrations/0000_initial_migration.sql | 10 +--------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/packages/sync-engine/src/database/migrations-embedded.ts b/packages/sync-engine/src/database/migrations-embedded.ts index b238b72ce..98da505f8 100644 --- a/packages/sync-engine/src/database/migrations-embedded.ts +++ b/packages/sync-engine/src/database/migrations-embedded.ts @@ -9,6 +9,6 @@ export type EmbeddedMigration = { export const embeddedMigrations: EmbeddedMigration[] = [ { name: '0000_initial_migration.sql', - sql: '-- Internal sync metadata schema bootstrap for OpenAPI runtime.\n-- Schema-qualified objects use the explicit {{sync_schema}} placeholder.\n-- Uses idempotent DDL so it can be safely re-run.\n\nCREATE EXTENSION IF NOT EXISTS btree_gist;\n\nCREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n -- Support both legacy "updated_at" and newer "_updated_at" columns.\n -- jsonb_populate_record silently ignores keys that are not present on NEW.\n NEW := jsonb_populate_record(\n NEW,\n jsonb_build_object(\n \'updated_at\', now(),\n \'_updated_at\', now()\n )\n );\n RETURN NEW;\nEND;\n$$;\n\nCREATE OR REPLACE FUNCTION set_updated_at_metadata() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."accounts" (\n "_raw_data" jsonb NOT NULL,\n "id" text GENERATED ALWAYS AS ((_raw_data->>\'id\')::text) STORED,\n "api_key_hashes" text[] NOT NULL DEFAULT \'{}\',\n "first_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_last_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("id")\n);\nCREATE INDEX IF NOT EXISTS "idx_accounts_api_key_hashes"\n ON {{sync_schema}}."accounts" USING GIN ("api_key_hashes");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."accounts";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."accounts"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_managed_webhooks" (\n "id" text PRIMARY KEY,\n "object" text,\n "url" text NOT NULL,\n "enabled_events" jsonb NOT NULL,\n "description" text,\n "enabled" boolean,\n "livemode" boolean,\n "metadata" jsonb,\n "secret" text NOT NULL,\n "status" text,\n "api_version" text,\n "created" bigint,\n "last_synced_at" timestamptz,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n "account_id" text NOT NULL\n);\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "managed_webhooks_url_account_unique";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "managed_webhooks_url_account_unique" UNIQUE ("url", "account_id");\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "fk_managed_webhooks_account";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "fk_managed_webhooks_account"\n FOREIGN KEY ("account_id") REFERENCES {{sync_schema}}."accounts" (id);\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_status"\n ON {{sync_schema}}."_managed_webhooks" ("status");\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_enabled"\n ON {{sync_schema}}."_managed_webhooks" ("enabled");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_managed_webhooks";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_managed_webhooks"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_runs" (\n "_account_id" text NOT NULL,\n "started_at" timestamptz NOT NULL DEFAULT now(),\n "closed_at" timestamptz,\n "max_concurrent" integer NOT NULL DEFAULT 3,\n "triggered_by" text,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "started_at")\n);\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_runs_account";\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT "fk_sync_runs_account"\n FOREIGN KEY ("_account_id") REFERENCES {{sync_schema}}."accounts" (id);\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account_triggered_by;\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT one_active_run_per_account_triggered_by\n EXCLUDE (\n "_account_id" WITH =,\n COALESCE(triggered_by, \'default\') WITH =\n ) WHERE (closed_at IS NULL);\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_runs_account_status"\n ON {{sync_schema}}."_sync_runs" ("_account_id", "closed_at");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_obj_runs" (\n "_account_id" text NOT NULL,\n "run_started_at" timestamptz NOT NULL,\n "object" text NOT NULL,\n "status" text NOT NULL DEFAULT \'pending\'\n CHECK (status IN (\'pending\', \'running\', \'complete\', \'error\')),\n "started_at" timestamptz,\n "completed_at" timestamptz,\n "processed_count" integer NOT NULL DEFAULT 0,\n "cursor" text,\n "page_cursor" text,\n "created_gte" integer NOT NULL DEFAULT 0,\n "created_lte" integer NOT NULL DEFAULT 0,\n "priority" integer NOT NULL DEFAULT 0,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "run_started_at", "object", "created_gte", "created_lte")\n);\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "page_cursor" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_gte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_lte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "priority" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_obj_runs_parent";\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD CONSTRAINT "fk_sync_obj_runs_parent"\n FOREIGN KEY ("_account_id", "run_started_at")\n REFERENCES {{sync_schema}}."_sync_runs" ("_account_id", "started_at");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_obj_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_obj_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_status"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status");\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_priority"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status", "priority");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (\n key TEXT PRIMARY KEY,\n count INTEGER NOT NULL DEFAULT 0,\n window_start TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\nCREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(\n rate_key TEXT,\n max_requests INTEGER,\n window_seconds INTEGER\n)\nRETURNS VOID AS $$\nDECLARE\n now TIMESTAMPTZ := clock_timestamp();\n window_length INTERVAL := make_interval(secs => window_seconds);\n current_count INTEGER;\nBEGIN\n PERFORM pg_advisory_xact_lock(hashtext(rate_key));\n\n INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)\n VALUES (rate_key, 1, now)\n ON CONFLICT (key) DO UPDATE\n SET count = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN 1\n ELSE "_rate_limits".count + 1\n END,\n window_start = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN now\n ELSE "_rate_limits".window_start\n END;\n\n SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;\n\n IF current_count > max_requests THEN\n RAISE EXCEPTION \'Rate limit exceeded for %\', rate_key;\n END IF;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_runs" AS\nSELECT\n r._account_id as account_id,\n r.started_at,\n r.closed_at,\n r.triggered_by,\n r.max_concurrent,\n COALESCE(SUM(o.processed_count), 0) as total_processed,\n COUNT(o.*) as total_objects,\n COUNT(*) FILTER (WHERE o.status = \'complete\') as complete_count,\n COUNT(*) FILTER (WHERE o.status = \'error\') as error_count,\n COUNT(*) FILTER (WHERE o.status = \'running\') as running_count,\n COUNT(*) FILTER (WHERE o.status = \'pending\') as pending_count,\n STRING_AGG(o.error_message, \'; \') FILTER (WHERE o.error_message IS NOT NULL) as error_message,\n CASE\n WHEN r.closed_at IS NULL AND COUNT(*) FILTER (WHERE o.status = \'running\') > 0 THEN \'running\'\n WHEN r.closed_at IS NULL AND (COUNT(o.*) = 0 OR COUNT(o.*) = COUNT(*) FILTER (WHERE o.status = \'pending\')) THEN \'pending\'\n WHEN r.closed_at IS NULL THEN \'running\'\n WHEN COUNT(*) FILTER (WHERE o.status = \'error\') > 0 THEN \'error\'\n ELSE \'complete\'\n END as status\nFROM {{sync_schema}}."_sync_runs" r\nLEFT JOIN {{sync_schema}}."_sync_obj_runs" o\n ON o._account_id = r._account_id\n AND o.run_started_at = r.started_at\nGROUP BY r._account_id, r.started_at, r.closed_at, r.triggered_by, r.max_concurrent;\n\nDROP FUNCTION IF EXISTS {{sync_schema}}."sync_obj_progress"(TEXT, TIMESTAMPTZ);\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_obj_progress" AS\nSELECT\n r."_account_id" AS account_id,\n r.run_started_at,\n r.object,\n ROUND(\n 100.0 * COUNT(*) FILTER (WHERE r.status = \'complete\') / NULLIF(COUNT(*), 0),\n 1\n ) AS pct_complete,\n COALESCE(SUM(r.processed_count), 0) AS processed\nFROM {{sync_schema}}."_sync_obj_runs" r\nWHERE r.run_started_at = (\n SELECT MAX(s.started_at)\n FROM {{sync_schema}}."_sync_runs" s\n WHERE s."_account_id" = r."_account_id"\n)\nGROUP BY r."_account_id", r.run_started_at, r.object;\n', + sql: '-- Internal sync metadata schema bootstrap for OpenAPI runtime.\n-- Schema-qualified objects use the explicit {{sync_schema}} placeholder.\n-- Uses idempotent DDL so it can be safely re-run.\n\nCREATE EXTENSION IF NOT EXISTS btree_gist;\n\nCREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW._updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE OR REPLACE FUNCTION set_updated_at_metadata() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."accounts" (\n "_raw_data" jsonb NOT NULL,\n "id" text GENERATED ALWAYS AS ((_raw_data->>\'id\')::text) STORED,\n "api_key_hashes" text[] NOT NULL DEFAULT \'{}\',\n "first_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_last_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("id")\n);\nCREATE INDEX IF NOT EXISTS "idx_accounts_api_key_hashes"\n ON {{sync_schema}}."accounts" USING GIN ("api_key_hashes");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."accounts";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."accounts"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_managed_webhooks" (\n "id" text PRIMARY KEY,\n "object" text,\n "url" text NOT NULL,\n "enabled_events" jsonb NOT NULL,\n "description" text,\n "enabled" boolean,\n "livemode" boolean,\n "metadata" jsonb,\n "secret" text NOT NULL,\n "status" text,\n "api_version" text,\n "created" bigint,\n "last_synced_at" timestamptz,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n "account_id" text NOT NULL\n);\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "managed_webhooks_url_account_unique";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "managed_webhooks_url_account_unique" UNIQUE ("url", "account_id");\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "fk_managed_webhooks_account";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "fk_managed_webhooks_account"\n FOREIGN KEY ("account_id") REFERENCES {{sync_schema}}."accounts" (id);\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_status"\n ON {{sync_schema}}."_managed_webhooks" ("status");\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_enabled"\n ON {{sync_schema}}."_managed_webhooks" ("enabled");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_managed_webhooks";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_managed_webhooks"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_runs" (\n "_account_id" text NOT NULL,\n "started_at" timestamptz NOT NULL DEFAULT now(),\n "closed_at" timestamptz,\n "max_concurrent" integer NOT NULL DEFAULT 3,\n "triggered_by" text,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "started_at")\n);\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_runs_account";\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT "fk_sync_runs_account"\n FOREIGN KEY ("_account_id") REFERENCES {{sync_schema}}."accounts" (id);\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account_triggered_by;\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT one_active_run_per_account_triggered_by\n EXCLUDE (\n "_account_id" WITH =,\n COALESCE(triggered_by, \'default\') WITH =\n ) WHERE (closed_at IS NULL);\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_runs_account_status"\n ON {{sync_schema}}."_sync_runs" ("_account_id", "closed_at");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_obj_runs" (\n "_account_id" text NOT NULL,\n "run_started_at" timestamptz NOT NULL,\n "object" text NOT NULL,\n "status" text NOT NULL DEFAULT \'pending\'\n CHECK (status IN (\'pending\', \'running\', \'complete\', \'error\')),\n "started_at" timestamptz,\n "completed_at" timestamptz,\n "processed_count" integer NOT NULL DEFAULT 0,\n "cursor" text,\n "page_cursor" text,\n "created_gte" integer NOT NULL DEFAULT 0,\n "created_lte" integer NOT NULL DEFAULT 0,\n "priority" integer NOT NULL DEFAULT 0,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "run_started_at", "object", "created_gte", "created_lte")\n);\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "page_cursor" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_gte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_lte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "priority" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_obj_runs_parent";\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD CONSTRAINT "fk_sync_obj_runs_parent"\n FOREIGN KEY ("_account_id", "run_started_at")\n REFERENCES {{sync_schema}}."_sync_runs" ("_account_id", "started_at");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_obj_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_obj_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_status"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status");\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_priority"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status", "priority");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (\n key TEXT PRIMARY KEY,\n count INTEGER NOT NULL DEFAULT 0,\n window_start TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\nCREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(\n rate_key TEXT,\n max_requests INTEGER,\n window_seconds INTEGER\n)\nRETURNS VOID AS $$\nDECLARE\n now TIMESTAMPTZ := clock_timestamp();\n window_length INTERVAL := make_interval(secs => window_seconds);\n current_count INTEGER;\nBEGIN\n PERFORM pg_advisory_xact_lock(hashtext(rate_key));\n\n INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)\n VALUES (rate_key, 1, now)\n ON CONFLICT (key) DO UPDATE\n SET count = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN 1\n ELSE "_rate_limits".count + 1\n END,\n window_start = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN now\n ELSE "_rate_limits".window_start\n END;\n\n SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;\n\n IF current_count > max_requests THEN\n RAISE EXCEPTION \'Rate limit exceeded for %\', rate_key;\n END IF;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_runs" AS\nSELECT\n r._account_id as account_id,\n r.started_at,\n r.closed_at,\n r.triggered_by,\n r.max_concurrent,\n COALESCE(SUM(o.processed_count), 0) as total_processed,\n COUNT(o.*) as total_objects,\n COUNT(*) FILTER (WHERE o.status = \'complete\') as complete_count,\n COUNT(*) FILTER (WHERE o.status = \'error\') as error_count,\n COUNT(*) FILTER (WHERE o.status = \'running\') as running_count,\n COUNT(*) FILTER (WHERE o.status = \'pending\') as pending_count,\n STRING_AGG(o.error_message, \'; \') FILTER (WHERE o.error_message IS NOT NULL) as error_message,\n CASE\n WHEN r.closed_at IS NULL AND COUNT(*) FILTER (WHERE o.status = \'running\') > 0 THEN \'running\'\n WHEN r.closed_at IS NULL AND (COUNT(o.*) = 0 OR COUNT(o.*) = COUNT(*) FILTER (WHERE o.status = \'pending\')) THEN \'pending\'\n WHEN r.closed_at IS NULL THEN \'running\'\n WHEN COUNT(*) FILTER (WHERE o.status = \'error\') > 0 THEN \'error\'\n ELSE \'complete\'\n END as status\nFROM {{sync_schema}}."_sync_runs" r\nLEFT JOIN {{sync_schema}}."_sync_obj_runs" o\n ON o._account_id = r._account_id\n AND o.run_started_at = r.started_at\nGROUP BY r._account_id, r.started_at, r.closed_at, r.triggered_by, r.max_concurrent;\n\nDROP FUNCTION IF EXISTS {{sync_schema}}."sync_obj_progress"(TEXT, TIMESTAMPTZ);\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_obj_progress" AS\nSELECT\n r."_account_id" AS account_id,\n r.run_started_at,\n r.object,\n ROUND(\n 100.0 * COUNT(*) FILTER (WHERE r.status = \'complete\') / NULLIF(COUNT(*), 0),\n 1\n ) AS pct_complete,\n COALESCE(SUM(r.processed_count), 0) AS processed\nFROM {{sync_schema}}."_sync_obj_runs" r\nWHERE r.run_started_at = (\n SELECT MAX(s.started_at)\n FROM {{sync_schema}}."_sync_runs" s\n WHERE s."_account_id" = r."_account_id"\n)\nGROUP BY r."_account_id", r.run_started_at, r.object;\n', }, ] diff --git a/packages/sync-engine/src/database/migrations/0000_initial_migration.sql b/packages/sync-engine/src/database/migrations/0000_initial_migration.sql index 57ab2d944..bf1f707bf 100644 --- a/packages/sync-engine/src/database/migrations/0000_initial_migration.sql +++ b/packages/sync-engine/src/database/migrations/0000_initial_migration.sql @@ -8,15 +8,7 @@ CREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN - -- Support both legacy "updated_at" and newer "_updated_at" columns. - -- jsonb_populate_record silently ignores keys that are not present on NEW. - NEW := jsonb_populate_record( - NEW, - jsonb_build_object( - 'updated_at', now(), - '_updated_at', now() - ) - ); + NEW._updated_at = now(); RETURN NEW; END; $$; From 8e9f2272f6f865065c0628ba59bf77558f6bbb02 Mon Sep 17 00:00:00 2001 From: Matin Gathani Date: Thu, 2 Apr 2026 15:36:43 -0700 Subject: [PATCH 2/2] fix(migration): add follow-up migration for set_updated_at --- .../src/database/migrations-embedded.ts | 6 +++- .../migrations/0000_initial_migration.sql | 10 ++++++- .../migrations/0001_fix_set_updated_at.sql | 8 +++++ .../postgres-sync-observability.test.ts | 29 ++++++++++++++++++- 4 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 packages/sync-engine/src/database/migrations/0001_fix_set_updated_at.sql diff --git a/packages/sync-engine/src/database/migrations-embedded.ts b/packages/sync-engine/src/database/migrations-embedded.ts index 98da505f8..5c4a1565f 100644 --- a/packages/sync-engine/src/database/migrations-embedded.ts +++ b/packages/sync-engine/src/database/migrations-embedded.ts @@ -9,6 +9,10 @@ export type EmbeddedMigration = { export const embeddedMigrations: EmbeddedMigration[] = [ { name: '0000_initial_migration.sql', - sql: '-- Internal sync metadata schema bootstrap for OpenAPI runtime.\n-- Schema-qualified objects use the explicit {{sync_schema}} placeholder.\n-- Uses idempotent DDL so it can be safely re-run.\n\nCREATE EXTENSION IF NOT EXISTS btree_gist;\n\nCREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW._updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE OR REPLACE FUNCTION set_updated_at_metadata() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."accounts" (\n "_raw_data" jsonb NOT NULL,\n "id" text GENERATED ALWAYS AS ((_raw_data->>\'id\')::text) STORED,\n "api_key_hashes" text[] NOT NULL DEFAULT \'{}\',\n "first_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_last_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("id")\n);\nCREATE INDEX IF NOT EXISTS "idx_accounts_api_key_hashes"\n ON {{sync_schema}}."accounts" USING GIN ("api_key_hashes");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."accounts";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."accounts"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_managed_webhooks" (\n "id" text PRIMARY KEY,\n "object" text,\n "url" text NOT NULL,\n "enabled_events" jsonb NOT NULL,\n "description" text,\n "enabled" boolean,\n "livemode" boolean,\n "metadata" jsonb,\n "secret" text NOT NULL,\n "status" text,\n "api_version" text,\n "created" bigint,\n "last_synced_at" timestamptz,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n "account_id" text NOT NULL\n);\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "managed_webhooks_url_account_unique";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "managed_webhooks_url_account_unique" UNIQUE ("url", "account_id");\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "fk_managed_webhooks_account";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "fk_managed_webhooks_account"\n FOREIGN KEY ("account_id") REFERENCES {{sync_schema}}."accounts" (id);\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_status"\n ON {{sync_schema}}."_managed_webhooks" ("status");\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_enabled"\n ON {{sync_schema}}."_managed_webhooks" ("enabled");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_managed_webhooks";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_managed_webhooks"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_runs" (\n "_account_id" text NOT NULL,\n "started_at" timestamptz NOT NULL DEFAULT now(),\n "closed_at" timestamptz,\n "max_concurrent" integer NOT NULL DEFAULT 3,\n "triggered_by" text,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "started_at")\n);\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_runs_account";\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT "fk_sync_runs_account"\n FOREIGN KEY ("_account_id") REFERENCES {{sync_schema}}."accounts" (id);\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account_triggered_by;\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT one_active_run_per_account_triggered_by\n EXCLUDE (\n "_account_id" WITH =,\n COALESCE(triggered_by, \'default\') WITH =\n ) WHERE (closed_at IS NULL);\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_runs_account_status"\n ON {{sync_schema}}."_sync_runs" ("_account_id", "closed_at");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_obj_runs" (\n "_account_id" text NOT NULL,\n "run_started_at" timestamptz NOT NULL,\n "object" text NOT NULL,\n "status" text NOT NULL DEFAULT \'pending\'\n CHECK (status IN (\'pending\', \'running\', \'complete\', \'error\')),\n "started_at" timestamptz,\n "completed_at" timestamptz,\n "processed_count" integer NOT NULL DEFAULT 0,\n "cursor" text,\n "page_cursor" text,\n "created_gte" integer NOT NULL DEFAULT 0,\n "created_lte" integer NOT NULL DEFAULT 0,\n "priority" integer NOT NULL DEFAULT 0,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "run_started_at", "object", "created_gte", "created_lte")\n);\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "page_cursor" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_gte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_lte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "priority" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_obj_runs_parent";\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD CONSTRAINT "fk_sync_obj_runs_parent"\n FOREIGN KEY ("_account_id", "run_started_at")\n REFERENCES {{sync_schema}}."_sync_runs" ("_account_id", "started_at");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_obj_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_obj_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_status"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status");\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_priority"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status", "priority");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (\n key TEXT PRIMARY KEY,\n count INTEGER NOT NULL DEFAULT 0,\n window_start TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\nCREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(\n rate_key TEXT,\n max_requests INTEGER,\n window_seconds INTEGER\n)\nRETURNS VOID AS $$\nDECLARE\n now TIMESTAMPTZ := clock_timestamp();\n window_length INTERVAL := make_interval(secs => window_seconds);\n current_count INTEGER;\nBEGIN\n PERFORM pg_advisory_xact_lock(hashtext(rate_key));\n\n INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)\n VALUES (rate_key, 1, now)\n ON CONFLICT (key) DO UPDATE\n SET count = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN 1\n ELSE "_rate_limits".count + 1\n END,\n window_start = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN now\n ELSE "_rate_limits".window_start\n END;\n\n SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;\n\n IF current_count > max_requests THEN\n RAISE EXCEPTION \'Rate limit exceeded for %\', rate_key;\n END IF;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_runs" AS\nSELECT\n r._account_id as account_id,\n r.started_at,\n r.closed_at,\n r.triggered_by,\n r.max_concurrent,\n COALESCE(SUM(o.processed_count), 0) as total_processed,\n COUNT(o.*) as total_objects,\n COUNT(*) FILTER (WHERE o.status = \'complete\') as complete_count,\n COUNT(*) FILTER (WHERE o.status = \'error\') as error_count,\n COUNT(*) FILTER (WHERE o.status = \'running\') as running_count,\n COUNT(*) FILTER (WHERE o.status = \'pending\') as pending_count,\n STRING_AGG(o.error_message, \'; \') FILTER (WHERE o.error_message IS NOT NULL) as error_message,\n CASE\n WHEN r.closed_at IS NULL AND COUNT(*) FILTER (WHERE o.status = \'running\') > 0 THEN \'running\'\n WHEN r.closed_at IS NULL AND (COUNT(o.*) = 0 OR COUNT(o.*) = COUNT(*) FILTER (WHERE o.status = \'pending\')) THEN \'pending\'\n WHEN r.closed_at IS NULL THEN \'running\'\n WHEN COUNT(*) FILTER (WHERE o.status = \'error\') > 0 THEN \'error\'\n ELSE \'complete\'\n END as status\nFROM {{sync_schema}}."_sync_runs" r\nLEFT JOIN {{sync_schema}}."_sync_obj_runs" o\n ON o._account_id = r._account_id\n AND o.run_started_at = r.started_at\nGROUP BY r._account_id, r.started_at, r.closed_at, r.triggered_by, r.max_concurrent;\n\nDROP FUNCTION IF EXISTS {{sync_schema}}."sync_obj_progress"(TEXT, TIMESTAMPTZ);\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_obj_progress" AS\nSELECT\n r."_account_id" AS account_id,\n r.run_started_at,\n r.object,\n ROUND(\n 100.0 * COUNT(*) FILTER (WHERE r.status = \'complete\') / NULLIF(COUNT(*), 0),\n 1\n ) AS pct_complete,\n COALESCE(SUM(r.processed_count), 0) AS processed\nFROM {{sync_schema}}."_sync_obj_runs" r\nWHERE r.run_started_at = (\n SELECT MAX(s.started_at)\n FROM {{sync_schema}}."_sync_runs" s\n WHERE s."_account_id" = r."_account_id"\n)\nGROUP BY r."_account_id", r.run_started_at, r.object;\n', + sql: '-- Internal sync metadata schema bootstrap for OpenAPI runtime.\n-- Schema-qualified objects use the explicit {{sync_schema}} placeholder.\n-- Uses idempotent DDL so it can be safely re-run.\n\nCREATE EXTENSION IF NOT EXISTS btree_gist;\n\nCREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n -- Support both legacy "updated_at" and newer "_updated_at" columns.\n -- jsonb_populate_record silently ignores keys that are not present on NEW.\n NEW := jsonb_populate_record(\n NEW,\n jsonb_build_object(\n \'updated_at\', now(),\n \'_updated_at\', now()\n )\n );\n RETURN NEW;\nEND;\n$$;\n\nCREATE OR REPLACE FUNCTION set_updated_at_metadata() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$$;\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."accounts" (\n "_raw_data" jsonb NOT NULL,\n "id" text GENERATED ALWAYS AS ((_raw_data->>\'id\')::text) STORED,\n "api_key_hashes" text[] NOT NULL DEFAULT \'{}\',\n "first_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_last_synced_at" timestamptz NOT NULL DEFAULT now(),\n "_updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("id")\n);\nCREATE INDEX IF NOT EXISTS "idx_accounts_api_key_hashes"\n ON {{sync_schema}}."accounts" USING GIN ("api_key_hashes");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."accounts";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."accounts"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_managed_webhooks" (\n "id" text PRIMARY KEY,\n "object" text,\n "url" text NOT NULL,\n "enabled_events" jsonb NOT NULL,\n "description" text,\n "enabled" boolean,\n "livemode" boolean,\n "metadata" jsonb,\n "secret" text NOT NULL,\n "status" text,\n "api_version" text,\n "created" bigint,\n "last_synced_at" timestamptz,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n "account_id" text NOT NULL\n);\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "managed_webhooks_url_account_unique";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "managed_webhooks_url_account_unique" UNIQUE ("url", "account_id");\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n DROP CONSTRAINT IF EXISTS "fk_managed_webhooks_account";\nALTER TABLE {{sync_schema}}."_managed_webhooks"\n ADD CONSTRAINT "fk_managed_webhooks_account"\n FOREIGN KEY ("account_id") REFERENCES {{sync_schema}}."accounts" (id);\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_status"\n ON {{sync_schema}}."_managed_webhooks" ("status");\nCREATE INDEX IF NOT EXISTS "idx_managed_webhooks_enabled"\n ON {{sync_schema}}."_managed_webhooks" ("enabled");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_managed_webhooks";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_managed_webhooks"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_runs" (\n "_account_id" text NOT NULL,\n "started_at" timestamptz NOT NULL DEFAULT now(),\n "closed_at" timestamptz,\n "max_concurrent" integer NOT NULL DEFAULT 3,\n "triggered_by" text,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "started_at")\n);\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_runs_account";\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT "fk_sync_runs_account"\n FOREIGN KEY ("_account_id") REFERENCES {{sync_schema}}."accounts" (id);\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account;\nALTER TABLE {{sync_schema}}."_sync_runs"\n DROP CONSTRAINT IF EXISTS one_active_run_per_account_triggered_by;\nALTER TABLE {{sync_schema}}."_sync_runs"\n ADD CONSTRAINT one_active_run_per_account_triggered_by\n EXCLUDE (\n "_account_id" WITH =,\n COALESCE(triggered_by, \'default\') WITH =\n ) WHERE (closed_at IS NULL);\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_runs_account_status"\n ON {{sync_schema}}."_sync_runs" ("_account_id", "closed_at");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_sync_obj_runs" (\n "_account_id" text NOT NULL,\n "run_started_at" timestamptz NOT NULL,\n "object" text NOT NULL,\n "status" text NOT NULL DEFAULT \'pending\'\n CHECK (status IN (\'pending\', \'running\', \'complete\', \'error\')),\n "started_at" timestamptz,\n "completed_at" timestamptz,\n "processed_count" integer NOT NULL DEFAULT 0,\n "cursor" text,\n "page_cursor" text,\n "created_gte" integer NOT NULL DEFAULT 0,\n "created_lte" integer NOT NULL DEFAULT 0,\n "priority" integer NOT NULL DEFAULT 0,\n "error_message" text,\n "updated_at" timestamptz NOT NULL DEFAULT now(),\n PRIMARY KEY ("_account_id", "run_started_at", "object", "created_gte", "created_lte")\n);\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "page_cursor" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_gte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "created_lte" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "priority" integer NOT NULL DEFAULT 0;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD COLUMN IF NOT EXISTS "error_message" text;\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n DROP CONSTRAINT IF EXISTS "fk_sync_obj_runs_parent";\nALTER TABLE {{sync_schema}}."_sync_obj_runs"\n ADD CONSTRAINT "fk_sync_obj_runs_parent"\n FOREIGN KEY ("_account_id", "run_started_at")\n REFERENCES {{sync_schema}}."_sync_runs" ("_account_id", "started_at");\nDROP TRIGGER IF EXISTS handle_updated_at ON {{sync_schema}}."_sync_obj_runs";\nCREATE TRIGGER handle_updated_at\nBEFORE UPDATE ON {{sync_schema}}."_sync_obj_runs"\nFOR EACH ROW EXECUTE FUNCTION set_updated_at_metadata();\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_status"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status");\nCREATE INDEX IF NOT EXISTS "idx_sync_obj_runs_priority"\n ON {{sync_schema}}."_sync_obj_runs" ("_account_id", "run_started_at", "status", "priority");\n\nCREATE TABLE IF NOT EXISTS {{sync_schema}}."_rate_limits" (\n key TEXT PRIMARY KEY,\n count INTEGER NOT NULL DEFAULT 0,\n window_start TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\nCREATE OR REPLACE FUNCTION {{sync_schema}}.check_rate_limit(\n rate_key TEXT,\n max_requests INTEGER,\n window_seconds INTEGER\n)\nRETURNS VOID AS $$\nDECLARE\n now TIMESTAMPTZ := clock_timestamp();\n window_length INTERVAL := make_interval(secs => window_seconds);\n current_count INTEGER;\nBEGIN\n PERFORM pg_advisory_xact_lock(hashtext(rate_key));\n\n INSERT INTO {{sync_schema}}."_rate_limits" (key, count, window_start)\n VALUES (rate_key, 1, now)\n ON CONFLICT (key) DO UPDATE\n SET count = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN 1\n ELSE "_rate_limits".count + 1\n END,\n window_start = CASE\n WHEN "_rate_limits".window_start + window_length <= now\n THEN now\n ELSE "_rate_limits".window_start\n END;\n\n SELECT count INTO current_count FROM {{sync_schema}}."_rate_limits" WHERE key = rate_key;\n\n IF current_count > max_requests THEN\n RAISE EXCEPTION \'Rate limit exceeded for %\', rate_key;\n END IF;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_runs" AS\nSELECT\n r._account_id as account_id,\n r.started_at,\n r.closed_at,\n r.triggered_by,\n r.max_concurrent,\n COALESCE(SUM(o.processed_count), 0) as total_processed,\n COUNT(o.*) as total_objects,\n COUNT(*) FILTER (WHERE o.status = \'complete\') as complete_count,\n COUNT(*) FILTER (WHERE o.status = \'error\') as error_count,\n COUNT(*) FILTER (WHERE o.status = \'running\') as running_count,\n COUNT(*) FILTER (WHERE o.status = \'pending\') as pending_count,\n STRING_AGG(o.error_message, \'; \') FILTER (WHERE o.error_message IS NOT NULL) as error_message,\n CASE\n WHEN r.closed_at IS NULL AND COUNT(*) FILTER (WHERE o.status = \'running\') > 0 THEN \'running\'\n WHEN r.closed_at IS NULL AND (COUNT(o.*) = 0 OR COUNT(o.*) = COUNT(*) FILTER (WHERE o.status = \'pending\')) THEN \'pending\'\n WHEN r.closed_at IS NULL THEN \'running\'\n WHEN COUNT(*) FILTER (WHERE o.status = \'error\') > 0 THEN \'error\'\n ELSE \'complete\'\n END as status\nFROM {{sync_schema}}."_sync_runs" r\nLEFT JOIN {{sync_schema}}."_sync_obj_runs" o\n ON o._account_id = r._account_id\n AND o.run_started_at = r.started_at\nGROUP BY r._account_id, r.started_at, r.closed_at, r.triggered_by, r.max_concurrent;\n\nDROP FUNCTION IF EXISTS {{sync_schema}}."sync_obj_progress"(TEXT, TIMESTAMPTZ);\nCREATE OR REPLACE VIEW {{sync_schema}}."sync_obj_progress" AS\nSELECT\n r."_account_id" AS account_id,\n r.run_started_at,\n r.object,\n ROUND(\n 100.0 * COUNT(*) FILTER (WHERE r.status = \'complete\') / NULLIF(COUNT(*), 0),\n 1\n ) AS pct_complete,\n COALESCE(SUM(r.processed_count), 0) AS processed\nFROM {{sync_schema}}."_sync_obj_runs" r\nWHERE r.run_started_at = (\n SELECT MAX(s.started_at)\n FROM {{sync_schema}}."_sync_runs" s\n WHERE s."_account_id" = r."_account_id"\n)\nGROUP BY r."_account_id", r.run_started_at, r.object;\n', + }, + { + name: '0001_fix_set_updated_at.sql', + sql: 'CREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger\n LANGUAGE plpgsql\nAS $$\nBEGIN\n NEW._updated_at = now();\n RETURN NEW;\nEND;\n$$;\n', }, ] diff --git a/packages/sync-engine/src/database/migrations/0000_initial_migration.sql b/packages/sync-engine/src/database/migrations/0000_initial_migration.sql index bf1f707bf..57ab2d944 100644 --- a/packages/sync-engine/src/database/migrations/0000_initial_migration.sql +++ b/packages/sync-engine/src/database/migrations/0000_initial_migration.sql @@ -8,7 +8,15 @@ CREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN - NEW._updated_at = now(); + -- Support both legacy "updated_at" and newer "_updated_at" columns. + -- jsonb_populate_record silently ignores keys that are not present on NEW. + NEW := jsonb_populate_record( + NEW, + jsonb_build_object( + 'updated_at', now(), + '_updated_at', now() + ) + ); RETURN NEW; END; $$; diff --git a/packages/sync-engine/src/database/migrations/0001_fix_set_updated_at.sql b/packages/sync-engine/src/database/migrations/0001_fix_set_updated_at.sql new file mode 100644 index 000000000..5fdb6fecd --- /dev/null +++ b/packages/sync-engine/src/database/migrations/0001_fix_set_updated_at.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION set_updated_at() RETURNS trigger + LANGUAGE plpgsql +AS $$ +BEGIN + NEW._updated_at = now(); + RETURN NEW; +END; +$$; diff --git a/packages/sync-engine/src/tests/integration/postgres-sync-observability.test.ts b/packages/sync-engine/src/tests/integration/postgres-sync-observability.test.ts index 1338c8a1d..aae2fadc3 100644 --- a/packages/sync-engine/src/tests/integration/postgres-sync-observability.test.ts +++ b/packages/sync-engine/src/tests/integration/postgres-sync-observability.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest' import { PostgresClient } from '../../database/postgres' -import { setupTestDatabase, type TestDatabase } from '../testSetup' +import { setupTestDatabase, sleep, type TestDatabase } from '../testSetup' describe('Observable Sync System Methods', () => { let postgresClient: PostgresClient @@ -127,6 +127,33 @@ describe('Observable Sync System Methods', () => { }) }) + describe('set_updated_at trigger', () => { + it('should advance _updated_at on accounts updates', async () => { + const beforeResult = await db.pool.query( + `SELECT "_updated_at" FROM stripe.accounts WHERE id = $1`, + [testAccountId] + ) + const beforeUpdatedAt = beforeResult.rows[0]._updated_at as Date + + await sleep(10) + + await db.pool.query( + `UPDATE stripe.accounts + SET "_raw_data" = jsonb_set("_raw_data", '{metadata}', '{"updated":true}'::jsonb, true) + WHERE id = $1`, + [testAccountId] + ) + + const afterResult = await db.pool.query( + `SELECT "_updated_at" FROM stripe.accounts WHERE id = $1`, + [testAccountId] + ) + const afterUpdatedAt = afterResult.rows[0]._updated_at as Date + + expect(afterUpdatedAt.getTime()).toBeGreaterThan(beforeUpdatedAt.getTime()) + }) + }) + describe('createObjectRuns', () => { it('should create object run entries for each object', async () => { const run = await postgresClient.getOrCreateSyncRun(testAccountId)