Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions apps/rejot-cli/src/commands/manifest/manifest-sync.command.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import fs from "node:fs/promises";

import { z } from "zod";

import {
Expand All @@ -19,6 +17,7 @@ import { ConsoleLogger, getLogger, setLogger } from "@rejot-dev/contract/logger"
import { SyncManifestSchema } from "@rejot-dev/contract/manifest";
import type { ISubscribeMessageBus } from "@rejot-dev/contract/message-bus";
import { SyncManifest } from "@rejot-dev/contract/sync-manifest";
import { ManifestWorkspaceResolver } from "@rejot-dev/contract-tools/manifest/manifest-workspace-resolver";
import { ExternalSyncMessageBus } from "@rejot-dev/sync/external-sync-message-bus";
import { SyncController } from "@rejot-dev/sync/sync-controller-new";
import { createResolver, type ISyncServiceResolver } from "@rejot-dev/sync/sync-http-resolver";
Expand Down Expand Up @@ -125,27 +124,41 @@ export class ManifestSyncCommand extends Command {
const manifestPaths = z.array(z.string()).parse(argv);

setLogger(new ConsoleLogger(logLevel.toUpperCase()));
const workspaceResolver = new ManifestWorkspaceResolver();

try {
// Read and parse manifest files
const manifests = await Promise.all(
manifestPaths.map(async (path) => {
const content = await fs.readFile(path, "utf-8");
const json = JSON.parse(content);
return SyncManifestSchema.parse(json);
}),
);
const manifests = (
await Promise.all(
manifestPaths.flatMap(async (path) => {
const workspace = await workspaceResolver.resolveWorkspace({
startDir: process.cwd(),
filename: path,
});

if (!workspace) {
log.warn(`No workspace/manifests found in ${path}`);
return [];
}

const allManifests = [workspace.ancestor, ...workspace.children];
return allManifests;
}),
)
).flat();

log.info(`Successfully loaded ${manifests.length} manifest(s)`);

const syncManifest = new SyncManifest(manifests);
const syncManifest = new SyncManifest(manifests, {
checkPublicSchemaReferences: false,
});

// Create adapters
const { connectionAdapters, publicSchemaAdapters, consumerSchemaAdapters } =
this.#getAdapters();

// Create event store from the first manifest's event store config
const eventStore = this.#createEventStore(manifests, connectionAdapters);
const eventStore = this.#createEventStore(syncManifest.manifests, connectionAdapters);

// There are four things we need to be doing:
// 1. Listen for changes on source data stores and write them to an event store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,13 @@ Event Stores (Replication Targets):

A ReJot workspace is a collection of manifests that are managed together. It's useful for local
development use cases where you're typically only using a single event store and only want to run a
single sync service.
single sync service. It can also be used to combine for example secrets and other configuration
across multiple manifests.

<Notice type="TIP">
Data stores are shared across all manifests in a workspace. This means they have to have unique
names across all manifests in a workspace.
</Notice>

A manifest can link to other manifests using the `workspaces` field. This field should contain the
relative path to other manifest files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ environment resolver, where you supply a mapping of manifest to hostname through
variables:

```bash
EXPORT REJOT_SYNC_SERVICE_slug-a=sync-service-a:3000
export REJOT_SYNC_SERVICE_slug-a=sync-service-a:3000

rejot-cli manifest sync rejot-manifest.json \
--hostname 0.0.0.0 \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ base manifest won't specify any connections:
// rejot-manifest.base.json
{
"connections": [],
"dataStores": ...,
"publicSchemas": ...,
}
// rejot-manifest.dev.json
Expand All @@ -46,7 +45,13 @@ base manifest won't specify any connections:
"host": "localhost",
...
}
}]
}],
"dataStores": [
{
"connectionSlug": "conn-datastore-a",
...
}
]
}

// rejot-manifest.prod.json
Expand All @@ -57,7 +62,13 @@ base manifest won't specify any connections:
"host": "my-datastore-a.europe-west4.domain.com",
...
}
}]
}],
"dataStores": [
{
"connectionSlug": "conn-datastore-a",
...
}
]
}
```

Expand All @@ -74,4 +85,20 @@ rejot-cli manifest sync rejot-manifest.base.json rejot-manifest.prod.json
## Workspaces

Use the [`workspaces`](/docs/spec/manifest) key in your manifest to include other Manifest files
automatically.
automatically. This can simplify the management of multiple manifests, for example when you have
multiple services that need to share the same connection configuration.

For example, workspace file `rejot-workspace.json` might look like this:

```json
{
"workspaces": ["./service-one/rejot-manifest.json", "./service-two/rejot-manifest.json"]
}
```

And call the sync service with multiple manifests:

```bash
# Development
rejot-cli manifest sync rejot-workspace.json
```
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ export class PostgresReplicationListener {
state: "empty",
};

log.info("PostgresReplicationListener initialized", { database: config.database });
log.info("PostgresReplicationListener initialized", {
database: config.database,
host: config.host,
port: config.port,
});
}

async start(publicationName: string, slotName: string): Promise<boolean> {
Expand Down
23 changes: 19 additions & 4 deletions packages/adapter-postgres/src/util/postgres-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { ClientBase, QueryResult, QueryResultRow } from "pg";
import { DatabaseError, Pool } from "pg";
import type { ClientBase, PoolClient, QueryResult, QueryResultRow } from "pg";
import pg from "pg";

import { getLogger } from "@rejot-dev/contract/logger";

const { DatabaseError, Pool } = pg;

export interface PostgresConfig {
host: string;
Expand All @@ -9,6 +13,8 @@ export interface PostgresConfig {
database: string;
}

const log = getLogger(import.meta.url);

/**
* Parses a Postgres connection string into a PostgresConfig object.
* Supports both 'postgres://' and 'postgresql://' protocols.
Expand Down Expand Up @@ -50,7 +56,7 @@ export function parsePostgresConnectionString(connectionString: string): Postgre
type PoolOrClient =
| {
type: "pool";
pool: Pool;
pool: pg.Pool;
}
| {
type: "client";
Expand Down Expand Up @@ -140,7 +146,16 @@ export class PostgresClient implements IPostgresClient {
const poolOrClient = this.#poolOrClient;

if (poolOrClient.type === "pool") {
const client = await poolOrClient.pool.connect();
let client: PoolClient;
try {
client = await poolOrClient.pool.connect();
} catch (error) {
log.error(
`Failed to connect to Postgres, connection string: ${this.#config.host}:${this.#config.port}/${this.#config.database}`,
);
throw error;
}

try {
return await this.#query(client, queryText, values);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ export function workspaceToSyncManifest(
workspace: WorkspaceDefinition,
options: SyncManifestOptions = {},
): SyncManifest {
return new SyncManifest(
[workspace.ancestor.manifest, ...workspace.children.map((child) => child.manifest)],
options,
);
return new SyncManifest([workspace.ancestor, ...workspace.children], options);
}

export function getManifestBySlug(
Expand Down
6 changes: 1 addition & 5 deletions packages/contract/manifest/manifest-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,7 @@ export function getPublicSchemasForDataStore(
return manifests.flatMap((manifest) =>
(manifest.publicSchemas ?? [])
.filter((schema) => {
const dataStore = (manifest.dataStores ?? []).find(
(ds) => ds.connectionSlug === sourceDataStoreSlug,
);
// Ensure dataStore exists and matches the source slug, and the operation table is included
return dataStore && schema.source.dataStoreSlug === dataStore.connectionSlug;
return schema.source.dataStoreSlug === sourceDataStoreSlug;
})
.map(({ name, source, config, version, outputSchema }) => ({
name,
Expand Down
39 changes: 32 additions & 7 deletions packages/contract/manifest/sync-manifest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,16 @@ describe("SyncManifest", () => {
} satisfies Manifest;

// Initialize SyncManifest with both manifests
const syncManifest = new SyncManifest([manifestAWithPublicSchema, manifestBWithConsumer]);
const syncManifest = new SyncManifest([
{
path: "test-manifest.json",
manifest: manifestAWithPublicSchema,
},
{
path: "test-manifest2.json",
manifest: manifestBWithConsumer,
},
]);

// There should be no external schemas since all references are resolved internally
const externalSchemas = syncManifest.getExternalConsumerSchemas();
Expand All @@ -134,9 +143,17 @@ describe("SyncManifest", () => {
);

// Initialize SyncManifest with just our local manifest
const syncManifest = new SyncManifest([manifestWithExternalReferences], {
checkPublicSchemaReferences: false,
});
const syncManifest = new SyncManifest(
[
{
path: "test-manifest.json",
manifest: manifestWithExternalReferences,
},
],
{
checkPublicSchemaReferences: false,
},
);

// Get external consumer schemas
const externalSchemas = syncManifest.getExternalConsumerSchemas();
Expand Down Expand Up @@ -176,9 +193,17 @@ describe("SyncManifest", () => {
);

// Initialize SyncManifest with our local manifest
const syncManifest = new SyncManifest([manifestWithExternalReferences], {
checkPublicSchemaReferences: false,
});
const syncManifest = new SyncManifest(
[
{
path: "test-manifest.json",
manifest: manifestWithExternalReferences,
},
],
{
checkPublicSchemaReferences: false,
},
);

// Get external consumer schemas
const externalSchemas = syncManifest.getExternalConsumerSchemas();
Expand Down
31 changes: 16 additions & 15 deletions packages/contract/manifest/sync-manifest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { z } from "zod";

import type { TransformedOperationWithSource } from "../event-store/event-store.ts";
import { getLogger } from "../logger/logger.ts";
import type { ManifestWithPath } from "../workspace/workspace.ts";
import {
type ConnectionConfigSchema,
ConsumerSchemaSchema,
Expand All @@ -24,7 +25,7 @@ import {
import {
type ExternalPublicSchemaReference,
type VerificationResult,
verifyManifests,
verifyManifestsWithPaths,
} from "./verify-manifest.ts";

const log = getLogger(import.meta.url);
Expand Down Expand Up @@ -61,11 +62,11 @@ export interface SyncManifestOptions {
}

export class SyncManifest {
readonly #manifests: Manifest[];
readonly #manifests: ManifestWithPath[];
readonly #externalSchemaReferences: ExternalPublicSchemaReference[];

constructor(manifests: Manifest[], options: SyncManifestOptions = {}) {
const verificationResult: VerificationResult = verifyManifests(
constructor(manifests: ManifestWithPath[], options: SyncManifestOptions = {}) {
const verificationResult: VerificationResult = verifyManifestsWithPaths(
manifests,
options.checkPublicSchemaReferences,
);
Expand Down Expand Up @@ -109,31 +110,31 @@ export class SyncManifest {
}

get manifests(): Manifest[] {
return this.#manifests;
return this.#manifests.map((m) => m.manifest);
}

get connections(): Connection[] {
return getConnectionsHelper(this.#manifests);
return getConnectionsHelper(this.manifests);
}

get dataStores(): NonNullable<Manifest["dataStores"]> {
return getDataStoresHelper(this.#manifests);
return getDataStoresHelper(this.manifests);
}

get eventStores(): NonNullable<Manifest["eventStores"]> {
return getEventStoresHelper(this.#manifests);
return getEventStoresHelper(this.manifests);
}

getSourceDataStores(): SourceDataStore[] {
return getSourceDataStoresHelper(this.#manifests);
return getSourceDataStoresHelper(this.manifests);
}

getDestinationDataStores(): DestinationDataStore[] {
return getDestinationDataStoresHelper(this.#manifests);
return getDestinationDataStoresHelper(this.manifests);
}

getConnectionBySlug(connectionSlug: string): Connection | undefined {
return getConnectionBySlugHelper(this.#manifests, connectionSlug);
return getConnectionBySlugHelper(this.manifests, connectionSlug);
}

get hasUnresolvedExternalReferences(): boolean {
Expand All @@ -148,23 +149,23 @@ export class SyncManifest {
* @throws Error if the internal state is inconsistent (e.g., referenced manifest or schema not found).
*/
getExternalConsumerSchemas(): Record<string, z.infer<typeof ConsumerSchemaSchema>[]> {
return getExternalConsumerSchemasHelper(this.#manifests);
return getExternalConsumerSchemasHelper(this.manifests);
}

getConsumerSchemasForPublicSchema(
operation: TransformedOperationWithSource,
): z.infer<typeof ConsumerSchemaSchema>[] {
return getConsumerSchemasForPublicSchemaHelper(this.#manifests, operation);
return getConsumerSchemasForPublicSchemaHelper(this.manifests, operation);
}

getPublicSchemasForOperation(
dataStoreSlug: string,
): (z.infer<typeof PublicSchemaSchema> & { sourceManifestSlug: string })[] {
return getPublicSchemasForDataStore(this.#manifests, dataStoreSlug);
return getPublicSchemasForDataStore(this.manifests, dataStoreSlug);
}

getPublicSchemas(): (z.infer<typeof PublicSchemaSchema> & { manifestSlug: string })[] {
return getPublicSchemasHelper(this.#manifests);
return getPublicSchemasHelper(this.manifests);
}

/**
Expand Down
Loading