Skip to content
3 changes: 3 additions & 0 deletions apps/api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ NODE_ENV= # Use "development" for graphql playground to work
# Notification configuration
MAX_RETRY_COUNT=3 # Max retry count, default is 3

# Log Level
LOG_LEVEL=info # Log level, default is info

# Database configuration
DB_TYPE=
DB_HOST=
Expand Down
12 changes: 10 additions & 2 deletions apps/api/src/common/decorators/is-data-valid.decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { ChannelType } from 'src/common/constants/notifications';
import { SMTPDataDto } from 'src/modules/notifications/dtos/providers/smtp-data.dto';
import { MailgunDataDto } from 'src/modules/notifications/dtos/providers/mailgun-data.dto';
import { Wa360DialogDataDto } from 'src/modules/notifications/dtos/providers/wa360Dialog-data.dto';
import { BadRequestException } from '@nestjs/common';
import { BadRequestException, Logger } from '@nestjs/common';
import { CreateNotificationDto } from 'src/modules/notifications/dtos/create-notification.dto';
import { WaTwilioDataDto } from 'src/modules/notifications/dtos/providers/waTwilio-data.dto';
import { SmsTwilioDataDto } from 'src/modules/notifications/dtos/providers/smsTwilio-data.dto';
Expand All @@ -26,20 +26,28 @@ import { VcTwilioDataDto } from 'src/modules/notifications/dtos/providers/vcTwil
@ValidatorConstraint({ name: 'isDataValidConstraint', async: true })
@Injectable()
export class IsDataValidConstraint implements ValidatorConstraintInterface {
constructor(private readonly providersService: ProvidersService) {}
constructor(
private readonly providersService: ProvidersService,
private logger: Logger,
) {}

async validate(value: object, args: ValidationArguments): Promise<boolean> {
this.logger.debug('Request data validation started');
const object = args.object as { providerId: number; data: object };
let channelTypeFromProviderId = null;

try {
channelTypeFromProviderId = (await this.providersService.getById(object.providerId))
.channelType;
this.logger.debug(
`Fetched channel type: ${channelTypeFromProviderId} from provider Id: ${object.providerId}`,
);
} catch (error) {
throw new Error(`Error while fetching channelType from ProviderId: ${error}`);
}

const validateAndThrowError = async (validationData: object): Promise<void> => {
this.logger.debug('Awaiting Validation of request data as per request channel type');
const errors: ValidationError[] = await validate(validationData);

if (errors.length > 0) {
Expand Down
19 changes: 19 additions & 0 deletions apps/api/src/common/guards/api-key/api-key.guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ export class ApiKeyGuard implements CanActivate {

async validateRequest(execContext: ExecutionContext): Promise<boolean> {
const request = execContext.switchToHttp().getRequest();
this.logger.debug(
`Request validation started for request body: ${JSON.stringify(request.body)}`,
);

// Get api key header incase of http request
if (request && request.headers) {
this.logger.debug(`Fetching request header and provider ID: ${request.body.providerId}`);
const serverApiKeyHeader = request.headers['x-api-key'];
this.logger.debug('Fetching provider Id');
const requestProviderId = request.body.providerId;
const validationResult = await this.validateApiKeyHeader(
serverApiKeyHeader,
Expand All @@ -44,6 +49,9 @@ export class ApiKeyGuard implements CanActivate {
// Get api key header incase of graphql request
const ctx = GqlExecutionContext.create(execContext);
const req = ctx.getContext().req;
this.logger.debug(
`Fetching request header and provider ID for GraphQL: ${req.body.providerId}`,
);
const serverApiKeyHeader = req.headers['x-api-key'];
const requestProviderId = request.body.providerId;
const validationResult = await this.validateApiKeyHeader(serverApiKeyHeader, requestProviderId);
Expand All @@ -59,6 +67,7 @@ export class ApiKeyGuard implements CanActivate {
serverApiKeyHeader: string,
requestProviderId: number,
): Promise<boolean> {
this.logger.debug('validateApiKeyHeader started');
let apiKeyToken = null;

if (serverApiKeyHeader) {
Expand All @@ -69,6 +78,7 @@ export class ApiKeyGuard implements CanActivate {
}

const apiKeyEntry = await this.serverApiKeysService.findByServerApiKey(apiKeyToken);
this.logger.debug(`Fetching Server API Key from request token: ${JSON.stringify(apiKeyEntry)}`);

if (!apiKeyEntry) {
//this.logger.error('Invalid x-api-key');
Expand All @@ -77,6 +87,9 @@ export class ApiKeyGuard implements CanActivate {

// Get channel type from providerId & Set the channelType based on providerEntry
const providerEntry = await this.providersService.getById(requestProviderId);
this.logger.debug(
`Fetched providerEntry from DB (using request providerId): ${JSON.stringify(providerEntry)}`,
);

if (!providerEntry) {
this.logger.error('Provider does not exist');
Expand All @@ -91,13 +104,19 @@ export class ApiKeyGuard implements CanActivate {

// Set correct ApplicationId after verifying
const inputApplicationId = await this.getApplicationIdFromApiKey(apiKeyToken);
this.logger.debug(
`Fetched ApplicationId from DB using APIKeyToken: ${JSON.stringify(inputApplicationId)}`,
);

if (inputApplicationId != providerEntry.applicationId) {
this.logger.error('The applicationId for Server Key and Provider do not match.');
throw new BadRequestException('The applicationId for Server Key and Provider do not match.');
}

if (apiKeyToken && apiKeyToken === apiKeyEntry.apiKey) {
this.logger.debug(
'Requested providerId is valid. The applicationId for Server Key and Provider match. Valid Request.',
);
return true;
}

Expand Down
4 changes: 4 additions & 0 deletions apps/api/src/config/logger.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { transports, format } from 'winston';
import { join } from 'path';
import { v4 as uuidv4 } from 'uuid';
import 'winston-daily-rotate-file';
import { ConfigService } from '@nestjs/config';

const configService = new ConfigService();

const logDir = 'logs';
const logFormat = format.combine(
Expand Down Expand Up @@ -57,5 +60,6 @@ const transportsConfig = [
];

export const loggerConfig = WinstonModule.createLogger({
level: configService.get('LOG_LEVEL', 'info'),
transports: transportsConfig,
});
24 changes: 24 additions & 0 deletions apps/api/src/jobs/consumers/notifications/notification.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,31 @@ export abstract class NotificationConsumer {

try {
this.logger.log(`Sending notification with id: ${id}`);
this.logger.debug(
`Processing notification queue for channel type: ${notification.channelType}`,
);
const result = await sendNotification();

if (SkipProviderConfirmationChannels.includes(notification.channelType)) {
this.logger.debug(
`Channel type: ${notification.channelType} is included in skip queue. Provider confirmation skipped for notification id ${notification.id}`,
);
notification.deliveryStatus = DeliveryStatus.SUCCESS;
this.webhookService.triggerWebhook(notification);
} else {
this.logger.debug(
`Notification id ${notification.id} is awaiting confirmation from provider`,
);
notification.deliveryStatus = DeliveryStatus.AWAITING_CONFIRMATION;
}

this.logger.debug(`Updating result of notification with id ${notification.id}`);
notification.result = { result };
} catch (error) {
if (notification.retryCount < this.maxRetryCount) {
this.logger.debug(
`Some error occured while sendiing Notification with ID ${notification.id}. Retry Count ${notification.retryCount}/${this.maxRetryCount}. Sending notification again`,
);
notification.deliveryStatus = DeliveryStatus.PENDING;
notification.retryCount++;
} else {
Expand All @@ -54,10 +67,14 @@ export abstract class NotificationConsumer {
notification.deliveryStatus = DeliveryStatus.FAILED;
}

this.logger.debug(`Updating result of notification with id ${notification.id}`);
notification.result = { result: { message: error.message, stack: error.stack } };
this.logger.error(`Error sending notification with id: ${id}`);
this.logger.error(JSON.stringify(error, ['message', 'stack'], 2));
} finally {
this.logger.debug(
`processNotificationQueue completed. Saving notification in DB: ${JSON.stringify(notification)}`,
);
await this.notificationRepository.save(notification);
}
}
Expand All @@ -73,7 +90,11 @@ export abstract class NotificationConsumer {

try {
this.logger.log(`Checking delivery status from provider for notification with id: ${id}`);
this.logger.debug(
`Processing awaiting confirmation notification queue for channel type: ${notification.channelType}`,
);
const response = await getNotificationStatus();
this.logger.debug(`Updating result of notification with id ${notification.id}`);
notification.result = { result: response.result as Record<string, unknown> };
notification.deliveryStatus = response.deliveryStatus;

Expand Down Expand Up @@ -113,6 +134,9 @@ export abstract class NotificationConsumer {
);
this.logger.error(JSON.stringify(error, ['message', 'stack'], 2));
} finally {
this.logger.debug(
`processAwaitingConfirmationNotificationQueue completed. Saving notification in DB: ${JSON.stringify(notification)}`,
);
await this.notificationRepository.save(notification);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@ export class NotificationQueueProducer {
) {}

async addNotificationToQueue(queueType: string, notification: Notification): Promise<void> {
this.logger.debug('Started addNotificationToQueue');
const provider = await this.providersService.getById(notification.providerId);
this.logger.debug(
`Fetched provider ${JSON.stringify(provider)} from notification ${JSON.stringify(notification)}`,
);
const queue = this.queueService.getOrCreateQueue(
queueType,
provider.channelType.toString(),
notification.providerId.toString(),
);
this.logger.debug(`Adding notification with id ${notification.id} to queue`);
await queue.add(notification.id.toString(), {
id: notification.id,
providerId: notification.providerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class NotificationsController {
): Promise<Record<string, unknown>> {
try {
// ApiKeyGuard checks if requested providerId is valid, correct channelType and applicationId present
this.logger.debug(`Notification Request Data: ${JSON.stringify(notificationData)}`);
const createdNotification =
await this.notificationService.createNotification(notificationData);
this.logger.log('Notification created successfully.');
Expand Down
56 changes: 56 additions & 0 deletions apps/api/src/modules/notifications/notifications.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ export class NotificationsService extends CoreService<Notification> {
// Set correct application name using applicationId
notification.createdBy = await this.getApplicationNameFromId(notification.applicationId);
notification.updatedBy = await this.getApplicationNameFromId(notification.applicationId);
this.logger.debug(
`New Notification created. Saving notification in DB: ${JSON.stringify(notification)}`,
);
return this.notificationRepository.save(notification);
}

Expand Down Expand Up @@ -89,12 +92,17 @@ export class NotificationsService extends CoreService<Notification> {
async addNotificationsToQueue(): Promise<void> {
this.logger.log('Starting CRON job to add pending notifications to queue');

this.logger.debug(`isProcessingQueue value: ${this.isProcessingQueue}`);

if (this.isProcessingQueue) {
this.logger.log('Notifications are already being added to queue, skipping this CRON job');
return;
}

this.isProcessingQueue = true;
this.logger.debug(
`isProcessingQueue value before initializing allPendingNotifications: ${this.isProcessingQueue}`,
);
let allPendingNotifications: Notification[] = [];

try {
Expand All @@ -103,34 +111,57 @@ export class NotificationsService extends CoreService<Notification> {
this.isProcessingQueue = false;
this.logger.error('Error fetching pending notifications');
this.logger.error(JSON.stringify(error, null, 2));
this.logger.debug(
`isProcessingQueue value when error fetching pending notifications: ${this.isProcessingQueue}`,
);

return;
}

this.logger.log(`Adding ${allPendingNotifications.length} pending notifications to queue`);

for (const notification of allPendingNotifications) {
try {
this.logger.debug(
`NotificationId: ${notification.id}, DeliveryStatus: ${notification.deliveryStatus}`,
);
notification.deliveryStatus = DeliveryStatus.IN_PROGRESS;
this.logger.debug(
`Updated deliveryStatus to ${DeliveryStatus.IN_PROGRESS}. Saving notification in DB: ${JSON.stringify(notification)}`,
);
await this.notificationRepository.save(notification);
await this.notificationQueueService.addNotificationToQueue(QueueAction.SEND, notification);
} catch (error) {
this.logger.debug(
`Error encountered. NotificationId: ${notification.id}, DeliveryStatus: ${notification.deliveryStatus}`,
);
notification.deliveryStatus = DeliveryStatus.PENDING;
this.logger.debug(`Updating result of notification with id ${notification.id}`);
notification.result = { result: { message: error.message, stack: error.stack } };
this.logger.error(`Error adding notification with id: ${notification.id} to queue`);
this.logger.error(JSON.stringify(error, null, 2));
this.logger.debug(
`isProcessingQueue value while adding notification with id: ${notification.id} to queue: ${this.isProcessingQueue}`,
);
} finally {
this.logger.debug(`Saving notification in DB: ${JSON.stringify(notification)}`);
await this.notificationRepository.save(notification);
}
}

this.isProcessingQueue = false;
this.logger.debug(
`isProcessingQueue value after adding ${allPendingNotifications.length} pending notifications to queue: ${this.isProcessingQueue}`,
);
}

async getProviderConfirmation(): Promise<void> {
this.logger.log(
'Starting CRON job to add notifications to queue for confirmation from provider',
);

this.logger.debug(`isProcessingConfirmationQueue value: ${this.isProcessingConfirmationQueue}`);

if (this.isProcessingConfirmationQueue) {
this.logger.log(
'Notifications are already being added to confirmation queue, skipping this CRON job',
Expand All @@ -139,6 +170,9 @@ export class NotificationsService extends CoreService<Notification> {
}

this.isProcessingConfirmationQueue = true;
this.logger.debug(
`isProcessingConfirmationQueue value before initializing allAwaitingConfirmationNotifications: ${this.isProcessingConfirmationQueue}`,
);
let allAwaitingConfirmationNotifications: Notification[] = [];

try {
Expand All @@ -147,6 +181,9 @@ export class NotificationsService extends CoreService<Notification> {
this.isProcessingConfirmationQueue = false;
this.logger.error('Error fetching awaiting confirmation notifications');
this.logger.error(JSON.stringify(error, null, 2));
this.logger.debug(
`isProcessingConfirmationQueue value when error fetching awaiting confirmation notifications: ${this.isProcessingConfirmationQueue}`,
);
return;
}

Expand All @@ -156,21 +193,39 @@ export class NotificationsService extends CoreService<Notification> {

for (const notification of allAwaitingConfirmationNotifications) {
try {
this.logger.debug(
`NotificationId: ${notification.id}, DeliveryStatus: ${notification.deliveryStatus}`,
);
notification.deliveryStatus = DeliveryStatus.QUEUED_CONFIRMATION;
this.logger.debug(
`Updated deliveryStatus to ${DeliveryStatus.QUEUED_CONFIRMATION}. Saving notification in DB: ${JSON.stringify(notification)}`,
);
await this.notificationRepository.save(notification);
await this.notificationQueueService.addNotificationToQueue(
QueueAction.DELIVERY_STATUS,
notification,
);
} catch (error) {
this.logger.debug(
`Error encountered. NotificationId: ${notification.id}, DeliveryStatus: ${notification.deliveryStatus}`,
);
notification.deliveryStatus = DeliveryStatus.AWAITING_CONFIRMATION;
this.logger.error(`Error adding notification with id: ${notification.id} to queue`);
this.logger.error(JSON.stringify(error, null, 2));
this.logger.debug(
`isProcessingConfirmationQueue value while adding notification with id: ${notification.id} to queue: ${this.isProcessingConfirmationQueue}`,
);
this.logger.debug(
`Updated Delivery status to ${DeliveryStatus.AWAITING_CONFIRMATION}. Saving notification in DB: ${JSON.stringify(notification)}`,
);
await this.notificationRepository.save(notification);
}
}

this.isProcessingConfirmationQueue = false;
this.logger.debug(
`isProcessingConfirmationQueue value after adding ${allAwaitingConfirmationNotifications.length} awaiting confirmation notifications to queue: ${this.isProcessingConfirmationQueue}`,
);
}

getPendingNotifications(): Promise<Notification[]> {
Expand Down Expand Up @@ -211,6 +266,7 @@ export class NotificationsService extends CoreService<Notification> {

// Get the applicationId currently being used for filtering data based on api key
const filterApplicationId = await this.getApplicationIdFromApiKey(authorizationHeader);
this.logger.debug(`Fetch notifications with applicationId: ${filterApplicationId}`);

const baseConditions = [
{ field: 'status', value: Status.ACTIVE },
Expand Down
Loading