-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature/bull_mq_integration #2735
base: dev
Are you sure you want to change the base?
Changes from 15 commits
4632b91
faaacf5
44343e8
8e1ff70
00ec074
146d628
5b83c6e
606a006
63e6172
e7d416b
ab614af
dbf0374
a26bb97
216bd60
ef46137
b20a326
3193904
db1999f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -35,3 +35,7 @@ WEB_UI_SDK_URL=http://localhost:5202 | |||||||
#HASHING_KEY_SECRET="$2b$10$FovZTB91/QQ4Yu28nvL8e." | ||||||||
HASHING_KEY_SECRET_BASE64=JDJiJDEwJDNFeWtwWEs4QkdiczlRaWFwLkM4Vk8= | ||||||||
NOTION_API_KEY=secret | ||||||||
REDIS_HOST=localhost | ||||||||
REDIS_PORT=7381 | ||||||||
REDIS_PASSWORD=password | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Security concern: Weak Redis password. The
Replace the current line with: -REDIS_PASSWORD=password
+REDIS_PASSWORD=<strong-unique-password> Also, add a comment above this line:
📝 Committable suggestion
Suggested change
|
||||||||
IS_WORKER_SERVICE=false |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you create a new docker-compose file and didn't use the existing one? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
version: '3.8' | ||
services: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is it located on a new docker image? Our service has to use redis in order to run ... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why cant we use the same docker-compose-db.yml ? we can use same like this
Bring only redis using : This Line
|
||
redis: | ||
image: redis:alpine | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it for local only? Because the Alpine image is less performant than the Debian image There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes it's local only |
||
ports: | ||
- '${REDIS_PORT}:6379' | ||
volumes: | ||
- redis-data:/data | ||
command: > | ||
--requirepass ${REDIS_PASSWORD} | ||
--appendonly yes | ||
environment: | ||
- REDIS_PASSWORD=${REDIS_PASSWORD} | ||
- REDIS_PORT=${REDIS_PORT} | ||
networks: | ||
- app-network | ||
volumes: | ||
redis-data: | ||
driver: local | ||
networks: | ||
app-network: | ||
driver: bridge |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,15 @@ | |
"description": "workflow-service", | ||
"scripts": { | ||
"spellcheck": "cspell \"*\"", | ||
"setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run db:reset && npm run seed", | ||
"setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && npm run db:reset && npm run seed", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is better
|
||
"format": "prettier --write . '!**/*.{md,hbs}'", | ||
"format:check": "prettier --check . '!**/*.{md,hbs}'", | ||
"lint": "eslint . --fix", | ||
"start": "nest start", | ||
"dev": "npm run start:watch", | ||
"worker": "nest start --entryFile ./worker-main", | ||
"prod": "npm run db:migrate-up && node dist/src/main", | ||
"worker:prod": "npm run db:migrate-up && node dist/src/worker-main", | ||
"prod:next": "npm run db:migrate-up && npm run db:data-sync && node dist/src/main", | ||
"start:watch": "nest start --watch", | ||
"start:debug": "nest start --debug --watch", | ||
|
@@ -30,6 +32,8 @@ | |
"db:reset:dev:with-data": "npm run db:reset:dev && npm run db:data-migration:migrate && npm run db:data-sync", | ||
"db:init": "npm run db:migrate-dev -- --name 'initial version' && npm run db:migrate-up seed", | ||
"prisma:generate": "prisma generate", | ||
"docker:redis": "docker compose -f docker-compose.redis.yml up -d --wait", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you run this command this way, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. from what i checked it worked |
||
"docker:redis:down": "docker compose -f docker-compose.redis.yml down --volumes", | ||
"docker:db": "docker compose -f docker-compose.db.yml up -d --wait", | ||
"docker:db:down": "docker compose -f docker-compose.db.yml down --volumes", | ||
"docker:build": "docker build .", | ||
|
@@ -50,8 +54,12 @@ | |
"@ballerine/common": "0.9.43", | ||
"@ballerine/workflow-core": "0.6.55", | ||
"@ballerine/workflow-node-sdk": "0.6.55", | ||
"@bull-board/api": "^6.0.0", | ||
"@bull-board/express": "^6.0.0", | ||
"@bull-board/nestjs": "^6.0.0", | ||
"@faker-js/faker": "^7.6.0", | ||
"@nestjs/axios": "^2.0.0", | ||
"@nestjs/bullmq": "^10.2.1", | ||
"@nestjs/common": "^9.3.12", | ||
"@nestjs/config": "2.3.1", | ||
"@nestjs/core": "^9.3.12", | ||
|
@@ -78,6 +86,7 @@ | |
"ballerine-nestjs-typebox": "3.0.2-next.11", | ||
"base64-stream": "^1.0.0", | ||
"bcrypt": "5.1.0", | ||
"bullmq": "^5.13.2", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you should also include redis client There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add bull-arena which is the UI component for bull |
||
"class-transformer": "0.5.1", | ||
"class-validator": "0.14.0", | ||
"concat-stream": "^2.0.0", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,8 @@ import { ProjectModule } from '@/project/project.module'; | |
import { UserRepository } from '@/user/user.repository'; | ||
import { AlertDefinitionModule } from '@/alert-definition/alert-definition.module'; | ||
import { SentryModule } from '@/sentry/sentry.module'; | ||
import { BullMqModule } from '@/bull-mq/bull-mq.module'; | ||
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module'; | ||
|
||
@Module({ | ||
imports: [ | ||
|
@@ -32,6 +34,8 @@ import { SentryModule } from '@/sentry/sentry.module'; | |
PrismaModule, | ||
SentryModule, | ||
ProjectModule, | ||
BullMqModule, | ||
OutgoingWebhooksModule, | ||
HttpModule.register({ | ||
timeout: 5000, | ||
maxRedirects: 10, | ||
|
@@ -48,15 +52,14 @@ import { SentryModule } from '@/sentry/sentry.module'; | |
], | ||
controllers: [AlertControllerInternal, AlertControllerExternal], | ||
providers: [ | ||
{ | ||
provide: WebhookHttpService, | ||
useExisting: HttpService, | ||
}, | ||
WebhookHttpService, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Review module inclusion in providers array. The simplification of Modules are typically only included in the Consider removing providers: [
WebhookHttpService,
AlertService,
AlertRepository,
AlertDefinitionRepository,
WebhookManagerService,
WebhookEventEmitterService,
- BullMqModule,
- OutgoingWebhooksModule,
// TODO: Export to user module
UserService,
UserRepository,
PasswordService,
], Also applies to: 61-62 |
||
AlertService, | ||
AlertRepository, | ||
AlertDefinitionRepository, | ||
WebhookManagerService, | ||
WebhookEventEmitterService, | ||
BullMqModule, | ||
OutgoingWebhooksModule, | ||
// TODO: Export to user module | ||
UserService, | ||
UserRepository, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,27 @@ | ||
import { alertWebhookFailure } from '@/events/alert-webhook-failure'; | ||
import { lastValueFrom } from 'rxjs'; | ||
import * as common from '@nestjs/common'; | ||
import { ConfigService } from '@nestjs/config'; | ||
import { ClsService } from 'nestjs-cls'; | ||
import { SentryInterceptor } from '@/sentry/sentry.interceptor'; | ||
import { AppLoggerService } from '@/common/app-logger/app-logger.service'; | ||
import { WebhookEventEmitterService } from './webhook-event-emitter.service'; | ||
import { IWebhookEntityEventData } from './types'; | ||
import { Webhook } from '@/events/get-webhooks'; | ||
import { HttpService } from '@nestjs/axios'; | ||
import { sign } from '@ballerine/common'; | ||
import { AnyRecord } from '@ballerine/common'; | ||
import { env } from '@/env'; | ||
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service'; | ||
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service'; | ||
|
||
@common.Injectable() | ||
export abstract class WebhookHttpService extends HttpService {} | ||
export class WebhookHttpService {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unused Empty Class The class |
||
|
||
@common.Injectable() | ||
@common.UseInterceptors(SentryInterceptor) | ||
export class WebhookManagerService { | ||
constructor( | ||
private readonly cls: ClsService, | ||
protected readonly logger: AppLoggerService, | ||
protected readonly configService: ConfigService, | ||
protected readonly httpService: WebhookHttpService, | ||
protected readonly outgoingQueueWebhookService: OutgoingWebhookQueueService, | ||
protected readonly outgoingWebhookService: OutgoingWebhooksService, | ||
protected readonly webhookEventEmitter: WebhookEventEmitterService, | ||
) { | ||
webhookEventEmitter.on('*', async (eventData: any) => { | ||
|
@@ -45,21 +45,42 @@ export class WebhookManagerService { | |
webhook: Webhook; | ||
webhookSharedSecret: string; | ||
}) { | ||
try { | ||
const { id, url, environment, apiVersion } = webhook; | ||
const { id, url, environment, apiVersion } = webhook; | ||
|
||
if (env.QUEUE_SYSTEM_ENABLED) { | ||
return await this.outgoingQueueWebhookService.addJob({ | ||
requestConfig: { | ||
url, | ||
method: 'POST', | ||
headers: {}, | ||
body: data as unknown as AnyRecord, | ||
timeout: 15_000, | ||
}, | ||
customerConfig: { | ||
webhookSharedSecret, | ||
}, | ||
}); | ||
} | ||
|
||
try { | ||
this.logger.log('Sending webhook', { id, url }); | ||
|
||
const res = await lastValueFrom( | ||
this.httpService.post(url, data, { | ||
headers: { | ||
'X-HMAC-Signature': sign({ | ||
payload: data, | ||
key: webhookSharedSecret, | ||
}), | ||
}, | ||
}), | ||
); | ||
const response = await this.outgoingWebhookService.invokeWebhook({ | ||
requestConfig: { | ||
url, | ||
method: 'POST', | ||
headers: {}, | ||
body: data as unknown as AnyRecord, | ||
timeout: 15_000, | ||
}, | ||
customerConfig: { | ||
webhookSharedSecret, | ||
}, | ||
}); | ||
|
||
if (response.status < 200 || response.status >= 300) { | ||
throw new Error(`Webhook failed with status ${response.status} for ${url}`); | ||
} | ||
} catch (error: Error | any) { | ||
this.logger.error('Webhook error data', { | ||
data, | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,69 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import { ConnectionOptions, Job, Queue, Worker } from 'bullmq'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import { Injectable, OnModuleDestroy } from '@nestjs/common'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import { REDIS_CONFIG } from '@/redis/const/redis-config'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import { env } from '@/env'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import { AppLoggerService } from '@/common/app-logger/app-logger.service'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import { QUEUES } from '@/bull-mq/consts'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Injectable() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
export abstract class BaseQueueWorkerService<T = any> implements OnModuleDestroy { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protected queue?: Queue; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protected worker?: Worker; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protected connectionOptions: ConnectionOptions; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protected constructor(protected queueName: string, protected readonly logger: AppLoggerService) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.connectionOptions = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
...REDIS_CONFIG, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!env.QUEUE_SYSTEM_ENABLED) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.queue = new Queue(queueName, { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
connection: this.connectionOptions, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defaultJobOptions: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
...Object.entries(QUEUES).find(([_, queueOptions]) => queueOptions.name === queueName)?.[1] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (env.IS_WORKER_SERVICE !== true) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.initializeWorker(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve environment variable checks and add error handling.
Apply this diff to improve the constructor: protected constructor(protected queueName: string, protected readonly logger: AppLoggerService) {
this.connectionOptions = {
...REDIS_CONFIG,
};
- if (!env.QUEUE_SYSTEM_ENABLED) {
+ if (env.QUEUE_SYSTEM_ENABLED !== 'true') {
return;
}
- this.queue = new Queue(queueName, {
- connection: this.connectionOptions,
- defaultJobOptions: {
- ...Object.entries(QUEUES).find(([_, queueOptions]) => queueOptions.name === queueName)?.[1]
- .config,
- },
- });
+ try {
+ this.queue = new Queue(queueName, {
+ connection: this.connectionOptions,
+ defaultJobOptions: {
+ ...Object.entries(QUEUES).find(([_, queueOptions]) => queueOptions.name === queueName)?.[1]
+ ?.config,
+ },
+ });
+ } catch (error) {
+ this.logger.error(`Failed to initialize queue: ${error.message}`);
+ return;
+ }
- if (env.IS_WORKER_SERVICE !== true) {
+ if (env.IS_WORKER_SERVICE === 'true') {
this.initializeWorker();
}
} These changes ensure proper boolean comparisons for environment variables and add error handling for queue initialization. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
abstract handleJob(job: Job<T>): Promise<void>; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async addJob(jobData: T, jobOptions = {}): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await this.queue?.add(this.queueName, jobData, jobOptions); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve error handling in The current implementation silently fails when the queue is undefined. Implement explicit error handling to make the behavior more predictable and easier to debug. Apply this diff to improve the async addJob(jobData: T, jobOptions = {}): Promise<void> {
- await this.queue?.add(this.queueName, jobData, jobOptions);
+ if (!this.queue) {
+ this.logger.warn('Attempted to add job while queue system is disabled');
+ throw new Error('Queue system is disabled. Cannot add job.');
+ }
+ try {
+ await this.queue.add(this.queueName, jobData, jobOptions);
+ } catch (error) {
+ this.logger.error(`Failed to add job to queue: ${error.message}`);
+ throw error;
+ }
} This change ensures that callers are aware when jobs cannot be added due to a disabled queue system and provides better error logging. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protected initializeWorker() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.worker = new Worker( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.queueName, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async (job: Job<T>) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await this.handleJob(job); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ connection: this.connectionOptions }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.worker?.on('completed', (job: Job) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.logger.log(`Webhook job ${job.id} completed successfully`); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.worker?.on('failed', (job, error, prev) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.logger.error( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
`Webhook job ${job?.id} failed after in queue: ${this.queue?.name} retries: ${error.message}`, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.queue?.on('cleaned', (jobs, type) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.logger.log(`${jobs.length} ${type} jobs have been cleaned from the webhook queue`); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add queue system check in The event listeners for job completion, failure, and queue cleaning are well implemented. However, we should add a check to ensure the queue system is enabled before initializing the worker. Apply this diff to add the check: protected initializeWorker() {
+ if (!this.queue) {
+ this.logger.warn('Attempted to initialize worker while queue system is disabled');
+ return;
+ }
this.worker = new Worker(
// ... rest of the method
);
// ... event listeners
} This change prevents attempting to initialize a worker when the queue system is disabled. The event listener setup is well-implemented, providing good logging for job statuses and queue cleaning.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async onModuleDestroy() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await Promise.all([this.worker?.close(), this.queue?.close()]); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import { Module } from '@nestjs/common'; | ||
import { BullModule } from '@nestjs/bullmq'; | ||
import { BullBoardModule } from '@bull-board/nestjs'; | ||
import { ExpressAdapter } from '@bull-board/express'; | ||
import { BullAdapter } from '@bull-board/api/bullAdapter'; | ||
import { AppLoggerModule } from '@/common/app-logger/app-logger.module'; | ||
import { QUEUES } from '@/bull-mq/consts'; | ||
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service'; | ||
import { REDIS_CONFIG } from '@/redis/const/redis-config'; | ||
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module'; | ||
|
||
@Module({ | ||
imports: [ | ||
AppLoggerModule, | ||
OutgoingWebhooksModule, | ||
BullModule.forRootAsync({ | ||
useFactory: () => { | ||
return { | ||
connection: { | ||
...REDIS_CONFIG, | ||
}, | ||
}; | ||
}, | ||
}), | ||
BullModule.registerQueue( | ||
...Object.values(QUEUES).map(queue => ({ | ||
name: queue.name, | ||
...queue.config, | ||
})), | ||
), | ||
BullBoardModule.forRoot({ | ||
route: '/queues', | ||
adapter: ExpressAdapter, | ||
}), | ||
...Object.values(QUEUES).map(queue => | ||
BullBoardModule.forFeature({ | ||
name: queue.name, | ||
adapter: BullAdapter, | ||
}), | ||
), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification '/queues' route is not secured. The '/queues' route in
🔗 Analysis chainBullBoardModule configuration is correct, but consider security implications. The BullBoardModule is properly configured for monitoring queues, using the correct adapters and dynamically adding all queues from the QUEUES constant. However, the '/queues' route might need to be secured in a production environment. Let's verify if there are any security measures in place: If no security measures are found, consider adding authentication middleware to protect the '/queues' route. 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Check for security measures on the '/queues' route
# Test: Search for middleware or guards applied to the '/queues' route
rg --type typescript -A 5 "route: '/queues'"
# Test: Search for any global guards or middleware in the main application file
rg --type typescript -A 10 'const app = await NestFactory.create'
Length of output: 190 Script: #!/bin/bash
# Description: Check for security measures on the '/queues' route
# Test: Search for middleware or guards applied to the '/queues' route
rg "route: '/queues'" -A 5 -g "*.ts" -g "*.tsx"
# Test: Search for any global guards or middleware in the main application file
rg "const app = await NestFactory.create" -A 10 -g "*.ts" -g "*.tsx"
Length of output: 2919 |
||
], | ||
providers: [OutgoingWebhookQueueService], | ||
exports: [BullModule, OutgoingWebhookQueueService], | ||
}) | ||
export class BullMqModule {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import { BaseJobOptions } from 'bullmq/dist/esm/interfaces'; | ||
|
||
export const QUEUES = { | ||
INCOMING_WEBHOOKS_QUEUE: { | ||
name: 'incoming-webhook-queue', | ||
config: { | ||
attempts: 10, | ||
backoff: { | ||
type: 'exponential', | ||
delay: 1000, | ||
}, | ||
}, | ||
}, | ||
OUTGOING_WEBHOOKS_QUEUE: { | ||
name: 'outgoing-webhook-queue', | ||
config: { | ||
attempts: 10, | ||
backoff: { | ||
type: 'exponential', | ||
delay: 1000, | ||
}, | ||
}, | ||
}, | ||
} satisfies Record<string, { name: string; config: BaseJobOptions }>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we would want something like USE_REDIS - to change between using redis to im-mem implmenation