Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {
OrchestrationEngineService,
type OrchestrationEngineShape,
} from "../src/orchestration/Services/OrchestrationEngine.ts";
import { ThreadDeletionReactor } from "../src/orchestration/Services/ThreadDeletionReactor.ts";
import { OrchestrationReactor } from "../src/orchestration/Services/OrchestrationReactor.ts";
import { ProjectionSnapshotQuery } from "../src/orchestration/Services/ProjectionSnapshotQuery.ts";
import {
Expand Down Expand Up @@ -351,6 +352,12 @@ export const makeOrchestrationIntegrationHarness = (
Layer.provideMerge(runtimeIngestionLayer),
Layer.provideMerge(providerCommandReactorLayer),
Layer.provideMerge(checkpointReactorLayer),
Layer.provideMerge(
Layer.succeed(ThreadDeletionReactor, {
start: () => Effect.void,
drain: Effect.void,
}),
),
);
const layer = Layer.empty.pipe(
Layer.provideMerge(runtimeServicesLayer),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { afterEach, describe, expect, it } from "vitest";
import { CheckpointReactor } from "../Services/CheckpointReactor.ts";
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts";
import { ThreadDeletionReactor } from "../Services/ThreadDeletionReactor.ts";
import { OrchestrationReactor } from "../Services/OrchestrationReactor.ts";
import { makeOrchestrationReactor } from "./OrchestrationReactor.ts";

Expand All @@ -17,7 +18,7 @@ describe("OrchestrationReactor", () => {
runtime = null;
});

it("starts provider ingestion, provider command, and checkpoint reactors", async () => {
it("starts provider ingestion, provider command, checkpoint, and thread deletion reactors", async () => {
const started: string[] = [];

runtime = ManagedRuntime.make(
Expand Down Expand Up @@ -49,6 +50,15 @@ describe("OrchestrationReactor", () => {
drain: Effect.void,
}),
),
Layer.provideMerge(
Layer.succeed(ThreadDeletionReactor, {
start: () => {
started.push("thread-deletion-reactor");
return Effect.void;
},
drain: Effect.void,
}),
),
),
);

Expand All @@ -60,6 +70,7 @@ describe("OrchestrationReactor", () => {
"provider-runtime-ingestion",
"provider-command-reactor",
"checkpoint-reactor",
"thread-deletion-reactor",
]);

await Effect.runPromise(Scope.close(scope, Exit.void));
Expand Down
3 changes: 3 additions & 0 deletions apps/server/src/orchestration/Layers/OrchestrationReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import {
import { CheckpointReactor } from "../Services/CheckpointReactor.ts";
import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts";
import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts";
import { ThreadDeletionReactor } from "../Services/ThreadDeletionReactor.ts";

export const makeOrchestrationReactor = Effect.gen(function* () {
const providerRuntimeIngestion = yield* ProviderRuntimeIngestionService;
const providerCommandReactor = yield* ProviderCommandReactor;
const checkpointReactor = yield* CheckpointReactor;
const threadDeletionReactor = yield* ThreadDeletionReactor;

const start: OrchestrationReactorShape["start"] = Effect.fn("start")(function* () {
yield* providerRuntimeIngestion.start();
yield* providerCommandReactor.start();
yield* checkpointReactor.start();
yield* threadDeletionReactor.start();
});

return {
Expand Down
36 changes: 36 additions & 0 deletions apps/server/src/orchestration/Layers/ThreadDeletionReactor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { ThreadId } from "@t3tools/contracts";
import { Cause, Effect, Exit } from "effect";
import { describe, expect, it } from "vitest";

import { logCleanupCauseUnlessInterrupted } from "./ThreadDeletionReactor.ts";

describe("logCleanupCauseUnlessInterrupted", () => {
const threadId = ThreadId.makeUnsafe("thread-deletion-reactor-test");

it("swallows ordinary cleanup failures", async () => {
const exit = await Effect.runPromiseExit(
logCleanupCauseUnlessInterrupted({
effect: Effect.fail("cleanup failed"),
message: "thread deletion cleanup skipped provider session stop",
threadId,
}),
);

expect(Exit.isSuccess(exit)).toBe(true);
});

it("preserves interrupt causes", async () => {
const exit = await Effect.runPromiseExit(
logCleanupCauseUnlessInterrupted({
effect: Effect.interrupt,
message: "thread deletion cleanup skipped provider session stop",
threadId,
}),
);

expect(Exit.isFailure(exit)).toBe(true);
if (Exit.isFailure(exit)) {
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true);
}
});
});
96 changes: 96 additions & 0 deletions apps/server/src/orchestration/Layers/ThreadDeletionReactor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import type { OrchestrationEvent } from "@t3tools/contracts";
import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker";
import { Cause, Effect, Layer, Stream } from "effect";

import { ProviderService } from "../../provider/Services/ProviderService.ts";
import { TerminalManager } from "../../terminal/Services/Manager.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import {
ThreadDeletionReactor,
type ThreadDeletionReactorShape,
} from "../Services/ThreadDeletionReactor.ts";

type ThreadDeletedEvent = Extract<OrchestrationEvent, { type: "thread.deleted" }>;

export const logCleanupCauseUnlessInterrupted = <R, E>({
effect,
message,
threadId,
}: {
readonly effect: Effect.Effect<void, E, R>;
readonly message: string;
readonly threadId: ThreadDeletedEvent["payload"]["threadId"];
}): Effect.Effect<void, E, R> =>
effect.pipe(
Effect.catchCause((cause) => {
if (Cause.hasInterruptsOnly(cause)) {
return Effect.failCause(cause);
}
return Effect.logDebug(message, {
threadId,
cause: Cause.pretty(cause),
});
}),
);

const make = Effect.gen(function* () {
const orchestrationEngine = yield* OrchestrationEngineService;
const providerService = yield* ProviderService;
const terminalManager = yield* TerminalManager;

const stopProviderSession = (threadId: ThreadDeletedEvent["payload"]["threadId"]) =>
logCleanupCauseUnlessInterrupted({
effect: providerService.stopSession({ threadId }),
message: "thread deletion cleanup skipped provider session stop",
threadId,
});

const closeThreadTerminals = (threadId: ThreadDeletedEvent["payload"]["threadId"]) =>
logCleanupCauseUnlessInterrupted({
effect: terminalManager.close({ threadId, deleteHistory: true }),
message: "thread deletion cleanup skipped terminal close",
threadId,
});

const processThreadDeleted = Effect.fn("processThreadDeleted")(function* (
event: ThreadDeletedEvent,
) {
const { threadId } = event.payload;
yield* stopProviderSession(threadId);
yield* closeThreadTerminals(threadId);
});

const processThreadDeletedSafely = (event: ThreadDeletedEvent) =>
processThreadDeleted(event).pipe(
Effect.catchCause((cause) => {
if (Cause.hasInterruptsOnly(cause)) {
return Effect.failCause(cause);
}
return Effect.logWarning("thread deletion reactor failed to process event", {
eventType: event.type,
threadId: event.payload.threadId,
cause: Cause.pretty(cause),
});
}),
);

const worker = yield* makeDrainableWorker(processThreadDeletedSafely);

const start: ThreadDeletionReactorShape["start"] = Effect.fn("start")(function* () {
yield* Effect.forkScoped(
Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => {
if (event.type !== "thread.deleted") {
return Effect.void;
}
return worker.enqueue(event);
}),
);
});

return {
start,
drain: worker.drain,
} satisfies ThreadDeletionReactorShape;
});

export const ThreadDeletionReactorLive = Layer.effect(ThreadDeletionReactor, make);
37 changes: 37 additions & 0 deletions apps/server/src/orchestration/Services/ThreadDeletionReactor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* ThreadDeletionReactor - Thread deletion cleanup reactor service interface.
*
* Owns background workers that react to thread deletion domain events and
* perform best-effort runtime cleanup for provider sessions and terminals.
*
* @module ThreadDeletionReactor
*/
import { ServiceMap } from "effect";
import type { Effect, Scope } from "effect";

/**
* ThreadDeletionReactorShape - Service API for thread deletion cleanup.
*/
export interface ThreadDeletionReactorShape {
/**
* Start reacting to thread.deleted orchestration domain events.
*
* The returned effect must be run in a scope so all worker fibers can be
* finalized on shutdown.
*/
readonly start: () => Effect.Effect<void, never, Scope.Scope>;

/**
* Resolves when the internal processing queue is empty and idle.
* Intended for test use to replace timing-sensitive sleeps.
*/
readonly drain: Effect.Effect<void>;
}

/**
* ThreadDeletionReactor - Service tag for thread deletion cleanup workers.
*/
export class ThreadDeletionReactor extends ServiceMap.Service<
ThreadDeletionReactor,
ThreadDeletionReactorShape
>()("t3/orchestration/Services/ThreadDeletionReactor") {}
Loading
Loading