Skip to content

Commit 89ab61c

Browse files
committed
refactor(queue): streamline job options and improve queue management
- Remove deprecated job option settings for consistency - Introduce default job options in queue creation function - Add validation to ensure valid queue names are provided
1 parent 9138011 commit 89ab61c

File tree

5 files changed

+47
-46
lines changed

5 files changed

+47
-46
lines changed

services/workflows-service/src/alert/alert-queue.service.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,6 @@ export class AlertQueueService implements OnModuleInit {
2929

3030
private async setupAlertQueue() {
3131
try {
32-
this.queueService.createQueue<AlertCheckJobData>(this.QUEUE_NAME, {
33-
name: this.QUEUE_NAME,
34-
jobOptions: {
35-
attempts: 3,
36-
backoff: { type: 'exponential', delay: 10000 },
37-
removeOnComplete: { count: 100, age: 3600 * 24 },
38-
removeOnFail: false,
39-
},
40-
});
4132
await this.queueService.setupJobScheduler(
4233
this.QUEUE_NAME,
4334
this.SCHEDULER_ID,
@@ -46,6 +37,12 @@ export class AlertQueueService implements OnModuleInit {
4637
name: 'alert-check',
4738
data: { timestamp: Date.now() },
4839
},
40+
{
41+
jobOptions: {
42+
attempts: 2,
43+
backoff: { type: 'exponential', delay: 10000 },
44+
},
45+
},
4946
);
5047

5148
this.registerWorker();

services/workflows-service/src/common/queue/queue.service.ts

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ import { BullMQPrometheusService } from '@/common/monitoring/bullmq-prometheus.s
88
import type { BullBoardInjectedInstance, IQueueService, QueueOptions } from './types';
99
import { QueueBullboardService } from './queue-bullboard.service';
1010

11+
const defaultJobOptions = {
12+
attempts: 3,
13+
backoff: { type: 'exponential', delay: 2000 },
14+
removeOnComplete: { count: 100, age: 3600 * 24 * 7 },
15+
removeOnFail: false,
16+
};
17+
1118
@Injectable()
1219
export class BullMQQueueService implements OnModuleDestroy, IQueueService {
1320
private redisClient: IORedis | null;
@@ -51,6 +58,12 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService {
5158
return this.shouldProcessJobs;
5259
}
5360

61+
private validateQueueName(queueName: string): void {
62+
if (!queueName || typeof queueName !== 'string' || queueName.trim().length === 0) {
63+
throw new Error('Queue name must be a non-empty string');
64+
}
65+
}
66+
5467
public getQueue(queueName: string): Queue {
5568
if (!this.redisClient) {
5669
throw new Error('Redis client not initialized');
@@ -63,11 +76,13 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService {
6376
throw new Error(`Queue with name '${queueName}' does not exist. Please create it first.`);
6477
}
6578

66-
registerWorker<T = any>(
79+
registerWorker(
6780
queueName: string,
6881
processor: (job: any) => Promise<any>,
6982
options: { concurrency?: number } = {},
7083
): void {
84+
this.validateQueueName(queueName);
85+
7186
if (!this.redisClient) {
7287
throw new Error('Redis client not initialized');
7388
}
@@ -113,18 +128,16 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService {
113128
}
114129

115130
createQueue(queueName: string, options?: QueueOptions): void {
131+
this.validateQueueName(queueName);
132+
116133
if (this.queues.has(queueName)) {
117134
return;
118135
}
119136

137+
const mergedJobOptions = { ...defaultJobOptions, ...(options?.jobOptions || {}) };
120138
const queue = new Queue(queueName, {
121139
connection: this.redisClient as IORedis,
122-
defaultJobOptions: options?.jobOptions ?? {
123-
attempts: 3,
124-
backoff: { type: 'exponential', delay: 5000 },
125-
removeOnComplete: { count: 100, age: 3600 * 24 },
126-
removeOnFail: false,
127-
},
140+
defaultJobOptions: mergedJobOptions,
128141
});
129142
this.queues.set(queueName, queue);
130143
this.logger.log(`Queue created: ${queueName}`);
@@ -157,25 +170,24 @@ export class BullMQQueueService implements OnModuleDestroy, IQueueService {
157170
jobOpts: {
158171
name: string;
159172
data: T;
160-
opts?: any;
161173
},
174+
queueOptions?: QueueOptions,
162175
): Promise<any> {
176+
this.validateQueueName(queueName);
163177
try {
178+
if (!this.queues.has(queueName)) {
179+
this.createQueue(queueName, queueOptions);
180+
}
181+
164182
const queue = this.getQueue(queueName);
183+
165184
const jobName = jobOpts.name;
166185
const firstJob = await queue.upsertJobScheduler(
167186
schedulerId,
168187
{ every: scheduleOpts.every, jobId: schedulerId },
169188
{
170189
name: jobName,
171190
data: jobOpts.data || { timestamp: Date.now() },
172-
opts: {
173-
attempts: jobOpts.opts?.attempts || 3,
174-
backoff: jobOpts.opts?.backoff || {
175-
type: 'exponential',
176-
delay: 3000,
177-
},
178-
},
179191
},
180192
);
181193
this.logger.log(`Created job scheduler: ${schedulerId}`, {

services/workflows-service/src/common/queue/types.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,29 +11,27 @@ export interface BullBoardInjectedInstance {
1111
export interface IQueueService {
1212
addJob<T = any>(queueName: string, job_name: string, data: T, opts?: any): Promise<any>;
1313

14-
registerWorker<T = any>(
14+
registerWorker(
1515
queueName: string,
1616
processor: (job: any) => Promise<any>,
1717
options?: { concurrency?: number },
1818
): void;
1919

2020
isWorkerEnabled(): boolean;
21-
createQueue<T = any>(queueName: string, options?: QueueOptions<T>): void;
21+
createQueue(queueName: string, options?: QueueOptions): void;
2222
setupJobScheduler<T = any>(
2323
queueName: string,
2424
schedulerId: string,
2525
scheduleOpts: { every: number },
2626
jobOpts: {
2727
name: string;
2828
data: T;
29-
opts?: any;
3029
},
30+
queueOptions?: QueueOptions,
3131
): Promise<any>;
3232
}
3333

34-
export interface QueueOptions<T = any> {
35-
name: string;
36-
concurrency?: number;
34+
export interface QueueOptions {
3735
jobOptions?: {
3836
attempts?: number;
3937
backoff?: {

services/workflows-service/src/common/redis/redis.service.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ export const REDIS_CLIENT = Symbol('REDIS_CLIENT');
77

88
@Injectable()
99
export class RedisService implements OnModuleDestroy {
10-
public readonly client: IORedis;
10+
public readonly client!: IORedis;
1111

1212
constructor(private readonly logger: AppLoggerService) {
1313
if (!env.QUEUE_SYSTEM_ENABLED) {
14-
this.client = null as any;
14+
Object.defineProperty(this, 'client', {
15+
get: () => {
16+
throw new Error('Redis client is not available when QUEUE_SYSTEM_ENABLED is false');
17+
},
18+
});
1519

1620
return;
1721
}

services/workflows-service/src/webhooks/webhooks.service.ts

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,7 @@ export class WebhooksService implements OnModuleInit {
5656

5757
private async setupQueueSystem() {
5858
try {
59-
this.queueService.createQueue<OutgoingWebhookJobData>(this.QUEUE_NAME, {
60-
name: this.QUEUE_NAME,
61-
jobOptions: {
62-
attempts: 3,
63-
backoff: { type: 'exponential', delay: 5000 },
64-
removeOnComplete: { count: 1000, age: 3600 * 24 * 7 },
65-
removeOnFail: false,
66-
},
67-
});
59+
this.queueService.createQueue(this.QUEUE_NAME);
6860

6961
this.registerWorker();
7062

@@ -77,11 +69,9 @@ export class WebhooksService implements OnModuleInit {
7769
}
7870

7971
private registerWorker() {
80-
this.queueService.registerWorker<OutgoingWebhookJobData>(
81-
this.QUEUE_NAME,
82-
this.processWebhookJob.bind(this),
83-
{ concurrency: 10 },
84-
);
72+
this.queueService.registerWorker(this.QUEUE_NAME, this.processWebhookJob.bind(this), {
73+
concurrency: 10,
74+
});
8575
}
8676

8777
private async processWebhookJob(job: Job<OutgoingWebhookJobData>) {

0 commit comments

Comments
 (0)