Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: force cleanup at boot with SYNC_FORCE_REMOVE=true #956

Merged
merged 7 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Synchronization variable names start with `SYNC_`.
| SYNC\__S3_SECRET_KEY (\_Required_) | Access key secret for the S3 store credentials; |
| SYNC\__S3_PATH_STYLE (\_Optional_) | `true` or `false`, force path style if `true`. |
| SYNC\__S3_BUCKET (\_Required_) | The bucket to be used for the system (dedicated). |
| SYNC\__FORCE_REMOVE (\_Optional_) | `true` or `false`, Undeploy cached typegraphs at boot |

## Synchronized mode features

Expand Down
1 change: 1 addition & 0 deletions src/typegate/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export function transformSyncConfig(raw: SyncConfig): SyncConfigX {
redis,
s3,
s3Bucket: raw.s3_bucket,
forceRemove: raw.force_remove
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/typegate/src/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ export const syncConfigSchema = z.object({
s3_access_key: refineEnvVar("SYNC_S3_ACCESS_KEY"),
s3_secret_key: refineEnvVar("SYNC_S3_SECRET_KEY"),
s3_path_style: zBooleanString.default(false),
force_remove: zBooleanString.default(false),
});
export type SyncConfig = z.infer<typeof syncConfigSchema>;
export type SyncConfigX = {
redis: RedisConnectOptions;
s3: S3ClientConfig;
s3Bucket: string;
forceRemove?: boolean
};

export type TypegateConfig = {
Expand Down
1 change: 0 additions & 1 deletion src/typegate/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ try {
base: defaultTypegateConfigBase,
});
const typegate = await Typegate.init(config);

await SystemTypegraph.loadAll(typegate, !globalConfig.packaged);

const server = Deno.serve(
Expand Down
28 changes: 22 additions & 6 deletions src/typegate/src/sync/replicated_map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,43 @@ export class RedisReplicatedMap<T> implements AsyncDisposable {
this.redisObs.close();
}

async getAllHistory() {
const { key, redis } = this;
const all = await redis.hgetall(key);
const history = [];
for (let i = 0; i < all.length; i += 2) {
history.push({
name: all[i],
payload: all[i+1]
});
}

return history;
}

async historySync(): Promise<XIdInput> {
const { key, redis, deserializer } = this;
const { redis, deserializer } = this;

// get last received message before loading history
const [lastMessage] = await redis.xrevrange(this.ekey, "+", "-", 1);
const lastId = lastMessage ? lastMessage.xid : 0;
logger.debug("last message loaded: {}", lastId);

const all = await redis.hgetall(key);
const all = await this.getAllHistory();
logger.debug("history load start: {} elements", all.length);
for (let i = 0; i < all.length; i += 2) {
const name = all[i];
const payload = all[i + 1];

for (const { name, payload } of all) {
logger.info(`reloaded addition: ${name}`);
ensure(
!this.memory.has(name),
() => `typegraph ${name} should not exists in memory at first sync`,
);
this.memory.set(name, await deserializer(payload, true));

const engine = await deserializer(payload, true);
this.memory.set(name, engine);
}
logger.debug("history load end");

return lastId;
}

Expand Down
35 changes: 33 additions & 2 deletions src/typegate/src/typegate/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
// TODO move from tests (MET-497)
import { MemoryRegister } from "./memory_register.ts";
import { NoLimiter } from "./no_limiter.ts";
import { TypegraphStore } from "../sync/typegraph.ts";
import { typegraphIdSchema, TypegraphStore } from "../sync/typegraph.ts";
import { createLocalArtifactStore } from "./artifacts/local.ts";
import { createSharedArtifactStore } from "./artifacts/shared.ts";
import { AsyncDisposableStack } from "dispose";
Expand Down Expand Up @@ -141,15 +141,30 @@
stack.move(),
);

const typegraphStore = TypegraphStore.init(syncConfig, cryptoKeys);
const register = await ReplicatedRegister.init(
typegate,
syncConfig.redis,
TypegraphStore.init(syncConfig, cryptoKeys),
typegraphStore
);
typegate.disposables.use(register);

(typegate as { register: Register }).register = register;


if (config.sync?.forceRemove) {
logger.warn("Force removal at boot enabled");
const history = await register.replicatedMap.getAllHistory();
for (const { name, payload } of history) {
try {
await typegate.forceRemove(name, payload, typegraphStore);
} catch (e) {
logger.error(`Failed to force remove typegraph "${name}": ${e}`);
Sentry.captureException(e);
}

Check warning on line 164 in src/typegate/src/typegate/mod.ts

View check run for this annotation

Codecov / codecov/patch

src/typegate/src/typegate/mod.ts#L162-L164

Added lines #L162 - L164 were not covered by tests
}
}

const lastSync = await register.historySync().catch((err) => {
logger.error(err);
throw new Error(
Expand Down Expand Up @@ -397,6 +412,22 @@
await this.artifactStore.runArtifactGC();
}

async forceRemove(name: string, payload: string, typegraphStore: TypegraphStore) {
logger.warn(`Dropping "${name}": started`);
const typegraphId = typegraphIdSchema.parse(JSON.parse(payload));
const [tg] = await typegraphStore.download(
typegraphId,
);
const artifacts = new Set(
Object.values(tg.meta.artifacts).map((m) => m.hash),
);

await this.register.remove(name);
await this.artifactStore.updateRefCounts(new Set(), artifacts);
await this.artifactStore.runArtifactGC();
logger.warn(`Dropping "${name}": done`);
}

async initQueryEngine(
tgDS: TypeGraphDS,
secretManager: SecretManager,
Expand Down
2 changes: 1 addition & 1 deletion src/typegate/src/typegate/register.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class ReplicatedRegister extends Register {
return new ReplicatedRegister(replicatedMap);
}

constructor(private replicatedMap: RedisReplicatedMap<QueryEngine>) {
constructor(public replicatedMap: RedisReplicatedMap<QueryEngine>) {
super();
}

Expand Down
1 change: 1 addition & 0 deletions tests/e2e/published/published_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const syncConfig = transformSyncConfig({
s3_secret_key: syncEnvs.SYNC_S3_SECRET_KEY,
s3_bucket: syncEnvs.SYNC_S3_BUCKET,
s3_path_style: true,
force_remove: false
michael-0acf4 marked this conversation as resolved.
Show resolved Hide resolved
});

// put here typegraphs that are to be excluded
Expand Down
6 changes: 6 additions & 0 deletions tests/sync/scripts/hello.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

export function hello({ name }: { name: string }) {
return `Hello ${name}`;
}
21 changes: 21 additions & 0 deletions tests/sync/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
# SPDX-License-Identifier: MPL-2.0

from typegraph import t, typegraph, Policy, Graph
from typegraph.runtimes.deno import DenoRuntime


@typegraph()
def sync(g: Graph):
deno = DenoRuntime()
public = Policy.public()

g.expose(
hello=deno.import_(
t.struct({"name": t.string()}),
t.string(),
name="hello",
module="scripts/hello.ts",
secrets=["ULTRA_SECRET"],
).with_policy(public)
)
1 change: 1 addition & 0 deletions tests/sync/sync_config_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Deno.test("test sync config", async (t) => {
Deno.env.set("SYNC_S3_BUCKET", "bucket");

assertEquals(getSyncConfig(), {
forceRemove: false,
redis: {
hostname: "localhost",
port: "6379",
Expand Down
114 changes: 114 additions & 0 deletions tests/sync/sync_force_remove_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

import { gql, Meta } from "test-utils/mod.ts";
import { connect } from "redis";
import { S3Client } from "aws-sdk/client-s3";
import { createBucket, listObjects, tryDeleteBucket } from "test-utils/s3.ts";
import { assertEquals } from "@std/assert";
import { clearSyncData, setupSync } from "test-utils/hooks.ts";
import { Typegate } from "@metatype/typegate/typegate/mod.ts";
import {
defaultTypegateConfigBase,
getTypegateConfig,
SyncConfig,
} from "@metatype/typegate/config.ts";

const redisKey = "typegraph";
const redisEventKey = "typegraph_event";

async function cleanUp(config: typeof syncConfig) {
using redis = await connect(config.redis);
await redis.del(redisKey);
await redis.del(redisEventKey);

const s3 = new S3Client(config.s3);
await tryDeleteBucket(s3, config.s3Bucket);
await createBucket(s3, config.s3Bucket);
s3.destroy();
await redis.quit();
}

const syncConfig = {
redis: {
hostname: "localhost",
port: 6379,
password: "password",
db: 1,
},
s3: {
endpoint: "http://localhost:9000",
region: "local",
credentials: {
accessKeyId: "minio",
secretAccessKey: "password",
},
forcePathStyle: true,
},
s3Bucket: "metatype-deno-runtime-sync-test",
};

async function spawnGate(syncConfig: SyncConfig) {
const config = getTypegateConfig({
base: {
...defaultTypegateConfigBase,
},
});

return await Typegate.init({
...config,
sync: syncConfig,
});
}

Meta.test(
{
name: "Force cleanup at boot on sync mode",
syncConfig,
async setup() {
await clearSyncData(syncConfig);
await setupSync(syncConfig);
},
async teardown() {
await cleanUp(syncConfig);
},
},
async (t) => {
await t.should(
"cleanup if forceRemove is true",
async () => {
const _engine = await t.engine("sync/sync.py", {
secrets: {
ULTRA_SECRET:
"if_you_can_read_me_on_an_ERROR_there_is_a_bug",
},
});

const s3 = new S3Client(syncConfig.s3);
const initialObjects = await listObjects(s3, syncConfig.s3Bucket);
assertEquals(initialObjects?.length, 3);

const gateNoRemove = await spawnGate(syncConfig);
const namesNoRemove = gateNoRemove.register.list().map(({ name }) =>
name
);

const gateAfterRemove = await spawnGate({
...syncConfig,
forceRemove: true,
});
const namesAfterRemove = gateAfterRemove.register.list().map((
{ name },
) => name);

t.addCleanup(async () => {
await gateNoRemove[Symbol.asyncDispose]();
await gateAfterRemove[Symbol.asyncDispose]();
});

assertEquals(namesNoRemove, ["sync"]);
assertEquals(namesAfterRemove, []); // !
},
);
},
);
Loading