diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 6aa889991e..b5e8a4f8b7 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -245,6 +245,157 @@ describe("OrchestrationEngine", () => { await system.dispose(); }); + it("prioritizes client commands ahead of queued internal stream commands", async () => { + const system = await createOrchestrationSystem(); + const { engine } = system; + const createdAt = now(); + const threadId = ThreadId.makeUnsafe("thread-priority-lane"); + + await system.run( + engine.dispatch({ + type: "project.create", + commandId: CommandId.makeUnsafe("cmd-project-priority-lane-create"), + projectId: asProjectId("project-priority-lane"), + title: "Priority Lane Project", + workspaceRoot: "/tmp/project-priority-lane", + defaultModelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + createdAt, + }), + ); + await system.run( + engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-priority-lane-create"), + threadId, + projectId: asProjectId("project-priority-lane"), + title: "Priority Lane Thread", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt, + }), + ); + + const lowPriorityCommandCount = 250; + const lowPriorityDispatches = Array.from({ length: lowPriorityCommandCount }, (_, index) => + system.run( + engine.dispatch({ + type: "thread.message.assistant.delta", + commandId: CommandId.makeUnsafe(`cmd-thread-priority-lane-delta-${index}`), + threadId, + messageId: asMessageId("assistant-priority-lane"), + delta: `chunk-${index}`, + turnId: asTurnId("turn-priority-lane"), + createdAt, + }), + ), + ); + + const archiveResult = await system.run( + engine.dispatch({ + type: "thread.archive", + commandId: CommandId.makeUnsafe("cmd-thread-priority-lane-archive"), + threadId, + }), + ); + const lowPriorityResults = await Promise.all(lowPriorityDispatches); + const lowPriorityCommandsAheadOfArchive = lowPriorityResults.filter( + (result) => result.sequence < archiveResult.sequence, + ).length; + + expect(lowPriorityCommandsAheadOfArchive).toBeLessThan(lowPriorityCommandCount / 3); + expect(archiveResult.sequence).toBeLessThan(lowPriorityResults.at(-1)?.sequence ?? Infinity); + await system.dispose(); + }); + + it("treats normalized thread.turn.start as a prioritized client command", async () => { + const system = await createOrchestrationSystem(); + const { engine } = system; + const createdAt = now(); + const threadId = ThreadId.makeUnsafe("thread-priority-turn-start"); + + await system.run( + engine.dispatch({ + type: "project.create", + commandId: CommandId.makeUnsafe("cmd-project-priority-turn-start-create"), + projectId: asProjectId("project-priority-turn-start"), + title: "Priority Turn Start Project", + workspaceRoot: "/tmp/project-priority-turn-start", + defaultModelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + createdAt, + }), + ); + await system.run( + engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-priority-turn-start-create"), + threadId, + projectId: asProjectId("project-priority-turn-start"), + title: "Priority Turn Start Thread", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt, + }), + ); + + const lowPriorityCommandCount = 150; + const lowPriorityDispatches = Array.from({ length: lowPriorityCommandCount }, (_, index) => + system.run( + engine.dispatch({ + type: "thread.message.assistant.delta", + commandId: CommandId.makeUnsafe(`cmd-thread-priority-turn-start-delta-${index}`), + threadId, + messageId: asMessageId("assistant-priority-turn-start"), + delta: `chunk-${index}`, + turnId: asTurnId("turn-priority-turn-start-internal"), + createdAt, + }), + ), + ); + + const turnStartResult = await system.run( + engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-thread-priority-turn-start"), + threadId, + message: { + messageId: asMessageId("user-priority-turn-start"), + role: "user", + text: "hello", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access", + createdAt, + }), + ); + const lowPriorityResults = await Promise.all(lowPriorityDispatches); + const lowPriorityCommandsAheadOfTurnStart = lowPriorityResults.filter( + (result) => result.sequence < turnStartResult.sequence, + ).length; + + expect(lowPriorityCommandsAheadOfTurnStart).toBeLessThan(lowPriorityCommandCount / 3); + expect(turnStartResult.sequence).toBeLessThan(lowPriorityResults.at(-1)?.sequence ?? Infinity); + await system.dispose(); + }); + it("streams persisted domain events in order", async () => { const system = await createOrchestrationSystem(); const { engine } = system; diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index 5c52379f47..97388783a5 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -4,8 +4,18 @@ import type { ProjectId, ThreadId, } from "@t3tools/contracts"; -import { OrchestrationCommand } from "@t3tools/contracts"; -import { Deferred, Effect, Layer, Option, PubSub, Queue, Schema, Stream } from "effect"; +import { DispatchableClientOrchestrationCommand, OrchestrationCommand } from "@t3tools/contracts"; +import { + Deferred, + Effect, + Layer, + Option, + Order, + PubSub, + Schema, + Stream, + TxPriorityQueue, +} from "effect"; import * as SqlClient from "effect/unstable/sql/SqlClient"; import { toPersistenceSqlError } from "../../persistence/Errors.ts"; @@ -29,6 +39,24 @@ interface CommandEnvelope { result: Deferred.Deferred<{ sequence: number }, OrchestrationDispatchError>; } +type CommandPriority = 0 | 1; + +interface PrioritizedCommandEnvelope { + readonly priority: CommandPriority; + readonly insertionSequence: number; + readonly envelope: CommandEnvelope; +} + +const COMMAND_PRIORITY = { + control: 0, + stream: 1, +} as const satisfies Record; + +const prioritizedCommandEnvelopeOrder = Order.combine( + Order.mapInput(Order.Number, (item: PrioritizedCommandEnvelope) => item.priority), + Order.mapInput(Order.Number, (item: PrioritizedCommandEnvelope) => item.insertionSequence), +); + function commandToAggregateRef(command: OrchestrationCommand): { readonly aggregateKind: "project" | "thread"; readonly aggregateId: ProjectId | ThreadId; @@ -49,6 +77,13 @@ function commandToAggregateRef(command: OrchestrationCommand): { } } +function commandPriority(command: OrchestrationCommand): CommandPriority { + if (Schema.is(DispatchableClientOrchestrationCommand)(command)) { + return COMMAND_PRIORITY.control; + } + return COMMAND_PRIORITY.stream; +} + const makeOrchestrationEngine = Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; const eventStore = yield* OrchestrationEventStore; @@ -56,8 +91,11 @@ const makeOrchestrationEngine = Effect.gen(function* () { const projectionPipeline = yield* OrchestrationProjectionPipeline; let readModel = createEmptyReadModel(new Date().toISOString()); + let nextCommandInsertionSequence = 0; - const commandQueue = yield* Queue.unbounded(); + const commandQueue = yield* TxPriorityQueue.empty( + prioritizedCommandEnvelopeOrder, + ); const eventPubSub = yield* PubSub.unbounded(); const processEnvelope = (envelope: CommandEnvelope): Effect.Effect => { @@ -203,7 +241,13 @@ const makeOrchestrationEngine = Effect.gen(function* () { }), ); - const worker = Effect.forever(Queue.take(commandQueue).pipe(Effect.flatMap(processEnvelope))); + const worker = Effect.forever( + TxPriorityQueue.take(commandQueue).pipe( + Effect.tx, + Effect.map((item) => item.envelope), + Effect.flatMap(processEnvelope), + ), + ); yield* Effect.forkScoped(worker); yield* Effect.logDebug("orchestration engine started").pipe( Effect.annotateLogs({ sequence: readModel.snapshotSequence }), @@ -218,7 +262,11 @@ const makeOrchestrationEngine = Effect.gen(function* () { const dispatch: OrchestrationEngineShape["dispatch"] = (command) => Effect.gen(function* () { const result = yield* Deferred.make<{ sequence: number }, OrchestrationDispatchError>(); - yield* Queue.offer(commandQueue, { command, result }); + yield* TxPriorityQueue.offer(commandQueue, { + priority: commandPriority(command), + insertionSequence: nextCommandInsertionSequence++, + envelope: { command, result }, + }).pipe(Effect.tx); return yield* Deferred.await(result); }); diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index bd7e374f32..e48b3fb7b7 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -463,7 +463,7 @@ const ThreadSessionStopCommand = Schema.Struct({ createdAt: IsoDateTime, }); -const DispatchableClientOrchestrationCommand = Schema.Union([ +export const DispatchableClientOrchestrationCommand = Schema.Union([ ProjectCreateCommand, ProjectMetaUpdateCommand, ProjectDeleteCommand, diff --git a/performance.md b/performance.md new file mode 100644 index 0000000000..5a25177c24 --- /dev/null +++ b/performance.md @@ -0,0 +1,85 @@ +# Performance + +## Goal + +Measure end-to-end server latency for control-plane commands through the real websocket/API surface, establish reproducible baselines under representative load, and track the effect of each optimization branch with before/after artifacts. + +## Hypotheses + +- `H1` Control-plane commands are waiting behind provider stream ingestion because `thread.create`, `thread.archive`, and chunk-derived commands all share the same serialized orchestration path. +- `H2` Projection work per streamed chunk is inflating queue occupancy enough to delay unrelated commands. +- `H3` Terminal output and git-heavy repositories add secondary pressure that can compound the control-plane delay seen in the sidebar. + +## Workstreams + +- `[completed]` Add server-first perf baselines for websocket command latency under stream, terminal, and git load. +- `[completed]` Establish baseline artifacts and summarize p50/p95 timings here. +- `[completed]` Reconcile the baseline with the manual sidebar lag repro before cutting optimization branches. +- `[completed]` Optimization 2: reduce per-event projection overhead. +- `[completed]` Optimization 3: prioritize control-plane dispatch over internal stream traffic. +- `[pending]` Optimization 4: reduce global worker head-of-line blocking in runtime ingestion/reactors. +- `[pending]` Optimization 5: evaluate adapter/service queue hops and any additional bottlenecks found during measurement. + +## Artifact Log + +| Branch | Scope | Status | Artifact | +| ------------------------------------------------ | ------------------------------------ | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| `t3code/performance-regression-tests` | Server perf harness + baseline specs | completed | [control-plane-stream-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775120110519/control-plane-stream-baseline.json) | +| `t3code/performance-regression-tests` | Terminal + mixed git baseline | completed | [terminal-mixed-git-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775120111902/terminal-mixed-git-baseline.json) | +| `t3code/performance-regression-tests` | Spam canary baseline | completed | [control-plane-stream-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775120442873/control-plane-stream-baseline.json) | +| `t3code/perf-checkpoint-reactor-fanout` | Spam canary rerun | completed | [control-plane-stream-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775120608211/control-plane-stream-baseline.json) | +| `t3code/perf-checkpoint-reactor-fanout` | Terminal + mixed git rerun | completed | [terminal-mixed-git-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775120609839/terminal-mixed-git-baseline.json) | +| `t3code/perf-projection-thread-message-hot-path` | Spam canary rerun | completed | [control-plane-stream-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775120998407/control-plane-stream-baseline.json) | +| `t3code/perf-projection-thread-message-hot-path` | Terminal + mixed git rerun | completed | [terminal-mixed-git-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775121000048/terminal-mixed-git-baseline.json) | +| `t3code/perf-control-plane-priority-lane` | Spam canary rerun | completed | [control-plane-stream-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775121334761/control-plane-stream-baseline.json) | +| `t3code/perf-control-plane-priority-lane` | Terminal + mixed git rerun | completed | [terminal-mixed-git-baseline.json](/Users/julius/.t3/worktrees/codething-mvp/t3code-93392e80/artifacts/perf/server/server-latency-critical-commands-burst_base-1775121336380/terminal-mixed-git-baseline.json) | + +## Best So Far + +- `create-turn-spam-8x` `thread.create dispatch -> thread.created` + Baseline: `3.61ms / 58.38ms` + Best so far: `2.38ms / 20.36ms` on `t3code/perf-projection-thread-message-hot-path` + Tail improvement: `-65%` on p95 +- `create-turn-spam-8x` `thread.archive dispatch -> thread.archived` + Baseline: `5.71ms / 50.80ms` + Best so far: `1.09ms / 12.60ms` on `t3code/perf-control-plane-priority-lane` + Tail improvement: `-75%` on p95 +- `mixed-stream-terminal-git` `thread.create dispatch -> thread.created` + Baseline: `0.89ms / 1.57ms` + Best so far: `1.16ms / 1.33ms` on `t3code/perf-control-plane-priority-lane` + Tail improvement: `-15%` on p95 +- `git.status` + No clear improvement yet. The measurements move around, but there is no stable win I would claim from these changes. + +## Detailed Results + +| Branch | Change | Profile | Metric | Before p50 / p95 | After p50 / p95 | Notes | +| ------------------------------------------------ | -------------- | --------------------------- | -------------------------------------------- | --------------------- | --------------------- | ---------------------------------------------------------------------------------- | +| `t3code/performance-regression-tests` | Baseline | `idle` | `thread.create dispatch -> thread.created` | `1.48ms / 14.95ms` | `n/a` | one cold-start outlier remains, steady-state is low single-digit ms | +| `t3code/performance-regression-tests` | Baseline | `idle` | `thread.archive dispatch -> thread.archived` | `1.64ms / 2.22ms` | `n/a` | no material queueing at the WS boundary | +| `t3code/performance-regression-tests` | Baseline | `assistant-stream-5x` | `thread.create dispatch -> thread.created` | `1.34ms / 1.85ms` | `n/a` | 5 passive background streams still look healthy | +| `t3code/performance-regression-tests` | Baseline | `assistant-stream-5x` | `thread.archive dispatch -> thread.archived` | `1.03ms / 1.27ms` | `n/a` | still effectively realtime | +| `t3code/performance-regression-tests` | Baseline | `create-turn-spam-8x` | `thread.create dispatch -> thread.created` | `3.61ms / 58.38ms` | `n/a` | first server-first profile that reproduces visible event delay | +| `t3code/performance-regression-tests` | Baseline | `create-turn-spam-8x` | `thread.archive dispatch -> thread.archived` | `5.71ms / 50.80ms` | `n/a` | `dispatch -> ack` stays low; delay is mostly `ack -> event` | +| `t3code/performance-regression-tests` | Baseline | `terminal-output-3x` | `thread.create dispatch -> thread.created` | `1.55ms / 1.92ms` | `n/a` | terminal output alone did not move command latency much | +| `t3code/performance-regression-tests` | Baseline | `mixed-stream-terminal-git` | `thread.create dispatch -> thread.created` | `0.89ms / 1.57ms` | `n/a` | combined load still fast at the command/event boundary | +| `t3code/performance-regression-tests` | Baseline | `idle-repo-pressure` | `git.status` | `135.30ms / 149.97ms` | `n/a` | 240 branches + 160 untracked files | +| `t3code/performance-regression-tests` | Baseline | `mixed-stream-terminal-git` | `git.status` | `150.08ms / 169.73ms` | `n/a` | git RPC remains the largest measured server-side cost | +| `t3code/performance-regression-tests` | Baseline | `idle-repo-pressure` | `git.listBranches` | `47.60ms / 54.00ms` | `n/a` | branch enumeration cost is measurable but stable | +| `t3code/perf-checkpoint-reactor-fanout` | Optimization 1 | `create-turn-spam-8x` | `thread.create dispatch -> thread.created` | `3.61ms / 58.38ms` | `2.44ms / 67.52ms` | p50 improved, tail got noisier; likely still backlog-dominated | +| `t3code/perf-checkpoint-reactor-fanout` | Optimization 1 | `create-turn-spam-8x` | `thread.archive dispatch -> thread.archived` | `5.71ms / 50.80ms` | `0.93ms / 17.98ms` | meaningful improvement in typical and tail latency | +| `t3code/perf-checkpoint-reactor-fanout` | Optimization 1 | `mixed-stream-terminal-git` | `git.status` | `150.08ms / 169.73ms` | `146.87ms / 170.45ms` | effectively unchanged | +| `t3code/perf-projection-thread-message-hot-path` | Optimization 2 | `create-turn-spam-8x` | `thread.create dispatch -> thread.created` | `3.61ms / 58.38ms` | `2.38ms / 20.36ms` | large tail reduction; `ack -> event` fell from `56.95ms` to `19.22ms` | +| `t3code/perf-projection-thread-message-hot-path` | Optimization 2 | `create-turn-spam-8x` | `thread.archive dispatch -> thread.archived` | `5.71ms / 50.80ms` | `0.97ms / 16.53ms` | consistent improvement over both baseline and Optimization 1 | +| `t3code/perf-projection-thread-message-hot-path` | Optimization 2 | `mixed-stream-terminal-git` | `thread.create dispatch -> thread.created` | `0.89ms / 1.57ms` | `1.21ms / 4.50ms` | slight noise increase, still effectively realtime | +| `t3code/perf-projection-thread-message-hot-path` | Optimization 2 | `mixed-stream-terminal-git` | `git.status` | `150.08ms / 169.73ms` | `141.05ms / 162.86ms` | modest improvement, likely secondary to lower orchestration pressure | +| `t3code/perf-control-plane-priority-lane` | Optimization 3 | `create-turn-spam-8x` | `thread.create dispatch -> thread.created` | `3.61ms / 58.38ms` | `1.90ms / 26.57ms` | queue prioritization mainly removed `ack -> event` delay (`0.11ms / 25.45ms`) | +| `t3code/perf-control-plane-priority-lane` | Optimization 3 | `create-turn-spam-8x` | `thread.archive dispatch -> thread.archived` | `5.71ms / 50.80ms` | `1.09ms / 12.60ms` | best archive tail so far; user-visible control commands now preempt stream traffic | +| `t3code/perf-control-plane-priority-lane` | Optimization 3 | `mixed-stream-terminal-git` | `thread.create dispatch -> thread.created` | `0.89ms / 1.57ms` | `1.16ms / 1.33ms` | realistic mixed-load control-plane latency stays near-baseline | +| `t3code/perf-control-plane-priority-lane` | Optimization 3 | `mixed-stream-terminal-git` | `git.status` | `150.08ms / 169.73ms` | `156.29ms / 182.32ms` | no git improvement; likely unrelated measurement noise | + +## Notes + +- The initial perf suite is characterization-first. It should fail only on obvious hangs/timeouts while we collect stable p50/p95 numbers for this machine and repository. +- Each optimization will land on its own branch and append a new row here with the measured delta and artifact path. +- Current takeaway: the redline server-side repro is real. The biggest measured wins came from shrinking per-chunk projection cost and then prioritizing control-plane commands over internal stream traffic. The remaining work, if we keep pushing, is in worker topology and the provider-side queue hops rather than the websocket boundary itself.