Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move logic for calling webhook to separate queue #311

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/api/src/common/constants/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export const ChannelType = {
export const QueueAction = {
SEND: 'send',
DELIVERY_STATUS: 'delivery-status',
WEBHOOK: 'webhook',
};

export const ProviderDeliveryStatus = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { MessagesSendResult } from 'mailgun.js';
import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class MailgunNotificationConsumer extends NotificationConsumer {
Expand All @@ -19,10 +20,18 @@ export class MailgunNotificationConsumer extends NotificationConsumer {
private readonly mailgunService: MailgunService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processMailgunNotificationQueue(id: number): Promise<void> {
Expand Down
21 changes: 18 additions & 3 deletions apps/api/src/jobs/consumers/notifications/notification.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import { Repository } from 'typeorm';
import { Notification } from 'src/modules/notifications/entities/notification.entity';
import {
DeliveryStatus,
QueueAction,
SkipProviderConfirmationChannels,
} from 'src/common/constants/notifications';
import { NotificationsService } from 'src/modules/notifications/notifications.service';
import { ConfigService } from '@nestjs/config';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export abstract class NotificationConsumer {
Expand All @@ -19,6 +21,7 @@ export abstract class NotificationConsumer {
@InjectRepository(Notification)
protected readonly notificationRepository: Repository<Notification>,
protected readonly notificationsService: NotificationsService,
private readonly notificationQueueService: NotificationQueueProducer,
protected readonly webhookService: WebhookService,
private readonly configService: ConfigService,
) {
Expand All @@ -43,7 +46,11 @@ export abstract class NotificationConsumer {
`Channel type: ${notification.channelType} is included in skip queue. Provider confirmation skipped for notification id ${notification.id}`,
);
notification.deliveryStatus = DeliveryStatus.SUCCESS;
await this.webhookService.triggerWebhook(notification);
await this.notificationRepository.save(notification);
await this.notificationQueueService.addNotificationToQueue(
QueueAction.WEBHOOK,
notification,
);
} else {
this.logger.debug(
`Notification id ${notification.id} is awaiting confirmation from provider`,
Expand Down Expand Up @@ -115,7 +122,11 @@ export abstract class NotificationConsumer {
}

if (notification.deliveryStatus === DeliveryStatus.SUCCESS) {
await this.webhookService.triggerWebhook(notification);
await this.notificationRepository.save(notification);
await this.notificationQueueService.addNotificationToQueue(
QueueAction.WEBHOOK,
notification,
);
}
} catch (error) {
if (notification.retryCount < this.maxRetryCount) {
Expand All @@ -126,7 +137,11 @@ export abstract class NotificationConsumer {
`Notification with ID ${notification.id} has attempted max allowed retries (provider confirmation), setting delivery status to ${DeliveryStatus.FAILED}`,
);
notification.deliveryStatus = DeliveryStatus.FAILED;
await this.webhookService.triggerWebhook(notification);
await this.notificationRepository.save(notification);
await this.notificationQueueService.addNotificationToQueue(
QueueAction.WEBHOOK,
notification,
);
}

this.logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { PushSnsData, PushSnsService } from 'src/modules/providers/push-sns/push-sns.service';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class PushSnsNotificationConsumer extends NotificationConsumer {
Expand All @@ -16,10 +17,18 @@ export class PushSnsNotificationConsumer extends NotificationConsumer {
private readonly pushSnsService: PushSnsService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
configService: ConfigService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processPushSnsNotificationQueue(id: number): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { Notification } from 'src/modules/notifications/entities/notification.en
import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class SmsKapsystemNotificationsConsumer extends NotificationConsumer {
Expand All @@ -19,10 +20,18 @@ export class SmsKapsystemNotificationsConsumer extends NotificationConsumer {
private readonly kapsystemService: SmsKapsystemService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processSmsKapsystemNotificationQueue(id: number): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { DeliveryStatus, ProviderDeliveryStatus } from 'src/common/constants/notifications';
import { ConfigService } from '@nestjs/config';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class SmsPlivoNotificationsConsumer extends NotificationConsumer {
Expand All @@ -21,10 +22,18 @@ export class SmsPlivoNotificationsConsumer extends NotificationConsumer {
private readonly smsPlivoService: SmsPlivoService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processSmsPlivoNotificationQueue(id: number): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { DeliveryStatus, ProviderDeliveryStatus } from 'src/common/constants/notifications';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class SmsTwilioNotificationsConsumer extends NotificationConsumer {
Expand All @@ -21,10 +22,18 @@ export class SmsTwilioNotificationsConsumer extends NotificationConsumer {
private readonly smsTwilioService: SmsTwilioService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processSmsTwilioNotificationQueue(id: number): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { NotificationConsumer } from './notification.consumer';
import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class SmtpNotificationConsumer extends NotificationConsumer {
Expand All @@ -17,10 +18,18 @@ export class SmtpNotificationConsumer extends NotificationConsumer {
private readonly smtpService: SmtpService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processSmtpNotificationQueue(id: number): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { DeliveryStatus, ProviderDeliveryStatus } from 'src/common/constants/notifications';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class VcTwilioNotificationsConsumer extends NotificationConsumer {
Expand All @@ -21,10 +22,18 @@ export class VcTwilioNotificationsConsumer extends NotificationConsumer {
private readonly vcTwilioService: VcTwilioService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processVcTwilioNotificationQueue(id: number): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { NotificationConsumer } from './notification.consumer';
import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class Wa360dialogNotificationsConsumer extends NotificationConsumer {
Expand All @@ -19,10 +20,18 @@ export class Wa360dialogNotificationsConsumer extends NotificationConsumer {
private readonly wa360dialogService: Wa360dialogService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processWa360dialogNotificationQueue(id: number): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { DeliveryStatus, ProviderDeliveryStatus } from 'src/common/constants/notifications';
import { ConfigService } from '@nestjs/config';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class WaTwilioNotificationsConsumer extends NotificationConsumer {
Expand All @@ -21,10 +22,18 @@ export class WaTwilioNotificationsConsumer extends NotificationConsumer {
private readonly waTwilioService: WaTwilioService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processWaTwilioNotificationQueue(id: number): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ProviderDeliveryStatus, DeliveryStatus } from 'src/common/constants/notifications';
import { WebhookService } from 'src/modules/webhook/webhook.service';
import { NotificationQueueProducer } from 'src/jobs/producers/notifications/notifications.job.producer';

@Injectable()
export class WaTwilioBusinessNotificationsConsumer extends NotificationConsumer {
Expand All @@ -21,10 +22,18 @@ export class WaTwilioBusinessNotificationsConsumer extends NotificationConsumer
private readonly waTwilioBusinessService: WaTwilioBusinessService,
@Inject(forwardRef(() => NotificationsService))
notificationsService: NotificationsService,
@Inject(forwardRef(() => NotificationQueueProducer))
notificationsQueueService: NotificationQueueProducer,
webhookService: WebhookService,
configService: ConfigService,
) {
super(notificationRepository, notificationsService, webhookService, configService);
super(
notificationRepository,
notificationsService,
notificationsQueueService,
webhookService,
configService,
);
}

async processWaTwilioBusinessNotificationQueue(id: number): Promise<void> {
Expand Down
15 changes: 15 additions & 0 deletions apps/api/src/modules/notifications/queues/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { VcTwilioNotificationsConsumer } from 'src/jobs/consumers/notifications/
import { Wa360dialogNotificationsConsumer } from 'src/jobs/consumers/notifications/wa360dialog-notifications.job.consumer';
import { WaTwilioNotificationsConsumer } from 'src/jobs/consumers/notifications/waTwilio-notifications.job.consumer';
import { WaTwilioBusinessNotificationsConsumer } from 'src/jobs/consumers/notifications/waTwilioBusiness-notifications.job.consumer';
import { WebhookService } from 'src/modules/webhook/webhook.service';

@Injectable()
export class QueueService {
Expand All @@ -36,6 +37,7 @@ export class QueueService {
private readonly smsKapsystemNotificationConsumer: SmsKapsystemNotificationsConsumer,
private readonly pushSnsNotificationConsumer: PushSnsNotificationConsumer,
private readonly vcTwilioNotificationsConsumer: VcTwilioNotificationsConsumer,
protected readonly webhookService: WebhookService,
) {
this.redisConfig = {
host: this.configService.get<string>('REDIS_HOST'),
Expand Down Expand Up @@ -168,6 +170,19 @@ export class QueueService {
job.data.id,
);
break;
// WEBHOOK
case `${QueueAction.WEBHOOK}-${ChannelType.SMTP}`:
case `${QueueAction.WEBHOOK}-${ChannelType.MAILGUN}`:
case `${QueueAction.WEBHOOK}-${ChannelType.WA_360_DAILOG}`:
case `${QueueAction.WEBHOOK}-${ChannelType.WA_TWILIO}`:
case `${QueueAction.WEBHOOK}-${ChannelType.SMS_TWILIO}`:
case `${QueueAction.WEBHOOK}-${ChannelType.SMS_PLIVO}`:
case `${QueueAction.WEBHOOK}-${ChannelType.WA_TWILIO_BUSINESS}`:
case `${QueueAction.WEBHOOK}-${ChannelType.SMS_KAPSYSTEM}`:
case `${QueueAction.WEBHOOK}-${ChannelType.PUSH_SNS}`:
case `${QueueAction.WEBHOOK}-${ChannelType.VC_TWILIO}`:
await this.webhookService.triggerWebhook(job.data.id);
break;
default:
this.logger.error(
`Unsupported action-providerType combination: ${action}-${providerType}`,
Expand Down
Loading
Loading