Skip to content

Commit 729cd65

Browse files
authored
Merge pull request #234 from Plant-for-the-Planet-org/notification-batching
fixes issue with notification batching
2 parents 09a8615 + bdbb827 commit 729cd65

File tree

2 files changed

+38
-33
lines changed

2 files changed

+38
-33
lines changed

apps/server/src/Services/Notifications/SendNotifications.ts

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,41 @@
1-
import type {AdditionalOptions} from '../../Interfaces/AdditionalOptions';
2-
import {type AlertMethodMethod} from '../../Interfaces/AlertMethod';
1+
import type { AdditionalOptions } from '../../Interfaces/AdditionalOptions';
2+
import { type AlertMethodMethod } from '../../Interfaces/AlertMethod';
33
import type DataRecord from '../../Interfaces/DataRecord';
4-
import {type NotificationParameters} from '../../Interfaces/NotificationParameters';
5-
import {getLocalTime} from '../../../src/utils/date';
6-
import {env} from '../../env.mjs';
7-
import {prisma} from '../../server/db';
8-
import {logger} from '../../server/logger';
4+
import { type NotificationParameters } from '../../Interfaces/NotificationParameters';
5+
import { getLocalTime } from '../../../src/utils/date';
6+
import { env } from '../../env.mjs';
7+
import { prisma } from '../../server/db';
8+
import { logger } from '../../server/logger';
99
import NotifierRegistry from '../Notifier/NotifierRegistry';
10-
import {NOTIFICATION_METHOD} from '../Notifier/methodConstants';
10+
import { NOTIFICATION_METHOD } from '../Notifier/methodConstants';
1111

12-
const MAX_RETRIES = 0;
12+
// Removed MAX_RETRIES - we now process all batches until no more notifications remain
1313
const ALERT_SMS_DISABLED = env.ALERT_SMS_DISABLED;
1414
const ALERT_WHATSAPP_DISABLED = env.ALERT_WHATSAPP_DISABLED;
1515

1616
// get all undelivered Notifications and using relation from SiteAlert, get the data on Site
1717
// for each notification, send the notification to the destination
1818
// After sending notification update the notification table to set isDelivered to true and sentAt to current time
1919
// If notification fails to send, increment the failCount in all alertMethods table where destination and method match.
20-
const sendNotifications = async ({req}: AdditionalOptions): Promise<number> => {
20+
const sendNotifications = async ({ req }: AdditionalOptions): Promise<number> => {
2121
const alertMethodsExclusionList = [];
2222
if (ALERT_SMS_DISABLED)
2323
alertMethodsExclusionList.push(NOTIFICATION_METHOD.SMS);
2424
if (ALERT_WHATSAPP_DISABLED)
2525
alertMethodsExclusionList.push(NOTIFICATION_METHOD.WHATSAPP);
2626

27-
const take = 10;
27+
const BATCH_SIZE = parseInt(env.NOTIFICATION_BATCH_SIZE || '10', 10);
28+
const take = BATCH_SIZE;
2829
let successCount = 0;
2930
let continueProcessing = true;
30-
let retries = 0;
31+
let batchCount = 0;
3132
while (continueProcessing) {
3233
const notifications = await prisma.notification.findMany({
3334
where: {
3435
isSkipped: false,
3536
isDelivered: false,
3637
sentAt: null,
37-
alertMethod: {notIn: alertMethodsExclusionList},
38+
alertMethod: { notIn: alertMethodsExclusionList },
3839
},
3940
include: {
4041
siteAlert: {
@@ -64,7 +65,7 @@ const sendNotifications = async ({req}: AdditionalOptions): Promise<number> => {
6465
await Promise.all(
6566
notifications.map(async notification => {
6667
try {
67-
const {id, destination, siteAlert} = notification;
68+
const { id, destination, siteAlert } = notification;
6869
const alertMethod = notification.alertMethod as AlertMethodMethod;
6970
const {
7071
id: alertId,
@@ -173,20 +174,19 @@ const sendNotifications = async ({req}: AdditionalOptions): Promise<number> => {
173174
const isDelivered = await notifier.notify(
174175
destination,
175176
notificationParameters,
176-
{req},
177+
{ req },
177178
);
178179

179180
if (isDelivered === true) {
180181
successfulNotificationIds.push(id);
181182
successfulDestinations.push(destination);
182183
successCount++;
183184
} else {
184-
failedAlertMethods.push({destination, method: alertMethod});
185+
failedAlertMethods.push({ destination, method: alertMethod });
185186
}
186187
} catch (error) {
187188
logger(
188-
`Error processing notification ${notification.id}: ${
189-
(error as Error)?.message
189+
`Error processing notification ${notification.id}: ${(error as Error)?.message
190190
}`,
191191
'error',
192192
);
@@ -197,34 +197,35 @@ const sendNotifications = async ({req}: AdditionalOptions): Promise<number> => {
197197
// UpdateMany notification
198198
if (successfulNotificationIds.length > 0) {
199199
await prisma.notification.updateMany({
200-
where: {id: {in: successfulNotificationIds}},
201-
data: {isDelivered: true, sentAt: new Date()},
200+
where: { id: { in: successfulNotificationIds } },
201+
data: { isDelivered: true, sentAt: new Date() },
202202
});
203203
await prisma.alertMethod.updateMany({
204-
where: {destination: {in: successfulDestinations}},
205-
data: {failCount: 0},
204+
where: { destination: { in: successfulDestinations } },
205+
data: { failCount: 0 },
206206
});
207207
}
208208

209-
retries += 1;
210-
if (retries >= MAX_RETRIES) {
211-
const unsuccessfulNotifications = notifications.filter(
212-
({id}) => !successfulNotificationIds.includes(id),
213-
);
209+
batchCount += 1;
210+
211+
// Handle failed notifications - mark them as skipped if they failed
212+
const unsuccessfulNotifications = notifications.filter(
213+
({ id }) => !successfulNotificationIds.includes(id),
214+
);
214215

216+
if (unsuccessfulNotifications.length > 0) {
215217
const unsuccessfulNotificationIds = unsuccessfulNotifications.map(
216-
({id}) => id,
218+
({ id }) => id,
217219
);
218220

219221
await prisma.notification.updateMany({
220-
where: {id: {in: unsuccessfulNotificationIds}},
221-
data: {isSkipped: true, isDelivered: false, sentAt: null},
222+
where: { id: { in: unsuccessfulNotificationIds } },
223+
data: { isSkipped: true, isDelivered: false, sentAt: null },
222224
});
223-
224-
continueProcessing = false;
225-
break;
226225
}
227226

227+
logger(`Completed batch ${batchCount}. Successful: ${successfulNotificationIds.length}, Failed: ${unsuccessfulNotifications.length}`, 'info');
228+
228229
// skip += take; No need to skip take as we update the notifications to isDelivered = true
229230
// wait .7 seconds before starting the next round to ensure we aren't hitting any rate limits.
230231
// Todo: make this configurable and adjust as needed.

apps/server/src/env.mjs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ const server = z.object({
6060
ALERT_SMS_DISABLED: coerceBooleanWithDefault(true),
6161
ALERT_WHATSAPP_DISABLED: coerceBooleanWithDefault(true),
6262

63+
// Notification batch size for processing notifications in batches. Defaults to 10.
64+
NOTIFICATION_BATCH_SIZE: z.string().default('10'),
65+
6366
// API caching configuration. Enabled by default in production, disabled in development.
6467
PUBLIC_API_CACHING: z
6568
.union([z.literal('true'), z.literal('false')])
@@ -116,6 +119,7 @@ const processEnv = {
116119
ALERT_SMS_DISABLED: process.env.ALERT_SMS_DISABLED,
117120
ALERT_WHATSAPP_DISABLED: process.env.ALERT_WHATSAPP_DISABLED,
118121
PUBLIC_API_CACHING: process.env.PUBLIC_API_CACHING,
122+
NOTIFICATION_BATCH_SIZE: process.env.NOTIFICATION_BATCH_SIZE,
119123
};
120124

121125
// Don't touch the part below

0 commit comments

Comments
 (0)