Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/bull_mq_integration #2735

Open
wants to merge 18 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,388 changes: 1,037 additions & 351 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions services/workflows-service/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ WEB_UI_SDK_URL=http://localhost:5202
#HASHING_KEY_SECRET="$2b$10$FovZTB91/QQ4Yu28nvL8e."
HASHING_KEY_SECRET_BASE64=JDJiJDEwJDNFeWtwWEs4QkdiczlRaWFwLkM4Vk8=
NOTION_API_KEY=secret
REDIS_HOST=localhost
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would want something like USE_REDIS - to change between using redis to im-mem implmenation

REDIS_PORT=7381
REDIS_PASSWORD=password
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Security concern: Weak Redis password.

The REDIS_PASSWORD=password is a significant security risk. Even though this is an example file, it's crucial to:

  1. Use a strong, unique password for each environment.
  2. Never commit actual passwords to version control.
  3. Consider using a secret management system for production environments.

Replace the current line with:

-REDIS_PASSWORD=password
+REDIS_PASSWORD=<strong-unique-password>

Also, add a comment above this line:

# IMPORTANT: Replace with a strong, unique password. Never commit the actual password.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
REDIS_PASSWORD=password
# IMPORTANT: Replace with a strong, unique password. Never commit the actual password.
REDIS_PASSWORD=<strong-unique-password>

IS_WORKER_SERVICE=false
22 changes: 22 additions & 0 deletions services/workflows-service/docker-compose.redis.yml
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you create a new docker-compose file and didn't use the existing one?

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: '3.8'
services:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it located on a new docker image? Our service has to use redis in order to run ...
you can place it with a dependency and when Redis is ready start workflow service

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cant we use the same docker-compose-db.yml ?

we can use same like this

version: '3'
services:
  db:
    image: sibedge/postgres-plv8:15.3-3.1.7
    ports:
      - ${DB_PORT}:5432
    environment:
      POSTGRES_USER: ${DB_USER}
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres15:/var/lib/postgresql/data
  redis:
    image: redis:alpine
    ports:
      - '${REDIS_PORT}:6379'
    volumes:
      - redis-data:/data
    command: >
      --requirepass ${REDIS_PASSWORD}
      --appendonly yes
    environment:
      - REDIS_PASSWORD=${REDIS_PASSWORD}
      - REDIS_PORT=${REDIS_PORT}
volumes:
  postgres15: ~
  redis-data:
    driver: local

Bring only redis using :

This Line
can be replaced with the following

docker:redis: docker compose -f docker-compose.db.yml up -d redis --wait
docker:redis:down: docker compose -f docker-compose.db.yml down -v redis,

redis:
image: redis:alpine
Copy link
Collaborator

@MatanYadaev MatanYadaev Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it for local only? Because the Alpine image is less performant than the Debian image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it's local only

ports:
- '${REDIS_PORT}:6379'
volumes:
- redis-data:/data
command: >
--requirepass ${REDIS_PASSWORD}
--appendonly yes
environment:
- REDIS_PASSWORD=${REDIS_PASSWORD}
- REDIS_PORT=${REDIS_PORT}
networks:
- app-network
volumes:
redis-data:
driver: local
networks:
app-network:
driver: bridge
11 changes: 10 additions & 1 deletion services/workflows-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
"description": "workflow-service",
"scripts": {
"spellcheck": "cspell \"*\"",
"setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run db:reset && npm run seed",
"setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && npm run db:reset && npm run seed",
Copy link
Collaborator

@pratapalakshmi pratapalakshmi Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is better

"setup": "cp .env.example .env && npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && npm run db:reset && npm run seed",

"format": "prettier --write . '!**/*.{md,hbs}'",
"format:check": "prettier --check . '!**/*.{md,hbs}'",
"lint": "eslint . --fix",
"start": "nest start",
"dev": "npm run start:watch",
"worker": "nest start --entryFile ./worker-main",
"prod": "npm run db:migrate-up && node dist/src/main",
"worker:prod": "npm run db:migrate-up && node dist/src/worker-main",
"prod:next": "npm run db:migrate-up && npm run db:data-sync && node dist/src/main",
"start:watch": "nest start --watch",
"start:debug": "nest start --debug --watch",
Expand All @@ -30,6 +32,8 @@
"db:reset:dev:with-data": "npm run db:reset:dev && npm run db:data-migration:migrate && npm run db:data-sync",
"db:init": "npm run db:migrate-dev -- --name 'initial version' && npm run db:migrate-up seed",
"prisma:generate": "prisma generate",
"docker:redis": "docker compose -f docker-compose.redis.yml up -d --wait",
Copy link
Collaborator

@MatanYadaev MatanYadaev Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you run this command this way, the REDIS_PORT variable won't be loaded from the dotenv. Is it part of the requirements?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from what i checked it worked

"docker:redis:down": "docker compose -f docker-compose.redis.yml down --volumes",
"docker:db": "docker compose -f docker-compose.db.yml up -d --wait",
"docker:db:down": "docker compose -f docker-compose.db.yml down --volumes",
"docker:build": "docker build .",
Expand All @@ -50,8 +54,12 @@
"@ballerine/common": "0.9.40",
"@ballerine/workflow-core": "0.6.52",
"@ballerine/workflow-node-sdk": "0.6.52",
"@bull-board/api": "^6.0.0",
"@bull-board/express": "^6.0.0",
"@bull-board/nestjs": "^6.0.0",
"@faker-js/faker": "^7.6.0",
"@nestjs/axios": "^2.0.0",
"@nestjs/bullmq": "^10.2.1",
"@nestjs/common": "^9.3.12",
"@nestjs/config": "2.3.1",
"@nestjs/core": "^9.3.12",
Expand All @@ -78,6 +86,7 @@
"ballerine-nestjs-typebox": "3.0.2-next.11",
"base64-stream": "^1.0.0",
"bcrypt": "5.1.0",
"bullmq": "^5.13.2",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should also include redis client

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add bull-arena which is the UI component for bull

"class-transformer": "0.5.1",
"class-validator": "0.14.0",
"concat-stream": "^2.0.0",
Expand Down
2 changes: 1 addition & 1 deletion services/workflows-service/prisma/data-migrations
11 changes: 7 additions & 4 deletions services/workflows-service/src/alert/alert.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { ProjectModule } from '@/project/project.module';
import { UserRepository } from '@/user/user.repository';
import { AlertDefinitionModule } from '@/alert-definition/alert-definition.module';
import { SentryModule } from '@/sentry/sentry.module';
import { BullMqModule } from '@/bull-mq/bull-mq.module';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

@Module({
imports: [
Expand All @@ -32,6 +34,8 @@ import { SentryModule } from '@/sentry/sentry.module';
PrismaModule,
SentryModule,
ProjectModule,
BullMqModule,
OutgoingWebhooksModule,
HttpModule.register({
timeout: 5000,
maxRedirects: 10,
Expand All @@ -48,15 +52,14 @@ import { SentryModule } from '@/sentry/sentry.module';
],
controllers: [AlertControllerInternal, AlertControllerExternal],
providers: [
{
provide: WebhookHttpService,
useExisting: HttpService,
},
WebhookHttpService,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Review module inclusion in providers array.

The simplification of WebhookHttpService provider is good. However, including BullMqModule and OutgoingWebhooksModule in both the imports and providers arrays is unusual and potentially incorrect.

Modules are typically only included in the imports array, not in providers. The providers array is meant for services, not modules.

Consider removing BullMqModule and OutgoingWebhooksModule from the providers array:

 providers: [
   WebhookHttpService,
   AlertService,
   AlertRepository,
   AlertDefinitionRepository,
   WebhookManagerService,
   WebhookEventEmitterService,
-  BullMqModule,
-  OutgoingWebhooksModule,
   // TODO: Export to user module
   UserService,
   UserRepository,
   PasswordService,
 ],

Also applies to: 61-62

AlertService,
AlertRepository,
AlertDefinitionRepository,
WebhookManagerService,
WebhookEventEmitterService,
BullMqModule,
OutgoingWebhooksModule,
// TODO: Export to user module
UserService,
UserRepository,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import { alertWebhookFailure } from '@/events/alert-webhook-failure';
import { lastValueFrom } from 'rxjs';
import * as common from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ClsService } from 'nestjs-cls';
import { SentryInterceptor } from '@/sentry/sentry.interceptor';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { WebhookEventEmitterService } from './webhook-event-emitter.service';
import { IWebhookEntityEventData } from './types';
import { Webhook } from '@/events/get-webhooks';
import { HttpService } from '@nestjs/axios';
import { sign } from '@ballerine/common';
import { AnyRecord } from '@ballerine/common';
import { env } from '@/env';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service';

@common.Injectable()
export abstract class WebhookHttpService extends HttpService {}
export class WebhookHttpService {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Unused Empty Class WebhookHttpService

The class WebhookHttpService is currently empty without any methods or properties. If it's no longer needed, consider removing it to keep the codebase clean. If it's a placeholder for future development, adding a comment to indicate its intended use would be helpful.


@common.Injectable()
@common.UseInterceptors(SentryInterceptor)
export class WebhookManagerService {
constructor(
private readonly cls: ClsService,
protected readonly logger: AppLoggerService,
protected readonly configService: ConfigService,
protected readonly httpService: WebhookHttpService,
protected readonly outgoingQueueWebhookService: OutgoingWebhookQueueService,
protected readonly outgoingWebhookService: OutgoingWebhooksService,
protected readonly webhookEventEmitter: WebhookEventEmitterService,
) {
webhookEventEmitter.on('*', async (eventData: any) => {
Expand All @@ -45,21 +45,42 @@ export class WebhookManagerService {
webhook: Webhook;
webhookSharedSecret: string;
}) {
try {
const { id, url, environment, apiVersion } = webhook;
const { id, url, environment, apiVersion } = webhook;

if (env.QUEUE_SYSTEM_ENABLED) {
return await this.outgoingQueueWebhookService.addJob({
requestConfig: {
url,
method: 'POST',
headers: {},
body: data as unknown as AnyRecord,
timeout: 15_000,
},
customerConfig: {
webhookSharedSecret,
},
});
}

try {
this.logger.log('Sending webhook', { id, url });

const res = await lastValueFrom(
this.httpService.post(url, data, {
headers: {
'X-HMAC-Signature': sign({
payload: data,
key: webhookSharedSecret,
}),
},
}),
);
const response = await this.outgoingWebhookService.invokeWebhook({
requestConfig: {
url,
method: 'POST',
headers: {},
body: data as unknown as AnyRecord,
timeout: 15_000,
},
customerConfig: {
webhookSharedSecret,
},
});

if (response.status < 200 || response.status >= 300) {
throw new Error(`Webhook failed with status ${response.status} for ${url}`);
}
} catch (error: Error | any) {
this.logger.error('Webhook error data', {
data,
Expand Down
8 changes: 6 additions & 2 deletions services/workflows-service/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import { WorkflowModule } from '@/workflow/workflow.module';
import { TransactionModule } from '@/transaction/transaction.module';
import { AlertModule } from '@/alert/alert.module';
import { SwaggerController } from './swagger/swagger.controller';
import { WebhooksModule } from '@/webhooks/webhooks.module';
import { BusinessReportModule } from '@/business-report/business-report.module';
import { ScheduleModule } from '@nestjs/schedule';
import { CronModule } from '@/workflow/cron/cron.module';
Expand All @@ -48,6 +47,9 @@ import { hashKey } from './customer/api-key/utils';
import { RuleEngineModule } from './rule-engine/rule-engine.module';
import { NotionModule } from '@/notion/notion.module';
import { SecretsManagerModule } from '@/secrets-manager/secrets-manager.module';
import { BullMqModule } from '@/bull-mq/bull-mq.module';
import { IncomingWebhooksModule } from '@/webhooks/incoming/incoming-webhooks.module';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

export const validate = async (config: Record<string, unknown>) => {
const zodEnvSchema = z
Expand Down Expand Up @@ -85,7 +87,8 @@ export const validate = async (config: Record<string, unknown>) => {
EventEmitterModule.forRoot(),
UserModule,
WorkflowModule,
WebhooksModule,
IncomingWebhooksModule,
OutgoingWebhooksModule,
UiDefinitionModule,
StorageModule,
DataMigrationModule,
Expand Down Expand Up @@ -126,6 +129,7 @@ export const validate = async (config: Record<string, unknown>) => {
RuleEngineModule,
NotionModule,
SecretsManagerModule,
BullMqModule,
],
providers: [
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { ConnectionOptions, Job, Queue, Worker } from 'bullmq';
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { REDIS_CONFIG } from '@/redis/const/redis-config';
import { env } from '@/env';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { QUEUES } from '@/bull-mq/consts';

@Injectable()
export abstract class BaseQueueWorkerService<T = any> implements OnModuleDestroy {
protected queue?: Queue;
protected worker?: Worker;
protected connectionOptions: ConnectionOptions;

protected constructor(protected queueName: string, protected readonly logger: AppLoggerService) {
this.connectionOptions = {
...REDIS_CONFIG,
};

if (!env.QUEUE_SYSTEM_ENABLED) {
return;
}

this.queue = new Queue(queueName, {
connection: this.connectionOptions,
defaultJobOptions: {
...Object.entries(QUEUES).find(([_, queueOptions]) => queueOptions.name === queueName)?.[1]
.config,
},
});

if (env.IS_WORKER_SERVICE !== true) {
this.initializeWorker();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve environment variable checks and add error handling.

  1. Environment variable checks:
    • Convert env.QUEUE_SYSTEM_ENABLED and env.IS_WORKER_SERVICE to booleans for comparison.
  2. Add error handling for queue initialization.

Apply this diff to improve the constructor:

   protected constructor(protected queueName: string, protected readonly logger: AppLoggerService) {
     this.connectionOptions = {
       ...REDIS_CONFIG,
     };

-    if (!env.QUEUE_SYSTEM_ENABLED) {
+    if (env.QUEUE_SYSTEM_ENABLED !== 'true') {
       return;
     }

-    this.queue = new Queue(queueName, {
-      connection: this.connectionOptions,
-      defaultJobOptions: {
-        ...Object.entries(QUEUES).find(([_, queueOptions]) => queueOptions.name === queueName)?.[1]
-          .config,
-      },
-    });
+    try {
+      this.queue = new Queue(queueName, {
+        connection: this.connectionOptions,
+        defaultJobOptions: {
+          ...Object.entries(QUEUES).find(([_, queueOptions]) => queueOptions.name === queueName)?.[1]
+            ?.config,
+        },
+      });
+    } catch (error) {
+      this.logger.error(`Failed to initialize queue: ${error.message}`);
+      return;
+    }

-    if (env.IS_WORKER_SERVICE !== true) {
+    if (env.IS_WORKER_SERVICE === 'true') {
       this.initializeWorker();
     }
   }

These changes ensure proper boolean comparisons for environment variables and add error handling for queue initialization.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected constructor(protected queueName: string, protected readonly logger: AppLoggerService) {
this.connectionOptions = {
...REDIS_CONFIG,
};
if (!env.QUEUE_SYSTEM_ENABLED) {
return;
}
this.queue = new Queue(queueName, {
connection: this.connectionOptions,
defaultJobOptions: {
...Object.entries(QUEUES).find(([_, queueOptions]) => queueOptions.name === queueName)?.[1]
.config,
},
});
if (env.IS_WORKER_SERVICE !== true) {
this.initializeWorker();
}
}
protected constructor(protected queueName: string, protected readonly logger: AppLoggerService) {
this.connectionOptions = {
...REDIS_CONFIG,
};
if (env.QUEUE_SYSTEM_ENABLED !== 'true') {
return;
}
try {
this.queue = new Queue(queueName, {
connection: this.connectionOptions,
defaultJobOptions: {
...Object.entries(QUEUES).find(([_, queueOptions]) => queueOptions.name === queueName)?.[1]
?.config,
},
});
} catch (error) {
this.logger.error(`Failed to initialize queue: ${error.message}`);
return;
}
if (env.IS_WORKER_SERVICE === 'true') {
this.initializeWorker();
}
}


abstract handleJob(job: Job<T>): Promise<void>;

async addJob(jobData: T, jobOptions = {}): Promise<void> {
await this.queue?.add(this.queueName, jobData, jobOptions);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling in addJob method.

The current implementation silently fails when the queue is undefined. Implement explicit error handling to make the behavior more predictable and easier to debug.

Apply this diff to improve the addJob method:

   async addJob(jobData: T, jobOptions = {}): Promise<void> {
-    await this.queue?.add(this.queueName, jobData, jobOptions);
+    if (!this.queue) {
+      this.logger.warn('Attempted to add job while queue system is disabled');
+      throw new Error('Queue system is disabled. Cannot add job.');
+    }
+    try {
+      await this.queue.add(this.queueName, jobData, jobOptions);
+    } catch (error) {
+      this.logger.error(`Failed to add job to queue: ${error.message}`);
+      throw error;
+    }
   }

This change ensures that callers are aware when jobs cannot be added due to a disabled queue system and provides better error logging.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async addJob(jobData: T, jobOptions = {}): Promise<void> {
await this.queue?.add(this.queueName, jobData, jobOptions);
}
async addJob(jobData: T, jobOptions = {}): Promise<void> {
if (!this.queue) {
this.logger.warn('Attempted to add job while queue system is disabled');
throw new Error('Queue system is disabled. Cannot add job.');
}
try {
await this.queue.add(this.queueName, jobData, jobOptions);
} catch (error) {
this.logger.error(`Failed to add job to queue: ${error.message}`);
throw error;
}
}


protected initializeWorker() {
this.worker = new Worker(
this.queueName,
async (job: Job<T>) => {
await this.handleJob(job);
},
{ connection: this.connectionOptions },
);

this.worker?.on('completed', (job: Job) => {
this.logger.log(`Webhook job ${job.id} completed successfully`);
});

this.worker?.on('failed', (job, error, prev) => {
this.logger.error(
`Webhook job ${job?.id} failed after in queue: ${this.queue?.name} retries: ${error.message}`,
);
});

this.queue?.on('cleaned', (jobs, type) => {
this.logger.log(`${jobs.length} ${type} jobs have been cleaned from the webhook queue`);
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add queue system check in initializeWorker and LGTM on event listeners.

The event listeners for job completion, failure, and queue cleaning are well implemented. However, we should add a check to ensure the queue system is enabled before initializing the worker.

Apply this diff to add the check:

   protected initializeWorker() {
+    if (!this.queue) {
+      this.logger.warn('Attempted to initialize worker while queue system is disabled');
+      return;
+    }
     this.worker = new Worker(
       // ... rest of the method
     );
     // ... event listeners
   }

This change prevents attempting to initialize a worker when the queue system is disabled.

The event listener setup is well-implemented, providing good logging for job statuses and queue cleaning.

Committable suggestion was skipped due to low confidence.


async onModuleDestroy() {
await Promise.all([this.worker?.close(), this.queue?.close()]);
}
}
45 changes: 45 additions & 0 deletions services/workflows-service/src/bull-mq/bull-mq.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { BullBoardModule } from '@bull-board/nestjs';
import { ExpressAdapter } from '@bull-board/express';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
import { QUEUES } from '@/bull-mq/consts';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { REDIS_CONFIG } from '@/redis/const/redis-config';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

@Module({
imports: [
AppLoggerModule,
OutgoingWebhooksModule,
BullModule.forRootAsync({
useFactory: () => {
return {
connection: {
...REDIS_CONFIG,
},
};
},
}),
BullModule.registerQueue(
...Object.values(QUEUES).map(queue => ({
name: queue.name,
...queue.config,
})),
),
BullBoardModule.forRoot({
route: '/queues',
adapter: ExpressAdapter,
}),
...Object.values(QUEUES).map(queue =>
BullBoardModule.forFeature({
name: queue.name,
adapter: BullAdapter,
}),
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

'/queues' route is not secured.

The '/queues' route in BullBoardModule is currently exposed without authentication or authorization measures. It's recommended to secure this route to prevent unauthorized access to the queue monitoring interface.

  • Implement authentication middleware for the '/queues' route.
  • Review and enhance global security configurations to ensure they protect all sensitive routes.
🔗 Analysis chain

BullBoardModule configuration is correct, but consider security implications.

The BullBoardModule is properly configured for monitoring queues, using the correct adapters and dynamically adding all queues from the QUEUES constant.

However, the '/queues' route might need to be secured in a production environment. Let's verify if there are any security measures in place:

If no security measures are found, consider adding authentication middleware to protect the '/queues' route.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for security measures on the '/queues' route

# Test: Search for middleware or guards applied to the '/queues' route
rg --type typescript -A 5 "route: '/queues'"

# Test: Search for any global guards or middleware in the main application file
rg --type typescript -A 10 'const app = await NestFactory.create'

Length of output: 190


Script:

#!/bin/bash
# Description: Check for security measures on the '/queues' route

# Test: Search for middleware or guards applied to the '/queues' route
rg "route: '/queues'" -A 5 -g "*.ts" -g "*.tsx"

# Test: Search for any global guards or middleware in the main application file
rg "const app = await NestFactory.create" -A 10 -g "*.ts" -g "*.tsx"

Length of output: 2919

],
providers: [OutgoingWebhookQueueService],
exports: [BullModule, OutgoingWebhookQueueService],
})
export class BullMqModule {}
24 changes: 24 additions & 0 deletions services/workflows-service/src/bull-mq/consts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { BaseJobOptions } from 'bullmq/dist/esm/interfaces';

export const QUEUES = {
INCOMING_WEBHOOKS_QUEUE: {
name: 'incoming-webhook-queue',
config: {
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000,
},
},
},
OUTGOING_WEBHOOKS_QUEUE: {
name: 'outgoing-webhook-queue',
config: {
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000,
},
},
},
} satisfies Record<string, { name: string; config: BaseJobOptions }>;
Loading
Loading