Commit d997211

fix(batch-queue): Batch items that hit the environment queue size limit now fast-fail

1 parent bf736a7 · commit d997211

File tree

8 files changed: +483 −19 lines
Lines changed: 130 additions & 0 deletions (new file)

---
area: webapp
type: fix
---

Batch items that hit the environment queue size limit now fast-fail without
retries and without creating pre-failed TaskRuns.

## Why

When a customer fills their environment's queue (default 2.5M) and keeps
pushing batch triggers, every batch item was hitting `ServiceValidationError`
from `validateQueueLimits`, looping through 6 exponential-backoff retries
(~63s per item), and then creating a pre-failed `TaskRun` for each item on the
final attempt — bringing its attempt, trace events, and a `BatchTaskRunError`
row along for the ride.

At customer-overload scale (one tenant pushing ~1 batch/s × ~10 items each
against a paused/full queue) this:

1. filled `redis_alt` with hundreds of thousands of never-completing
   `engine:batch:*` keys, because items kept bouncing between the FairQueue
   and the in-flight hash,
2. pinned the batch worker on doomed retries instead of processing healthy
   batches, and
3. created enormous volumes of pre-failed `TaskRun` / `BatchTaskRunError`
   rows that serve no customer purpose (the items were never going to
   trigger — the customer just needs to fix their queue).

## What changed

- New `QueueSizeLimitExceededError` subclass of `ServiceValidationError`,
  thrown by `runEngine/services/triggerTask.server.ts` instead of the generic
  validation error, so callers can detect this specific overload condition.
- The `ProcessBatchItemCallback` result gains an optional `skipRetries?: boolean`
  flag. When true, the `BatchQueue` records the failure immediately regardless
  of attempt number, bypassing the FairQueue retry ladder.
- The batch process-item callback in `runEngineHandlers.server.ts` detects
  `QueueSizeLimitExceededError` and returns
  `{ success: false, errorCode: "QUEUE_SIZE_LIMIT_EXCEEDED", skipRetries: true }`
  **without** calling `triggerFailedTaskService` — no pre-failed TaskRun is
  created for these items.
- The batch completion callback collapses per-item `BatchTaskRunError` writes
  into a single aggregate row when every failure shares the same
  `QUEUE_SIZE_LIMIT_EXCEEDED` error code, bounding DB writes to O(batches)
  instead of O(items) during overload events.

Other error types (transient trigger failures, environment not found, etc.)
retain the existing retry + pre-failed-run behavior.

## Test plan

### Unit tests

New `skipRetries on failed items` suite in
`internal-packages/run-engine/src/batch-queue/tests/index.test.ts`:

```bash
cd internal-packages/run-engine
pnpm run test ./src/batch-queue/tests/index.test.ts --run
```

Covers:

- `skipRetries: true` from the callback → item called exactly once, not
  `maxAttempts` times.
- Regression guard: when `skipRetries` is not set, the retry ladder still
  fires (items called `maxAttempts` times).
- Per-item mixing within one batch: even-indexed items fast-fail, odd-indexed
  items exhaust the retry ladder — all correctly tracked.

### Manual e2e (local, against `references/hello-world`)

Done before merge — reproduces the Centralize-style overload against the
local dev stack.

Setup:

1. Add `MAXIMUM_DEPLOYED_QUEUE_SIZE=2` to the webapp's `.env.local` (or
   whatever file your local webapp reads — this caps the deployed queue at
   just 2 items).
2. `pnpm run dev --filter webapp`
3. `cd references/hello-world && pnpm exec trigger dev`
4. Temporarily add a blocking task to `references/hello-world/src/trigger/`:

```ts
import { task } from "@trigger.dev/sdk/v3";

export const sleepyTask = task({
  id: "sleepy-task",
  run: async () => {
    await new Promise((r) => setTimeout(r, 10 * 60 * 1000));
  },
});
```

5. Trigger `sleepy-task` twice individually via the dashboard or MCP so the
   queue is holding 2 items and hits the cap.

Exercise the fix:

6. Trigger a batch with 5 items of `sleepy-task` via
   `mcp__trigger__trigger_task` (or the batch API directly).

Expected observations (all must be true):

- [ ] Dashboard: the new batch transitions to `ABORTED` within a second or
      two — it does **not** sit in `PROCESSING` for a minute+.
- [ ] DB: the `BatchTaskRun` row has
      `status='ABORTED'`, `failedRunCount=5`, `successfulRunCount=0`.
- [ ] DB: **exactly one** `BatchTaskRunError` row for the batch
      (`SELECT COUNT(*) FROM "BatchTaskRunError" WHERE "batchTaskRunId"=…`),
      with the error text mentioning `"5 items in this batch failed with
      the same error"` and `errorCode='QUEUE_SIZE_LIMIT_EXCEEDED'`.
- [ ] DB: **no new `TaskRun` rows** were created for the batch items
      (compare `SELECT COUNT(*) FROM "TaskRun" WHERE "batchId"=…` before
      and after — should stay 0).
- [ ] Webapp logs: one
      `"[BatchQueue] Batch item rejected: queue size limit reached"` line per
      item at `warn` level, **no**
      `"[BatchQueue] Failed to trigger batch item"` error lines, and **no**
      `"TriggerFailedTaskService"` log lines for the batch items.
- [ ] Redis (via `redis-cli` against the local redis instance backing the
      batch queue): `engine:batch:<batchId>:*` keys are gone after the batch
      finalizes, and so are the
      `engine:batch:queue:env:<envId>:batch:<batchId>*` keys.

Clean up:

7. Cancel the two `sleepy-task` runs to unblock the queue.
8. Remove the temporary `sleepy-task` file and the
   `MAXIMUM_DEPLOYED_QUEUE_SIZE` override.
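The call-count behavior the unit tests assert (one call with `skipRetries: true`, `maxAttempts` calls without) can be sketched as a tiny standalone simulation. `runItem` and `ItemResult` are illustrative names only, not the real BatchQueue internals:

```typescript
// Illustrative simulation of the attempt accounting the unit tests check.
// `ItemResult` mirrors the shape of the process-item callback result;
// `runItem` is a hypothetical stand-in for the BatchQueue attempt loop.
type ItemResult = { success: boolean; skipRetries?: boolean };

function runItem(callback: () => ItemResult, maxAttempts: number): number {
  let calls = 0;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    calls++;
    const result = callback();
    // Success ends the loop; skipRetries fast-fails with no further attempts.
    if (result.success || result.skipRetries === true) break;
  }
  return calls;
}

// Fast-fail: the callback runs exactly once.
const fastFailCalls = runItem(() => ({ success: false, skipRetries: true }), 6); // 1
// Default: the full retry ladder fires.
const ladderCalls = runItem(() => ({ success: false }), 6); // 6
```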

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 3 additions & 2 deletions

```diff
@@ -41,7 +41,7 @@ import type {
   TriggerTaskRequest,
   TriggerTaskValidator,
 } from "../types";
-import { ServiceValidationError } from "~/v3/services/common.server";
+import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";

 class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
   async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
@@ -271,8 +271,9 @@ export class RunEngineTriggerTaskService {
     );

     if (!queueSizeGuard.ok) {
-      throw new ServiceValidationError(
+      throw new QueueSizeLimitExceededError(
         `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
+        queueSizeGuard.maximumSize ?? 0,
         undefined,
         "warn"
       );
```

Lines changed: 41 additions & 0 deletions (new file)

```ts
import { generateJWT as internal_generateJWT } from "@trigger.dev/core/v3";
import { extractJwtSigningSecretKey } from "./jwtAuth.server";

type Environment = Parameters<typeof extractJwtSigningSecretKey>[0];

export type MintRunTokenOptions = {
  /** Include the input-stream write scope (needed for steering messages from the playground). */
  includeInputStreamWrite?: boolean;
  /** Token expiration. Defaults to "1h". */
  expirationTime?: string;
};

/**
 * Mint a run-scoped public access token (JWT) for browser subscription to a
 * run's realtime streams.
 *
 * Used by:
 * - The playground action to give a freshly triggered chat session a token.
 * - The run details page to let the agent view subscribe to the chat stream
 *   of an existing run (read-only).
 */
export async function mintRunToken(
  environment: Environment,
  runFriendlyId: string,
  options: MintRunTokenOptions = {}
): Promise<string> {
  const scopes = [`read:runs:${runFriendlyId}`];
  if (options.includeInputStreamWrite) {
    scopes.push(`write:inputStreams:${runFriendlyId}`);
  }

  return internal_generateJWT({
    secretKey: extractJwtSigningSecretKey(environment),
    payload: {
      sub: environment.id,
      pub: true,
      scopes,
    },
    expirationTime: options.expirationTime ?? "1h",
  });
}
```
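The scope list this helper builds can be reproduced standalone for intuition. `buildScopes` is a hypothetical name introduced here for illustration, not an export of the file above:

```typescript
// Standalone sketch of the scope construction inside mintRunToken.
// `buildScopes` is an illustrative helper name, not part of the webapp code.
function buildScopes(runFriendlyId: string, includeInputStreamWrite = false): string[] {
  const scopes = [`read:runs:${runFriendlyId}`];
  if (includeInputStreamWrite) {
    // Write scope is only added when the caller needs to steer the run
    // (e.g. playground chat sessions).
    scopes.push(`write:inputStreams:${runFriendlyId}`);
  }
  return scopes;
}

const readOnly = buildScopes("run_abc");
const readWrite = buildScopes("run_abc", true);
```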

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 85 additions & 13 deletions

```diff
@@ -13,6 +13,7 @@ import { logger } from "~/services/logger.server";
 import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
 import { reportInvocationUsage } from "~/services/platform.v3.server";
 import { MetadataTooLargeError } from "~/utils/packets";
+import { QueueSizeLimitExceededError } from "~/v3/services/common.server";
 import { TriggerTaskService } from "~/v3/services/triggerTask.server";
 import { tracer } from "~/v3/tracer.server";
 import { createExceptionPropertiesFromError } from "./eventRepository/common.server";
@@ -637,6 +638,15 @@ export function registerRunEngineEventBusHandlers() {
   });
 }

+/**
+ * errorCode returned by the batch process-item callback when the trigger was
+ * rejected because the environment's queue is at its maximum size. The
+ * BatchQueue (via `skipRetries`) short-circuits retries for this code, and the
+ * batch completion callback collapses per-item errors into a single aggregate
+ * `BatchTaskRunError` row instead of writing one per item.
+ */
+const QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE = "QUEUE_SIZE_LIMIT_EXCEEDED";
+
 /**
  * Set up the BatchQueue processing callbacks.
  * These handle creating runs from batch items and completing batches.
@@ -808,6 +818,37 @@ export function setupBatchQueueCallbacks() {
       } catch (error) {
         const errorMessage = error instanceof Error ? error.message : String(error);

+        // Queue-size-limit rejections are a customer-overload scenario (the
+        // env's queue is at its configured max). Retrying is pointless — the
+        // same item will fail again — and creating pre-failed TaskRuns for
+        // every item of every retried batch is exactly what chews through
+        // DB capacity when a noisy tenant fills their queue. Signal the
+        // BatchQueue to skip retries and skip pre-failed run creation, and
+        // let the completion callback collapse the per-item errors into a
+        // single summary row.
+        if (error instanceof QueueSizeLimitExceededError) {
+          logger.warn("[BatchQueue] Batch item rejected: queue size limit reached", {
+            batchId,
+            friendlyId,
+            itemIndex,
+            task: item.task,
+            environmentId: meta.environmentId,
+            maximumSize: error.maximumSize,
+          });
+
+          span.setAttribute("batch.result.error", errorMessage);
+          span.setAttribute("batch.result.errorCode", QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE);
+          span.setAttribute("batch.result.skipRetries", true);
+          span.end();
+
+          return {
+            success: false as const,
+            error: errorMessage,
+            errorCode: QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE,
+            skipRetries: true,
+          };
+        }
+
         logger.error("[BatchQueue] Failed to trigger batch item", {
           batchId,
           friendlyId,
@@ -889,20 +930,51 @@ export function setupBatchQueueCallbacks() {
           },
         });

-        // Create error records if there were failures
+        // Create error records if there were failures.
+        //
+        // Fast-path for queue-size-limit overload: when every failure is the
+        // same QUEUE_SIZE_LIMIT_EXCEEDED error, collapse them into a single
+        // aggregate row instead of writing one per item. This keeps the DB
+        // write volume bounded to O(batches) instead of O(items) when a noisy
+        // tenant fills their queue and all of their batches start bouncing.
         if (failures.length > 0) {
-          await tx.batchTaskRunError.createMany({
-            data: failures.map((failure) => ({
-              batchTaskRunId: batchId,
-              index: failure.index,
-              taskIdentifier: failure.taskIdentifier,
-              payload: failure.payload,
-              options: failure.options as Prisma.InputJsonValue | undefined,
-              error: failure.error,
-              errorCode: failure.errorCode,
-            })),
-            skipDuplicates: true,
-          });
+          const allQueueSizeLimit = failures.every(
+            (f) => f.errorCode === QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE
+          );
+
+          if (allQueueSizeLimit) {
+            const sample = failures[0]!;
+            await tx.batchTaskRunError.createMany({
+              data: [
+                {
+                  batchTaskRunId: batchId,
+                  // Use the first item's index as a stable anchor for the
+                  // (batchTaskRunId, index) unique constraint so callback
+                  // retries remain idempotent.
+                  index: sample.index,
+                  taskIdentifier: sample.taskIdentifier,
+                  payload: sample.payload,
+                  options: sample.options as Prisma.InputJsonValue | undefined,
+                  error: `${sample.error} (${failures.length} items in this batch failed with the same error)`,
+                  errorCode: sample.errorCode,
+                },
+              ],
+              skipDuplicates: true,
+            });
+          } else {
+            await tx.batchTaskRunError.createMany({
+              data: failures.map((failure) => ({
+                batchTaskRunId: batchId,
+                index: failure.index,
+                taskIdentifier: failure.taskIdentifier,
+                payload: failure.payload,
+                options: failure.options as Prisma.InputJsonValue | undefined,
+                error: failure.error,
+                errorCode: failure.errorCode,
+              })),
+              skipDuplicates: true,
+            });
+          }
         }
       });
```
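The completion-callback collapse can be expressed as a pure function for intuition. Names and the trimmed field set here are illustrative; the real code writes via Prisma inside a transaction:

```typescript
// Pure-function sketch of the aggregate-row collapse (hypothetical names,
// fields trimmed to what the logic needs).
type Failure = { index: number; error: string; errorCode?: string };

const QUEUE_SIZE_LIMIT_EXCEEDED = "QUEUE_SIZE_LIMIT_EXCEEDED";

function buildErrorRows(failures: Failure[]): Failure[] {
  if (failures.length === 0) return [];
  const allQueueSizeLimit = failures.every((f) => f.errorCode === QUEUE_SIZE_LIMIT_EXCEEDED);
  // Mixed error codes keep the existing one-row-per-item behavior.
  if (!allQueueSizeLimit) return failures;
  const sample = failures[0]!;
  return [
    {
      // The first item's index anchors the (batchTaskRunId, index) unique
      // constraint so callback retries stay idempotent.
      index: sample.index,
      error: `${sample.error} (${failures.length} items in this batch failed with the same error)`,
      errorCode: sample.errorCode,
    },
  ];
}

const overload: Failure[] = Array.from({ length: 5 }, (_, i) => ({
  index: i,
  error: "queue size limit reached",
  errorCode: QUEUE_SIZE_LIMIT_EXCEEDED,
}));
const rows = buildErrorRows(overload); // one aggregate row
const mixedRows = buildErrorRows([
  { index: 0, error: "queue size limit reached", errorCode: QUEUE_SIZE_LIMIT_EXCEEDED },
  { index: 1, error: "boom", errorCode: "OTHER" },
]); // per-item rows preserved
```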

apps/webapp/app/v3/services/common.server.ts

Lines changed: 19 additions & 0 deletions

```diff
@@ -10,3 +10,22 @@ export class ServiceValidationError extends Error {
     this.name = "ServiceValidationError";
   }
 }
+
+/**
+ * Thrown when a trigger is rejected because the environment's queue is at its
+ * maximum size. This is identified separately from other validation errors so
+ * the batch queue worker can short-circuit retries and skip pre-failed run
+ * creation for this specific overload scenario — see the batch process item
+ * callback in `runEngineHandlers.server.ts`.
+ */
+export class QueueSizeLimitExceededError extends ServiceValidationError {
+  constructor(
+    message: string,
+    public maximumSize: number,
+    status?: number,
+    logLevel?: ServiceValidationErrorLevel
+  ) {
+    super(message, status, logLevel);
+    this.name = "QueueSizeLimitExceededError";
+  }
+}
```
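Subclassing (rather than adding a code to the base class) means existing `instanceof ServiceValidationError` handlers keep matching while the batch callback can branch on the specific overload. A minimal standalone demonstration with stand-in classes, not imports from the file above:

```typescript
// Stand-in classes mirroring the shape in common.server.ts, shown
// self-contained so the instanceof behavior is easy to verify.
class ServiceValidationError extends Error {
  constructor(
    message: string,
    public status?: number
  ) {
    super(message);
    this.name = "ServiceValidationError";
  }
}

class QueueSizeLimitExceededError extends ServiceValidationError {
  constructor(
    message: string,
    public maximumSize: number
  ) {
    super(message);
    this.name = "QueueSizeLimitExceededError";
  }
}

const err: unknown = new QueueSizeLimitExceededError("queue full", 2_500_000);

// Generic validation-error handling still matches...
const handledAsValidation = err instanceof ServiceValidationError;
// ...while the batch callback can detect the specific overload condition.
const handledAsOverload = err instanceof QueueSizeLimitExceededError;
```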

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 11 additions & 3 deletions

```diff
@@ -865,8 +865,16 @@ export class BatchQueue {
       span?.setAttribute("batch.errorCode", result.errorCode);
     }

-    // If retries are available, use FairQueue retry scheduling
-    if (!isFinalAttempt) {
+    const skipRetries = result.skipRetries === true;
+    if (skipRetries) {
+      span?.setAttribute("batch.skipRetries", true);
+    }
+
+    // If retries are available AND the callback didn't opt out, use
+    // FairQueue retry scheduling. `skipRetries` short-circuits this
+    // regardless of attempt number so the batch can finalize quickly
+    // when the error is known to be non-recoverable on retry.
+    if (!isFinalAttempt && !skipRetries) {
       span?.setAttribute("batch.retry", true);
       span?.setAttribute("batch.attempt", attempt);

@@ -890,7 +898,7 @@ export class BatchQueue {
       return;
     }

-    // Final attempt exhausted - record permanent failure
+    // Final attempt exhausted (or retries skipped) - record permanent failure
     const payloadStr = await this.#startSpan(
       "BatchQueue.serializePayload",
       async (innerSpan) => {
```
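The changed gating condition reads as a small predicate: retry only when attempts remain and the callback did not opt out. `willRetry` is an illustrative name, not a real BatchQueue method:

```typescript
// Sketch of the retry gate from the hunk above as a standalone predicate.
function willRetry(isFinalAttempt: boolean, skipRetries?: boolean): boolean {
  return !isFinalAttempt && skipRetries !== true;
}

const optedOut = willRetry(false, true); // false: finalize immediately
const normal = willRetry(false, undefined); // true: schedule a FairQueue retry
const exhausted = willRetry(true, undefined); // false: record permanent failure
```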
