diff --git a/apps/rejot-cli/src/commands/manifest/manifest-sync.command.ts b/apps/rejot-cli/src/commands/manifest/manifest-sync.command.ts
index 40b34a11..818c2f3f 100644
--- a/apps/rejot-cli/src/commands/manifest/manifest-sync.command.ts
+++ b/apps/rejot-cli/src/commands/manifest/manifest-sync.command.ts
@@ -1,5 +1,3 @@
-import fs from "node:fs/promises";
-
import { z } from "zod";
import {
@@ -19,6 +17,7 @@ import { ConsoleLogger, getLogger, setLogger } from "@rejot-dev/contract/logger"
import { SyncManifestSchema } from "@rejot-dev/contract/manifest";
import type { ISubscribeMessageBus } from "@rejot-dev/contract/message-bus";
import { SyncManifest } from "@rejot-dev/contract/sync-manifest";
+import { ManifestWorkspaceResolver } from "@rejot-dev/contract-tools/manifest/manifest-workspace-resolver";
import { ExternalSyncMessageBus } from "@rejot-dev/sync/external-sync-message-bus";
import { SyncController } from "@rejot-dev/sync/sync-controller-new";
import { createResolver, type ISyncServiceResolver } from "@rejot-dev/sync/sync-http-resolver";
@@ -125,27 +124,41 @@ export class ManifestSyncCommand extends Command {
const manifestPaths = z.array(z.string()).parse(argv);
setLogger(new ConsoleLogger(logLevel.toUpperCase()));
+ const workspaceResolver = new ManifestWorkspaceResolver();
try {
// Read and parse manifest files
- const manifests = await Promise.all(
- manifestPaths.map(async (path) => {
- const content = await fs.readFile(path, "utf-8");
- const json = JSON.parse(content);
- return SyncManifestSchema.parse(json);
- }),
- );
+ const manifests = (
+ await Promise.all(
+ manifestPaths.flatMap(async (path) => {
+ const workspace = await workspaceResolver.resolveWorkspace({
+ startDir: process.cwd(),
+ filename: path,
+ });
+
+ if (!workspace) {
+ log.warn(`No workspace/manifests found in ${path}`);
+ return [];
+ }
+
+ const allManifests = [workspace.ancestor, ...workspace.children];
+ return allManifests;
+ }),
+ )
+ ).flat();
log.info(`Successfully loaded ${manifests.length} manifest(s)`);
- const syncManifest = new SyncManifest(manifests);
+ const syncManifest = new SyncManifest(manifests, {
+ checkPublicSchemaReferences: false,
+ });
// Create adapters
const { connectionAdapters, publicSchemaAdapters, consumerSchemaAdapters } =
this.#getAdapters();
// Create event store from the first manifest's event store config
- const eventStore = this.#createEventStore(manifests, connectionAdapters);
+ const eventStore = this.#createEventStore(syncManifest.manifests, connectionAdapters);
// There are four things we need to be doing:
// 1. Listen for changes on source data stores and write them to an event store.
diff --git a/apps/static-site/src/content/docs/1_guides/0_managing-manifests.mdx b/apps/static-site/src/content/docs/1_guides/0_managing-manifests.mdx
index 53b51639..75ecb011 100644
--- a/apps/static-site/src/content/docs/1_guides/0_managing-manifests.mdx
+++ b/apps/static-site/src/content/docs/1_guides/0_managing-manifests.mdx
@@ -139,7 +139,13 @@ Event Stores (Replication Targets):
A ReJot workspace is a collection of manifests that are managed together. It's useful for local
development use cases where you're typically only using a single event store and only want to run a
-single sync service.
+single sync service. It can also be used to combine for example secrets and other configuration
+across multiple manifests.
+
+
+ Data stores are shared across all manifests in a workspace. This means they have to have unique
+ names across all manifests in a workspace.
+
A manifest can link to other manifests using the `workspaces` field. This field should contain the
relative path to other manifest files.
diff --git a/apps/static-site/src/content/docs/1_guides/2_running-sync-services.mdx b/apps/static-site/src/content/docs/1_guides/2_running-sync-services.mdx
index 27ae4354..05f58c2c 100644
--- a/apps/static-site/src/content/docs/1_guides/2_running-sync-services.mdx
+++ b/apps/static-site/src/content/docs/1_guides/2_running-sync-services.mdx
@@ -44,7 +44,7 @@ environment resolver, where you supply a mapping of manifest to hostname through
variables:
```bash
-EXPORT REJOT_SYNC_SERVICE_slug-a=sync-service-a:3000
+export REJOT_SYNC_SERVICE_slug-a=sync-service-a:3000
rejot-cli manifest sync rejot-manifest.json \
--hostname 0.0.0.0 \
diff --git a/apps/static-site/src/content/docs/1_guides/4_composing-manifests.mdx b/apps/static-site/src/content/docs/1_guides/4_composing-manifests.mdx
index 681bf8e2..c710818b 100644
--- a/apps/static-site/src/content/docs/1_guides/4_composing-manifests.mdx
+++ b/apps/static-site/src/content/docs/1_guides/4_composing-manifests.mdx
@@ -35,7 +35,6 @@ base manifest won't specify any connections:
// rejot-manifest.base.json
{
"connections": [],
- "dataStores": ...,
"publicSchemas": ...,
}
// rejot-manifest.dev.json
@@ -46,7 +45,13 @@ base manifest won't specify any connections:
"host": "localhost",
...
}
- }]
+ }],
+ "dataStores": [
+ {
+ "connectionSlug": "conn-datastore-a",
+ ...
+ }
+ ]
}
// rejot-manifest.prod.json
@@ -57,7 +62,13 @@ base manifest won't specify any connections:
"host": "my-datastore-a.europe-west4.domain.com",
...
}
- }]
+ }],
+ "dataStores": [
+ {
+ "connectionSlug": "conn-datastore-a",
+ ...
+ }
+ ]
}
```
@@ -74,4 +85,20 @@ rejot-cli manifest sync rejot-manifest.base.json rejot-manifest.prod.json
## Workspaces
Use the [`workspaces`](/docs/spec/manifest) key in your manifest to include other Manifest files
-automatically.
+automatically. This can simplify the management of multiple manifests, for example when you have
+multiple services that need to share the same connection configuration.
+
+For example, workspace file `rejot-workspace.json` might look like this:
+
+```json
+{
+ "workspaces": ["./service-one/rejot-manifest.json", "./service-two/rejot-manifest.json"]
+}
+```
+
+And call the sync service with multiple manifests:
+
+```bash
+# Development
+rejot-cli manifest sync rejot-workspace.json
+```
diff --git a/packages/adapter-postgres/src/postgres-replication-listener.ts b/packages/adapter-postgres/src/postgres-replication-listener.ts
index ba304a1e..b0b9c74d 100644
--- a/packages/adapter-postgres/src/postgres-replication-listener.ts
+++ b/packages/adapter-postgres/src/postgres-replication-listener.ts
@@ -117,7 +117,11 @@ export class PostgresReplicationListener {
state: "empty",
};
- log.info("PostgresReplicationListener initialized", { database: config.database });
+ log.info("PostgresReplicationListener initialized", {
+ database: config.database,
+ host: config.host,
+ port: config.port,
+ });
}
async start(publicationName: string, slotName: string): Promise {
diff --git a/packages/adapter-postgres/src/util/postgres-client.ts b/packages/adapter-postgres/src/util/postgres-client.ts
index dd683758..d8b92e7c 100644
--- a/packages/adapter-postgres/src/util/postgres-client.ts
+++ b/packages/adapter-postgres/src/util/postgres-client.ts
@@ -1,5 +1,9 @@
-import type { ClientBase, QueryResult, QueryResultRow } from "pg";
-import { DatabaseError, Pool } from "pg";
+import type { ClientBase, PoolClient, QueryResult, QueryResultRow } from "pg";
+import pg from "pg";
+
+import { getLogger } from "@rejot-dev/contract/logger";
+
+const { DatabaseError, Pool } = pg;
export interface PostgresConfig {
host: string;
@@ -9,6 +13,8 @@ export interface PostgresConfig {
database: string;
}
+const log = getLogger(import.meta.url);
+
/**
* Parses a Postgres connection string into a PostgresConfig object.
* Supports both 'postgres://' and 'postgresql://' protocols.
@@ -50,7 +56,7 @@ export function parsePostgresConnectionString(connectionString: string): Postgre
type PoolOrClient =
| {
type: "pool";
- pool: Pool;
+ pool: pg.Pool;
}
| {
type: "client";
@@ -140,7 +146,16 @@ export class PostgresClient implements IPostgresClient {
const poolOrClient = this.#poolOrClient;
if (poolOrClient.type === "pool") {
- const client = await poolOrClient.pool.connect();
+ let client: PoolClient;
+ try {
+ client = await poolOrClient.pool.connect();
+ } catch (error) {
+ log.error(
+ `Failed to connect to Postgres, connection string: ${this.#config.host}:${this.#config.port}/${this.#config.database}`,
+ );
+ throw error;
+ }
+
try {
return await this.#query(client, queryText, values);
} finally {
diff --git a/packages/contract-tools/manifest/manifest-workspace-resolver.ts b/packages/contract-tools/manifest/manifest-workspace-resolver.ts
index 5aaeebee..4d6d1376 100644
--- a/packages/contract-tools/manifest/manifest-workspace-resolver.ts
+++ b/packages/contract-tools/manifest/manifest-workspace-resolver.ts
@@ -137,10 +137,7 @@ export function workspaceToSyncManifest(
workspace: WorkspaceDefinition,
options: SyncManifestOptions = {},
): SyncManifest {
- return new SyncManifest(
- [workspace.ancestor.manifest, ...workspace.children.map((child) => child.manifest)],
- options,
- );
+ return new SyncManifest([workspace.ancestor, ...workspace.children], options);
}
export function getManifestBySlug(
diff --git a/packages/contract/manifest/manifest-helpers.ts b/packages/contract/manifest/manifest-helpers.ts
index 9bf4ccb3..9ec6c45d 100644
--- a/packages/contract/manifest/manifest-helpers.ts
+++ b/packages/contract/manifest/manifest-helpers.ts
@@ -150,11 +150,7 @@ export function getPublicSchemasForDataStore(
return manifests.flatMap((manifest) =>
(manifest.publicSchemas ?? [])
.filter((schema) => {
- const dataStore = (manifest.dataStores ?? []).find(
- (ds) => ds.connectionSlug === sourceDataStoreSlug,
- );
- // Ensure dataStore exists and matches the source slug, and the operation table is included
- return dataStore && schema.source.dataStoreSlug === dataStore.connectionSlug;
+ return schema.source.dataStoreSlug === sourceDataStoreSlug;
})
.map(({ name, source, config, version, outputSchema }) => ({
name,
diff --git a/packages/contract/manifest/sync-manifest.test.ts b/packages/contract/manifest/sync-manifest.test.ts
index 15f57590..94b4fce8 100644
--- a/packages/contract/manifest/sync-manifest.test.ts
+++ b/packages/contract/manifest/sync-manifest.test.ts
@@ -108,7 +108,16 @@ describe("SyncManifest", () => {
} satisfies Manifest;
// Initialize SyncManifest with both manifests
- const syncManifest = new SyncManifest([manifestAWithPublicSchema, manifestBWithConsumer]);
+ const syncManifest = new SyncManifest([
+ {
+ path: "test-manifest.json",
+ manifest: manifestAWithPublicSchema,
+ },
+ {
+ path: "test-manifest2.json",
+ manifest: manifestBWithConsumer,
+ },
+ ]);
// There should be no external schemas since all references are resolved internally
const externalSchemas = syncManifest.getExternalConsumerSchemas();
@@ -134,9 +143,17 @@ describe("SyncManifest", () => {
);
// Initialize SyncManifest with just our local manifest
- const syncManifest = new SyncManifest([manifestWithExternalReferences], {
- checkPublicSchemaReferences: false,
- });
+ const syncManifest = new SyncManifest(
+ [
+ {
+ path: "test-manifest.json",
+ manifest: manifestWithExternalReferences,
+ },
+ ],
+ {
+ checkPublicSchemaReferences: false,
+ },
+ );
// Get external consumer schemas
const externalSchemas = syncManifest.getExternalConsumerSchemas();
@@ -176,9 +193,17 @@ describe("SyncManifest", () => {
);
// Initialize SyncManifest with our local manifest
- const syncManifest = new SyncManifest([manifestWithExternalReferences], {
- checkPublicSchemaReferences: false,
- });
+ const syncManifest = new SyncManifest(
+ [
+ {
+ path: "test-manifest.json",
+ manifest: manifestWithExternalReferences,
+ },
+ ],
+ {
+ checkPublicSchemaReferences: false,
+ },
+ );
// Get external consumer schemas
const externalSchemas = syncManifest.getExternalConsumerSchemas();
diff --git a/packages/contract/manifest/sync-manifest.ts b/packages/contract/manifest/sync-manifest.ts
index e7cc5552..89beaca3 100644
--- a/packages/contract/manifest/sync-manifest.ts
+++ b/packages/contract/manifest/sync-manifest.ts
@@ -2,6 +2,7 @@ import { z } from "zod";
import type { TransformedOperationWithSource } from "../event-store/event-store.ts";
import { getLogger } from "../logger/logger.ts";
+import type { ManifestWithPath } from "../workspace/workspace.ts";
import {
type ConnectionConfigSchema,
ConsumerSchemaSchema,
@@ -24,7 +25,7 @@ import {
import {
type ExternalPublicSchemaReference,
type VerificationResult,
- verifyManifests,
+ verifyManifestsWithPaths,
} from "./verify-manifest.ts";
const log = getLogger(import.meta.url);
@@ -61,11 +62,11 @@ export interface SyncManifestOptions {
}
export class SyncManifest {
- readonly #manifests: Manifest[];
+ readonly #manifests: ManifestWithPath[];
readonly #externalSchemaReferences: ExternalPublicSchemaReference[];
- constructor(manifests: Manifest[], options: SyncManifestOptions = {}) {
- const verificationResult: VerificationResult = verifyManifests(
+ constructor(manifests: ManifestWithPath[], options: SyncManifestOptions = {}) {
+ const verificationResult: VerificationResult = verifyManifestsWithPaths(
manifests,
options.checkPublicSchemaReferences,
);
@@ -109,31 +110,31 @@ export class SyncManifest {
}
get manifests(): Manifest[] {
- return this.#manifests;
+ return this.#manifests.map((m) => m.manifest);
}
get connections(): Connection[] {
- return getConnectionsHelper(this.#manifests);
+ return getConnectionsHelper(this.manifests);
}
get dataStores(): NonNullable {
- return getDataStoresHelper(this.#manifests);
+ return getDataStoresHelper(this.manifests);
}
get eventStores(): NonNullable {
- return getEventStoresHelper(this.#manifests);
+ return getEventStoresHelper(this.manifests);
}
getSourceDataStores(): SourceDataStore[] {
- return getSourceDataStoresHelper(this.#manifests);
+ return getSourceDataStoresHelper(this.manifests);
}
getDestinationDataStores(): DestinationDataStore[] {
- return getDestinationDataStoresHelper(this.#manifests);
+ return getDestinationDataStoresHelper(this.manifests);
}
getConnectionBySlug(connectionSlug: string): Connection | undefined {
- return getConnectionBySlugHelper(this.#manifests, connectionSlug);
+ return getConnectionBySlugHelper(this.manifests, connectionSlug);
}
get hasUnresolvedExternalReferences(): boolean {
@@ -148,23 +149,23 @@ export class SyncManifest {
* @throws Error if the internal state is inconsistent (e.g., referenced manifest or schema not found).
*/
getExternalConsumerSchemas(): Record[]> {
- return getExternalConsumerSchemasHelper(this.#manifests);
+ return getExternalConsumerSchemasHelper(this.manifests);
}
getConsumerSchemasForPublicSchema(
operation: TransformedOperationWithSource,
): z.infer[] {
- return getConsumerSchemasForPublicSchemaHelper(this.#manifests, operation);
+ return getConsumerSchemasForPublicSchemaHelper(this.manifests, operation);
}
getPublicSchemasForOperation(
dataStoreSlug: string,
): (z.infer & { sourceManifestSlug: string })[] {
- return getPublicSchemasForDataStore(this.#manifests, dataStoreSlug);
+ return getPublicSchemasForDataStore(this.manifests, dataStoreSlug);
}
getPublicSchemas(): (z.infer & { manifestSlug: string })[] {
- return getPublicSchemasHelper(this.#manifests);
+ return getPublicSchemasHelper(this.manifests);
}
/**
diff --git a/packages/contract/manifest/verify-manifest-multi.test.ts b/packages/contract/manifest/verify-manifest-multi.test.ts
index 1cf77418..330593e3 100644
--- a/packages/contract/manifest/verify-manifest-multi.test.ts
+++ b/packages/contract/manifest/verify-manifest-multi.test.ts
@@ -332,7 +332,7 @@ describe("verifyManifests - multiple", () => {
...createBasicManifest("manifest2"),
connections: [
{
- slug: "ds1",
+ slug: "ds2",
config: {
connectionType: "postgres",
host: "localhost",
@@ -345,7 +345,7 @@ describe("verifyManifests - multiple", () => {
],
dataStores: [
{
- connectionSlug: "ds1",
+ connectionSlug: "ds2",
config: {
connectionType: "postgres",
slotName: "pub1",
@@ -378,6 +378,257 @@ describe("verifyManifests - multiple", () => {
expect(result.diagnostics[0].location.manifestSlug).toBe("test-manifest");
});
+ test("data store in another manifest but connection is missing", () => {
+ const manifest1: Manifest = {
+ ...createBasicManifest("fromaccounts"),
+ publicSchemas: [
+ {
+ name: "accounts-schema",
+ source: {
+ dataStoreSlug: "accounts",
+ },
+ outputSchema: {
+ type: "object",
+ properties: {
+ id: { type: "number" },
+ name: { type: "string" },
+ email: { type: "string" },
+ },
+ required: ["id", "name", "email"],
+ additionalProperties: false,
+ $schema: "http://json-schema.org/draft-07/schema#",
+ },
+ version: { major: 1, minor: 0 },
+ config: {
+ publicSchemaType: "postgres",
+ transformations: [],
+ },
+ definitionFile: "packages/sync-models/src/account-schema.ts",
+ },
+ ],
+ };
+
+ const manifest2: Manifest = {
+ ...createBasicManifest("from-accounts-docker"),
+ dataStores: [
+ {
+ connectionSlug: "accounts",
+ config: {
+ connectionType: "postgres",
+ slotName: "rejot_slot",
+ publicationName: "rejot_publication",
+ },
+ },
+ ],
+ // No connections defined!
+ };
+
+ const result = verifyManifests([manifest1, manifest2]);
+ expect(result.isValid).toBe(false);
+ const connectionError = result.diagnostics.find(
+ (e) => e.type === "CONNECTION_NOT_FOUND" && e.message.includes("accounts"),
+ );
+ expect(connectionError).toBeDefined();
+ });
+
+ test("public schema references non-existent data store across manifests", () => {
+ const manifest1: Manifest = {
+ ...createBasicManifest("fromaccounts"),
+ publicSchemas: [
+ {
+ name: "accounts-schema",
+ source: {
+ dataStoreSlug: "nonexistent",
+ },
+ outputSchema: {
+ type: "object",
+ properties: {
+ id: { type: "number" },
+ name: { type: "string" },
+ email: { type: "string" },
+ },
+ required: ["id", "name", "email"],
+ additionalProperties: false,
+ $schema: "http://json-schema.org/draft-07/schema#",
+ },
+ version: { major: 1, minor: 0 },
+ config: {
+ publicSchemaType: "postgres",
+ transformations: [],
+ },
+ definitionFile: "packages/sync-models/src/account-schema.ts",
+ },
+ ],
+ };
+
+ const manifest2: Manifest = {
+ ...createBasicManifest("from-accounts-docker"),
+ // No dataStores with slug "nonexistent"
+ };
+
+ const result = verifyManifests([manifest1, manifest2]);
+ expect(result.isValid).toBe(false);
+ const dataStoreError = result.diagnostics.find(
+ (e) => e.type === "DATA_STORE_NOT_FOUND" && e.message.includes("nonexistent"),
+ );
+ expect(dataStoreError).toBeDefined();
+ });
+
+ test("public schema can reference data store in a different manifest", () => {
+ // Manifest 1: defines the public schema, references a data store not defined here
+ const manifest1: Manifest = {
+ ...createBasicManifest("fromaccounts"),
+ publicSchemas: [
+ {
+ name: "accounts-schema",
+ source: {
+ dataStoreSlug: "accounts",
+ },
+ outputSchema: {
+ type: "object",
+ properties: {
+ id: { type: "number" },
+ name: { type: "string" },
+ email: { type: "string" },
+ },
+ required: ["id", "name", "email"],
+ additionalProperties: false,
+ $schema: "http://json-schema.org/draft-07/schema#",
+ },
+ version: { major: 1, minor: 0 },
+ config: {
+ publicSchemaType: "postgres",
+ transformations: [
+ {
+ operation: "insert",
+ table: "accounts",
+ sql: 'SELECT a.id, a.name as "name", a.email as "email" FROM accounts a WHERE a.id = :id',
+ },
+ {
+ operation: "update",
+ table: "accounts",
+ sql: 'SELECT a.id, a.name as "name", a.email as "email" FROM accounts a WHERE a.id = :id',
+ },
+ ],
+ },
+ definitionFile: "packages/sync-models/src/account-schema.ts",
+ },
+ ],
+ };
+
+ // Manifest 2: defines the data store
+ const manifest2: Manifest = {
+ ...createBasicManifest("from-accounts-docker"),
+ connections: [
+ {
+ slug: "accounts",
+ config: {
+ connectionType: "postgres",
+ host: "db-accounts",
+ port: 5432,
+ user: "postgres",
+ password: "postgres",
+ database: "postgres",
+ },
+ },
+ {
+ slug: "eventstore",
+ config: {
+ connectionType: "postgres",
+ host: "db-eventstore",
+ port: 5432,
+ user: "postgres",
+ password: "postgres",
+ database: "postgres",
+ },
+ },
+ ],
+ dataStores: [
+ {
+ connectionSlug: "accounts",
+ config: {
+ connectionType: "postgres",
+ slotName: "rejot_slot",
+ publicationName: "rejot_publication",
+ },
+ },
+ ],
+ eventStores: [
+ {
+ connectionSlug: "eventstore",
+ },
+ ],
+ };
+
+ const result = verifyManifests([manifest1, manifest2]);
+ expect(result.isValid).toBe(true);
+ expect(result.diagnostics).toHaveLength(0);
+ });
+
+ test("multiple data stores with the same slug", () => {
+ const manifest1: Manifest = {
+ ...createBasicManifest("manifest1"),
+ connections: [
+ {
+ slug: "conn1",
+ config: {
+ connectionType: "postgres",
+ host: "localhost",
+ port: 5432,
+ database: "test",
+ user: "user",
+ password: "pass",
+ },
+ },
+ ],
+ dataStores: [
+ {
+ connectionSlug: "conn1",
+ config: {
+ connectionType: "postgres",
+ slotName: "pub1",
+ publicationName: "pub1",
+ },
+ },
+ ],
+ };
+
+ const manifest2: Manifest = {
+ ...createBasicManifest("manifest2"),
+ connections: [
+ {
+ slug: "conn1",
+ config: {
+ connectionType: "postgres",
+ host: "localhost",
+ port: 5432,
+ database: "test",
+ user: "user",
+ password: "pass",
+ },
+ },
+ ],
+ dataStores: [
+ {
+ connectionSlug: "conn1",
+ config: {
+ connectionType: "postgres",
+ slotName: "pub1",
+ publicationName: "pub1",
+ },
+ },
+ ],
+ };
+
+ const result = verifyManifests([manifest1, manifest2]);
+ expect(result.isValid).toBe(false);
+ expect(result.diagnostics).toHaveLength(1);
+ expect(result.diagnostics[0].type).toBe("DUPLICATE_DATA_STORE");
+ expect(result.diagnostics[0].severity).toBe("error");
+ expect(result.diagnostics[0].message).toContain("conn1");
+ expect(result.diagnostics[0].location.manifestSlug).toBe("manifest2");
+ });
+
test("handles both errors and external references", () => {
const manifest1: Manifest = {
...createBasicManifest("manifest1"),
diff --git a/packages/contract/manifest/verify-manifest.ts b/packages/contract/manifest/verify-manifest.ts
index b22c025f..46d47716 100644
--- a/packages/contract/manifest/verify-manifest.ts
+++ b/packages/contract/manifest/verify-manifest.ts
@@ -7,6 +7,7 @@ export type ManifestDiagnosticSeverity = "error" | "warning";
export type ManifestDiagnostic = {
type:
+ | "DUPLICATE_DATA_STORE"
| "CONNECTION_NOT_FOUND"
| "CONNECTION_TYPE_MISMATCH"
| "DATA_STORE_MISSING_CONFIG"
@@ -222,10 +223,23 @@ export function verifyPublicSchemaReferences(
});
// Build a map of all data stores
- const dataStoreMap = new Map>();
+ const dataStoreMap = new Set();
manifests.forEach(({ manifest }) => {
- const storeNames = new Set(manifest.dataStores?.map((ds) => ds.connectionSlug) ?? []);
- dataStoreMap.set(manifest.slug, storeNames);
+ manifest.dataStores?.forEach((ds) => {
+ if (dataStoreMap.has(ds.connectionSlug)) {
+ errors.push({
+ type: "DUPLICATE_DATA_STORE",
+ severity: "error",
+ message: `Data store '${ds.connectionSlug}' is defined multiple times`,
+ location: {
+ manifestSlug: manifest.slug,
+ context: `dataStore.connectionSlug: ${ds.connectionSlug}`,
+ },
+ });
+ } else {
+ dataStoreMap.add(ds.connectionSlug);
+ }
+ });
});
// Build a map of data stores with configs
@@ -243,8 +257,7 @@ export function verifyPublicSchemaReferences(
// Verify public schemas reference valid data stores
manifests.forEach(({ manifest, path: manifestPath }) => {
manifest.publicSchemas?.forEach((publicSchema, index) => {
- const manifestDataStores = dataStoreMap.get(manifest.slug);
- if (manifestDataStores && !manifestDataStores.has(publicSchema.source.dataStoreSlug)) {
+ if (!dataStoreMap.has(publicSchema.source.dataStoreSlug)) {
errors.push({
type: "DATA_STORE_NOT_FOUND",
severity: "error",
@@ -351,17 +364,7 @@ export function verifyPublicSchemaReferences(
// Check if referenced data store exists in ANY manifest
// The destination data store could be in any manifest, not just the consumer's or source's
- let dataStoreFound = false;
-
- // Search across all manifests for the data store
- for (const [_manifestSlug, stores] of dataStoreMap.entries()) {
- if (stores.has(consumerSchema.config.destinationDataStoreSlug)) {
- dataStoreFound = true;
- break;
- }
- }
-
- if (!dataStoreFound) {
+ if (!dataStoreMap.has(consumerSchema.config.destinationDataStoreSlug)) {
errors.push({
type: "DATA_STORE_NOT_FOUND",
severity: "error",
diff --git a/packages/sync/src/sync-controller/external-sync-message-bus.test.ts b/packages/sync/src/sync-controller/external-sync-message-bus.test.ts
index 98baae4b..5ea65094 100644
--- a/packages/sync/src/sync-controller/external-sync-message-bus.test.ts
+++ b/packages/sync/src/sync-controller/external-sync-message-bus.test.ts
@@ -59,27 +59,30 @@ describe("ExternalSyncMessageBus", () => {
const manifest = new SyncManifest(
[
{
- slug: "local",
- manifestVersion: 1,
- connections: [],
- dataStores: [],
- eventStores: [],
- publicSchemas: [],
- consumerSchemas: [
- {
- name: "test-consumer",
- sourceManifestSlug: "test",
- publicSchema: {
- name: "test",
- majorVersion: 1,
- },
- config: {
- consumerSchemaType: "postgres",
- destinationDataStoreSlug: "test-store",
- sql: "SELECT * FROM test",
+ path: "test-manifest.json",
+ manifest: {
+ slug: "local",
+ manifestVersion: 1,
+ connections: [],
+ dataStores: [],
+ eventStores: [],
+ publicSchemas: [],
+ consumerSchemas: [
+ {
+ name: "test-consumer",
+ sourceManifestSlug: "test",
+ publicSchema: {
+ name: "test",
+ majorVersion: 1,
+ },
+ config: {
+ consumerSchemaType: "postgres",
+ destinationDataStoreSlug: "test-store",
+ sql: "SELECT * FROM test",
+ },
},
- },
- ],
+ ],
+ },
},
],
{ checkPublicSchemaReferences: false },
@@ -145,27 +148,30 @@ describe("ExternalSyncMessageBus", () => {
const manifest = new SyncManifest(
[
{
- slug: "local",
- manifestVersion: 1,
- connections: [],
- dataStores: [],
- eventStores: [],
- publicSchemas: [],
- consumerSchemas: [
- {
- name: "test-consumer",
- sourceManifestSlug: "test",
- publicSchema: {
- name: "test",
- majorVersion: 1,
- },
- config: {
- consumerSchemaType: "postgres",
- destinationDataStoreSlug: "test-store",
- sql: "SELECT * FROM test",
+ path: "test-manifest.json",
+ manifest: {
+ slug: "local",
+ manifestVersion: 1,
+ connections: [],
+ dataStores: [],
+ eventStores: [],
+ publicSchemas: [],
+ consumerSchemas: [
+ {
+ name: "test-consumer",
+ sourceManifestSlug: "test",
+ publicSchema: {
+ name: "test",
+ majorVersion: 1,
+ },
+ config: {
+ consumerSchemaType: "postgres",
+ destinationDataStoreSlug: "test-store",
+ sql: "SELECT * FROM test",
+ },
},
- },
- ],
+ ],
+ },
},
],
{ checkPublicSchemaReferences: false },
@@ -214,27 +220,30 @@ describe("ExternalSyncMessageBus", () => {
const manifest = new SyncManifest(
[
{
- slug: "local",
- manifestVersion: 1,
- connections: [],
- dataStores: [],
- eventStores: [],
- publicSchemas: [],
- consumerSchemas: [
- {
- name: "test-consumer",
- sourceManifestSlug: "test",
- publicSchema: {
- name: "test",
- majorVersion: 1,
- },
- config: {
- consumerSchemaType: "postgres",
- destinationDataStoreSlug: "test-store",
- sql: "SELECT * FROM test",
+ path: "test-manifest.json",
+ manifest: {
+ slug: "local",
+ manifestVersion: 1,
+ connections: [],
+ dataStores: [],
+ eventStores: [],
+ publicSchemas: [],
+ consumerSchemas: [
+ {
+ name: "test-consumer",
+ sourceManifestSlug: "test",
+ publicSchema: {
+ name: "test",
+ majorVersion: 1,
+ },
+ config: {
+ consumerSchemaType: "postgres",
+ destinationDataStoreSlug: "test-store",
+ sql: "SELECT * FROM test",
+ },
},
- },
- ],
+ ],
+ },
},
],
{ checkPublicSchemaReferences: false },
@@ -269,27 +278,30 @@ describe("ExternalSyncMessageBus", () => {
const manifest = new SyncManifest(
[
{
- slug: "local",
- manifestVersion: 1,
- connections: [],
- dataStores: [],
- eventStores: [],
- publicSchemas: [],
- consumerSchemas: [
- {
- name: "test-consumer",
- sourceManifestSlug: "test",
- publicSchema: {
- name: "test",
- majorVersion: 1,
- },
- config: {
- consumerSchemaType: "postgres",
- destinationDataStoreSlug: "test-store",
- sql: "SELECT * FROM test",
+ path: "test-manifest.json",
+ manifest: {
+ slug: "local",
+ manifestVersion: 1,
+ connections: [],
+ dataStores: [],
+ eventStores: [],
+ publicSchemas: [],
+ consumerSchemas: [
+ {
+ name: "test-consumer",
+ sourceManifestSlug: "test",
+ publicSchema: {
+ name: "test",
+ majorVersion: 1,
+ },
+ config: {
+ consumerSchemaType: "postgres",
+ destinationDataStoreSlug: "test-store",
+ sql: "SELECT * FROM test",
+ },
},
- },
- ],
+ ],
+ },
},
],
{ checkPublicSchemaReferences: false },
diff --git a/packages/sync/src/sync-controller/public-schema-transformer.test.ts b/packages/sync/src/sync-controller/public-schema-transformer.test.ts
index e8d583c1..19ce4d04 100644
--- a/packages/sync/src/sync-controller/public-schema-transformer.test.ts
+++ b/packages/sync/src/sync-controller/public-schema-transformer.test.ts
@@ -49,68 +49,71 @@ describe("PublicSchemaTransformer", () => {
const createTestManifest = () =>
new SyncManifest([
{
- slug: "test-manifest",
- manifestVersion: 1,
- connections: [
- {
- slug: "test-connection",
- config: {
- connectionType: "postgres" as const,
- host: "localhost",
- port: 5432,
- database: "test",
- user: "test",
- password: "test",
- },
- },
- ],
- dataStores: [
- {
- connectionSlug: "test-connection",
- config: {
- connectionType: "postgres" as const,
- publicationName: "test-publication",
- slotName: "test-slot",
- },
- },
- ],
- eventStores: [],
- publicSchemas: [
- {
- name: "test-schema",
- source: {
- dataStoreSlug: "test-connection",
- },
- outputSchema: {
- type: "object",
- properties: {
- id: { type: "number" },
- name: { type: "string" },
+ path: "test-manifest.json",
+ manifest: {
+ slug: "test-manifest",
+ manifestVersion: 1,
+ connections: [
+ {
+ slug: "test-connection",
+ config: {
+ connectionType: "postgres" as const,
+ host: "localhost",
+ port: 5432,
+ database: "test",
+ user: "test",
+ password: "test",
},
- required: ["id", "name"],
},
- version: {
- major: 1,
- minor: 0,
+ ],
+ dataStores: [
+ {
+ connectionSlug: "test-connection",
+ config: {
+ connectionType: "postgres" as const,
+ publicationName: "test-publication",
+ slotName: "test-slot",
+ },
},
- config: {
- publicSchemaType: "postgres",
- transformations: [
- {
- operation: "insert",
- table: "test_table",
- sql: "SELECT * FROM test_table WHERE id = $1",
- },
- {
- operation: "delete",
- table: "test_table",
- sql: "DELETE FROM test_table WHERE id = $1",
+ ],
+ eventStores: [],
+ publicSchemas: [
+ {
+ name: "test-schema",
+ source: {
+ dataStoreSlug: "test-connection",
+ },
+ outputSchema: {
+ type: "object",
+ properties: {
+ id: { type: "number" },
+ name: { type: "string" },
},
- ],
+ required: ["id", "name"],
+ },
+ version: {
+ major: 1,
+ minor: 0,
+ },
+ config: {
+ publicSchemaType: "postgres",
+ transformations: [
+ {
+ operation: "insert",
+ table: "test_table",
+ sql: "SELECT * FROM test_table WHERE id = $1",
+ },
+ {
+ operation: "delete",
+ table: "test_table",
+ sql: "DELETE FROM test_table WHERE id = $1",
+ },
+ ],
+ },
},
- },
- ],
- consumerSchemas: [],
+ ],
+ consumerSchemas: [],
+ },
},
]);
diff --git a/packages/sync/src/sync-controller/sink-writer.test.ts b/packages/sync/src/sync-controller/sink-writer.test.ts
index 7bf4d48e..ef237536 100644
--- a/packages/sync/src/sync-controller/sink-writer.test.ts
+++ b/packages/sync/src/sync-controller/sink-writer.test.ts
@@ -100,77 +100,80 @@ describe("SinkWriter", () => {
const createTestManifest = () =>
new SyncManifest([
{
- slug: "test-manifest",
- manifestVersion: 1,
- connections: [
- {
- slug: "test-connection",
- config: {
- connectionType: "postgres" as const,
- host: "localhost",
- port: 5432,
- database: "test",
- user: "test",
- password: "test",
- },
- },
- ],
- dataStores: [
- {
- connectionSlug: "test-connection",
- config: {
- connectionType: "postgres" as const,
- publicationName: "test-publication",
- slotName: "test-slot",
- },
- },
- ],
- eventStores: [],
- publicSchemas: [
- {
- name: "test-schema",
- source: {
- dataStoreSlug: "test-connection",
- },
- outputSchema: {
- type: "object",
- properties: {
- id: { type: "number" },
- name: { type: "string" },
+ path: "test-manifest.json",
+ manifest: {
+ slug: "test-manifest",
+ manifestVersion: 1,
+ connections: [
+ {
+ slug: "test-connection",
+ config: {
+ connectionType: "postgres" as const,
+ host: "localhost",
+ port: 5432,
+ database: "test",
+ user: "test",
+ password: "test",
},
- required: ["id", "name"],
},
- config: {
- publicSchemaType: "postgres" as const,
- transformations: [
- {
- operation: "insert" as const,
- table: "test_table",
- sql: "SELECT * FROM test_table WHERE id = $1",
- },
- ],
- },
- version: {
- major: 1,
- minor: 0,
+ ],
+ dataStores: [
+ {
+ connectionSlug: "test-connection",
+ config: {
+ connectionType: "postgres" as const,
+ publicationName: "test-publication",
+ slotName: "test-slot",
+ },
},
- },
- ],
- consumerSchemas: [
- {
- name: "test-consumer-schema",
- sourceManifestSlug: "test-manifest",
- publicSchema: {
+ ],
+ eventStores: [],
+ publicSchemas: [
+ {
name: "test-schema",
- majorVersion: 1,
+ source: {
+ dataStoreSlug: "test-connection",
+ },
+ outputSchema: {
+ type: "object",
+ properties: {
+ id: { type: "number" },
+ name: { type: "string" },
+ },
+ required: ["id", "name"],
+ },
+ config: {
+ publicSchemaType: "postgres" as const,
+ transformations: [
+ {
+ operation: "insert" as const,
+ table: "test_table",
+ sql: "SELECT * FROM test_table WHERE id = $1",
+ },
+ ],
+ },
+ version: {
+ major: 1,
+ minor: 0,
+ },
},
- config: {
- consumerSchemaType: "postgres" as const,
- destinationDataStoreSlug: "test-connection",
- sql: "INSERT INTO test_table (id, name) VALUES ($1, $2)",
+ ],
+ consumerSchemas: [
+ {
+ name: "test-consumer-schema",
+ sourceManifestSlug: "test-manifest",
+ publicSchema: {
+ name: "test-schema",
+ majorVersion: 1,
+ },
+ config: {
+ consumerSchemaType: "postgres" as const,
+ destinationDataStoreSlug: "test-connection",
+ sql: "INSERT INTO test_table (id, name) VALUES ($1, $2)",
+ },
},
- },
- ],
+ ],
+ },
},
]);
diff --git a/packages/sync/src/sync-controller/source-reader.test.ts b/packages/sync/src/sync-controller/source-reader.test.ts
index ea40c4c3..42bb70a2 100644
--- a/packages/sync/src/sync-controller/source-reader.test.ts
+++ b/packages/sync/src/sync-controller/source-reader.test.ts
@@ -10,27 +10,30 @@ describe("SourceReader", () => {
const createTestManifest = () =>
new SyncManifest([
{
- slug: "test-manifest",
- manifestVersion: 1,
- connections: [
- {
- slug: "test-connection",
- config: {
- connectionType: "in-memory" as const,
+ path: "test-manifest.json",
+ manifest: {
+ slug: "test-manifest",
+ manifestVersion: 1,
+ connections: [
+ {
+ slug: "test-connection",
+ config: {
+ connectionType: "in-memory" as const,
+ },
},
- },
- ],
- dataStores: [
- {
- connectionSlug: "test-connection",
- config: {
- connectionType: "in-memory" as const,
+ ],
+ dataStores: [
+ {
+ connectionSlug: "test-connection",
+ config: {
+ connectionType: "in-memory" as const,
+ },
},
- },
- ],
- eventStores: [],
- publicSchemas: [],
- consumerSchemas: [],
+ ],
+ eventStores: [],
+ publicSchemas: [],
+ consumerSchemas: [],
+ },
},
]);
@@ -43,34 +46,37 @@ describe("SourceReader", () => {
test("should throw error if no adapter found for connection type", () => {
const manifest = new SyncManifest([
{
- slug: "test-manifest",
- manifestVersion: 1,
- connections: [
- {
- slug: "test-connection",
- config: {
- connectionType: "postgres" as const,
- host: "localhost",
- port: 5432,
- user: "postgres",
- password: "postgres",
- database: "postgres",
+ path: "test-manifest.json",
+ manifest: {
+ slug: "test-manifest",
+ manifestVersion: 1,
+ connections: [
+ {
+ slug: "test-connection",
+ config: {
+ connectionType: "postgres" as const,
+ host: "localhost",
+ port: 5432,
+ user: "postgres",
+ password: "postgres",
+ database: "postgres",
+ },
},
- },
- ],
- dataStores: [
- {
- connectionSlug: "test-connection",
- config: {
- connectionType: "postgres" as const,
- publicationName: "test-publication",
- slotName: "test-slot",
+ ],
+ dataStores: [
+ {
+ connectionSlug: "test-connection",
+ config: {
+ connectionType: "postgres" as const,
+ publicationName: "test-publication",
+ slotName: "test-slot",
+ },
},
- },
- ],
- eventStores: [],
- publicSchemas: [],
- consumerSchemas: [],
+ ],
+ eventStores: [],
+ publicSchemas: [],
+ consumerSchemas: [],
+ },
},
]);
diff --git a/packages/sync/src/sync-controller/sync-controller.test.ts b/packages/sync/src/sync-controller/sync-controller.test.ts
index 9dec8676..fe8f5e02 100644
--- a/packages/sync/src/sync-controller/sync-controller.test.ts
+++ b/packages/sync/src/sync-controller/sync-controller.test.ts
@@ -11,27 +11,30 @@ describe("SyncController", () => {
const createTestManifest = () =>
new SyncManifest([
{
- slug: "test-manifest",
- manifestVersion: 1,
- connections: [
- {
- slug: "test-connection",
- config: {
- connectionType: "in-memory" as const,
+ path: "test-manifest.json",
+ manifest: {
+ slug: "test-manifest",
+ manifestVersion: 1,
+ connections: [
+ {
+ slug: "test-connection",
+ config: {
+ connectionType: "in-memory" as const,
+ },
},
- },
- ],
- dataStores: [
- {
- connectionSlug: "test-connection",
- config: {
- connectionType: "in-memory",
+ ],
+ dataStores: [
+ {
+ connectionSlug: "test-connection",
+ config: {
+ connectionType: "in-memory",
+ },
},
- },
- ],
- eventStores: [],
- publicSchemas: [],
- consumerSchemas: [],
+ ],
+ eventStores: [],
+ publicSchemas: [],
+ consumerSchemas: [],
+ },
},
]);