Skip to content

Commit 02db168

Browse files
committed
fix(fair-queue): prevent unbounded cooloff states growth
1 parent 7ccbbdb commit 02db168

File tree

3 files changed

+89
-4
lines changed

3 files changed

+89
-4
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
8989
private cooloffEnabled: boolean;
9090
private cooloffThreshold: number;
9191
private cooloffPeriodMs: number;
92+
private maxCooloffStatesSize: number;
9293
private queueCooloffStates = new Map<string, QueueCooloffState>();
9394

9495
// Global rate limiter
@@ -142,6 +143,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
142143
this.cooloffEnabled = options.cooloff?.enabled ?? true;
143144
this.cooloffThreshold = options.cooloff?.threshold ?? 10;
144145
this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000;
146+
this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000;
145147

146148
// Global rate limiter
147149
this.globalRateLimiter = options.globalRateLimiter;
@@ -878,8 +880,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
878880
}
879881
this.#resetCooloff(queueId);
880882
} else {
881-
this.batchedSpanManager.incrementStat(loopId, "claim_failures");
882-
this.#incrementCooloff(queueId);
883+
// Don't increment cooloff here - the queue was either:
884+
// 1. Empty (removed from master, cache cleaned up)
885+
// 2. Concurrency blocked (message released back to queue)
886+
// Neither case warrants cooloff as they're not failures
887+
this.batchedSpanManager.incrementStat(loopId, "claim_skipped");
883888
}
884889
}
885890
}
@@ -1214,8 +1219,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
12141219
this.#resetCooloff(queueId);
12151220
slotsUsed++;
12161221
} else {
1217-
this.batchedSpanManager.incrementStat(loopId, "process_failures");
1218-
this.#incrementCooloff(queueId);
1222+
// Don't increment cooloff here - the queue was either:
1223+
// 1. Empty (removed from master, cache cleaned up)
1224+
// 2. Concurrency blocked (message released back to queue)
1225+
// Neither case warrants cooloff as they're not failures
1226+
this.batchedSpanManager.incrementStat(loopId, "process_skipped");
12191227
break; // Queue empty or blocked, try next queue
12201228
}
12211229
}
@@ -1717,6 +1725,15 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
17171725
}
17181726

17191727
#incrementCooloff(queueId: string): void {
1728+
// Safety check: if the cache is too large, just clear it
1729+
if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
1730+
this.logger.warn("Cooloff states cache hit size cap, clearing all entries", {
1731+
size: this.queueCooloffStates.size,
1732+
cap: this.maxCooloffStatesSize,
1733+
});
1734+
this.queueCooloffStates.clear();
1735+
}
1736+
17201737
const state = this.queueCooloffStates.get(queueId) ?? {
17211738
tag: "normal" as const,
17221739
consecutiveFailures: 0,

packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,72 @@ describe("FairQueue", () => {
728728
await queue.close();
729729
}
730730
);
731+
732+
redisTest(
733+
"should clear cooloff states when size cap is exceeded",
734+
{ timeout: 15000 },
735+
async ({ redisOptions }) => {
736+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
737+
738+
const scheduler = new DRRScheduler({
739+
redis: redisOptions,
740+
keys,
741+
quantum: 10,
742+
maxDeficit: 100,
743+
});
744+
745+
const queue = new FairQueue({
746+
redis: redisOptions,
747+
keys,
748+
scheduler,
749+
payloadSchema: TestPayloadSchema,
750+
shardCount: 1,
751+
consumerCount: 1,
752+
consumerIntervalMs: 20,
753+
visibilityTimeoutMs: 5000,
754+
cooloff: {
755+
enabled: true,
756+
threshold: 1, // Enter cooloff after 1 failure
757+
periodMs: 100, // Short cooloff for testing
758+
maxStatesSize: 5, // Very small cap for testing
759+
},
760+
startConsumers: false,
761+
});
762+
763+
// Enqueue messages to multiple queues
764+
for (let i = 0; i < 10; i++) {
765+
await queue.enqueue({
766+
queueId: `tenant:t${i}:queue:q1`,
767+
tenantId: `t${i}`,
768+
payload: { value: `msg-${i}` },
769+
});
770+
}
771+
772+
const processed: string[] = [];
773+
774+
// Handler that always fails to trigger cooloff
775+
queue.onMessage(async (ctx) => {
776+
processed.push(ctx.message.payload.value);
777+
await ctx.fail(new Error("Forced failure"));
778+
});
779+
780+
queue.start();
781+
782+
// Wait for some messages to be processed and fail
783+
await vi.waitFor(
784+
() => {
785+
expect(processed.length).toBeGreaterThanOrEqual(5);
786+
},
787+
{ timeout: 10000 }
788+
);
789+
790+
// The cooloff states size should be capped (test that it doesn't grow unbounded)
791+
const cacheSizes = queue.getCacheSizes();
792+
expect(cacheSizes.cooloffStatesSize).toBeLessThanOrEqual(10); // Some buffer for race conditions
793+
794+
await queue.close();
795+
}
796+
);
731797
});
732798

733799
describe("inspection methods", () => {

packages/redis-worker/src/fair-queue/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ export interface CooloffOptions {
309309
threshold?: number;
310310
/** Duration of cooloff period in milliseconds (default: 10000) */
311311
periodMs?: number;
312+
/** Maximum number of cooloff state entries before triggering cleanup (default: 1000) */
313+
maxStatesSize?: number;
312314
}
313315

314316
/**

0 commit comments

Comments
 (0)