Skip to content

Commit a6bf3f2

Browse files
Merge pull request #144 from Plant-for-the-Planet-org/feature/improvements-on-geo-event-fetcher
Scaling and Improving Serverless Functions
2 parents a79a369 + 0d775cb commit a6bf3f2

File tree

5 files changed

+274
-179
lines changed

5 files changed

+274
-179
lines changed

apps/server/src/Services/Notifications/CreateNotifications.ts

Lines changed: 179 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {prisma} from '../../server/db';
2+
import {CustomError} from '../../utils/errorHandler';
23

34
// Logic in Prisma:
45
// Initialize an empty array → `notificationDataQueue` -> Queue for holding data required for conditional notification creation
@@ -63,6 +64,139 @@ type ActiveAlertMethodCount = {
6364
[key: string]: number;
6465
};
6566

67+
type UnprocessedSiteAlert = {
68+
id: string;
69+
siteId: string;
70+
site: {
71+
lastMessageCreated: Date | null;
72+
user: {
73+
alertMethods: {
74+
method: string;
75+
destination: string;
76+
isEnabled: boolean;
77+
isVerified: boolean;
78+
}[];
79+
};
80+
};
81+
};
82+
83+
84+
function createNestedChunksForUnprocessedSiteAlerts(unprocessedSiteAlerts:UnprocessedSiteAlert[], approxChunkSize: number = 50){
85+
const chunkedSiteAlertsNestedArray: UnprocessedSiteAlert[][] = [];
86+
let currentSiteId = unprocessedSiteAlerts[0]?.siteId;
87+
let nestedIndex = 0;
88+
89+
unprocessedSiteAlerts.forEach((siteAlert) => {
90+
if (!chunkedSiteAlertsNestedArray[nestedIndex]) {
91+
chunkedSiteAlertsNestedArray[nestedIndex] = [];
92+
}
93+
94+
const currentChunk = chunkedSiteAlertsNestedArray[nestedIndex];
95+
const isNewSiteId = siteAlert.siteId !== currentSiteId;
96+
const chunkFull = currentChunk.length >= approxChunkSize
97+
const shouldCreateNewChunk = chunkFull && isNewSiteId;
98+
99+
if (shouldCreateNewChunk) {
100+
nestedIndex++;
101+
currentSiteId = siteAlert.siteId;
102+
chunkedSiteAlertsNestedArray[nestedIndex] = [siteAlert];
103+
} else {
104+
currentChunk.push(siteAlert);
105+
}
106+
});
107+
return chunkedSiteAlertsNestedArray
108+
}
109+
110+
async function processSiteAlertChunk(siteAlertChunk:UnprocessedSiteAlert[]){
111+
let notificationDataQueue: NotificationToBeProcessed[] = [];
112+
// [email, whatsapp, and sms] method have one count for each unique alertMethod,
113+
// notificationMethodCounter keeps count for eligible notifications for each alertMethod method
114+
let notificationMethodCounter: Record<string, ActiveAlertMethodCount> = {};
115+
let processedSiteAlerts: string[] = [];
116+
let notificationsToBeCreated: NotificationToBeCreated[] = [];
117+
let sitesToBeUpdated: string[] = [];
118+
119+
// Create a set of unique site IDs from unprocessed alerts
120+
const uniqueSiteIdsForNewSiteAlerts = new Set(siteAlertChunk.map(alert => alert.siteId));
121+
122+
// Process each siteAlert, and add to notificationQueue
123+
for (const siteAlert of siteAlertChunk) {
124+
const lastMessageCreated = siteAlert.site.lastMessageCreated;
125+
const alertMethods = siteAlert.site.user.alertMethods;
126+
const siteId = siteAlert.siteId;
127+
128+
// Initialize or update notificationMethodCounter for each unique site
129+
if (uniqueSiteIdsForNewSiteAlerts.has(siteId)) {
130+
// Initialize notificationMethodCounter for this site
131+
notificationMethodCounter[siteId] = { sms: 0, whatsapp: 0, email: 0, device: Infinity, webhook: Infinity };
132+
alertMethods.forEach(method => {
133+
if (method.isVerified && method.isEnabled) {
134+
notificationMethodCounter[siteId][method.method] += 1; // Increment the count of each method
135+
}
136+
});
137+
// Remove the siteId from the set
138+
uniqueSiteIdsForNewSiteAlerts.delete(siteId);
139+
}
140+
141+
if (alertMethods && alertMethods.length > 0) {
142+
alertMethods.forEach(alertMethod => {
143+
if (alertMethod.isVerified && alertMethod.isEnabled) {
144+
notificationDataQueue.push({
145+
siteAlertId: siteAlert.id,
146+
siteId: siteAlert.siteId,
147+
lastMessageCreated,
148+
alertMethod: alertMethod.method,
149+
destination: alertMethod.destination
150+
});
151+
}
152+
});
153+
}
154+
processedSiteAlerts.push(siteAlert.id);
155+
}
156+
157+
// Create notifications based on conditions
158+
for (const notification of notificationDataQueue) {
159+
const siteId = notification.siteId;
160+
const method = notification.alertMethod
161+
162+
// Determine if notification can be created
163+
// Check if the site is active or not, then check if the method count is sufficient
164+
const isSiteActive = !notification.lastMessageCreated || new Date(notification.lastMessageCreated) < new Date(Date.now() - 2 * 60 * 60 * 1000);
165+
const isMethodCountSufficient = notificationMethodCounter[siteId][method] > 0;
166+
const canCreateNotification = isSiteActive && isMethodCountSufficient;
167+
168+
if (canCreateNotification) {
169+
// Prepare createNotificationData object
170+
const createNotificationData: NotificationToBeCreated = {
171+
siteAlertId: notification.siteAlertId,
172+
alertMethod: method,
173+
destination: notification.destination,
174+
isDelivered: false,
175+
};
176+
177+
// Add to notificationsToBeCreated array
178+
notificationsToBeCreated.push(createNotificationData);
179+
180+
// Decrement the method count
181+
notificationMethodCounter[siteId][method] -= 1;
182+
183+
// Add siteId to sitesToBeUpdated for methods other than device and webhook
184+
if (['sms', 'whatsapp', 'email'].includes(method) && !sitesToBeUpdated.includes(siteId)) {
185+
sitesToBeUpdated.push(siteId);
186+
}
187+
}
188+
}
189+
const notificationCreateData = notificationsToBeCreated.map(n => ({
190+
siteAlertId: n.siteAlertId,
191+
alertMethod: n.alertMethod,
192+
destination: n.destination,
193+
isDelivered: false
194+
}))
195+
return {
196+
processedSiteAlerts, notificationCreateData, sitesToBeUpdated
197+
}
198+
}
199+
66200
//For all siteAlerts
67201
// Create notifications for [webhook, device]
68202
// We check if alerts happen within a certain time
@@ -73,141 +207,61 @@ type ActiveAlertMethodCount = {
73207
const createNotifications = async () => {
74208
let totalNotificationsCreated = 0;
75209
try {
76-
77-
// Run Prisma Transaction
78-
await prisma.$transaction(async (prisma) => {
79-
80-
//Get all unprocessed alerts
81-
const unprocessedAlerts = await prisma.siteAlert.findMany({
82-
where: { isProcessed: false, deletedAt: null },
83-
select: {
84-
id: true,
85-
siteId: true,
86-
site: {
87-
select: {
88-
lastMessageCreated: true,
89-
user: {
90-
select: {
91-
alertMethods: {
92-
select: {
93-
method: true,
94-
destination: true,
95-
isEnabled: true,
96-
isVerified: true
97-
}
210+
//Get all unprocessed alerts
211+
const unprocessedAlerts = await prisma.siteAlert.findMany({
212+
where: {
213+
isProcessed: false,
214+
deletedAt: null,
215+
eventDate: {
216+
gte: new Date(new Date().getTime() - 24 * 60 * 60 * 1000), // Alerts from the last 24 hours
217+
},
218+
},
219+
select: {
220+
id: true,
221+
siteId: true,
222+
site: {
223+
select: {
224+
lastMessageCreated: true,
225+
user: {
226+
select: {
227+
alertMethods: {
228+
select: {
229+
method: true,
230+
destination: true,
231+
isEnabled: true,
232+
isVerified: true
98233
}
99234
}
100235
}
101236
}
102237
}
103-
},
104-
orderBy: [{ siteId: 'asc' }, { eventDate: 'asc' }]
105-
});
106-
107-
let notificationDataQueue: NotificationToBeProcessed[] = [];
108-
// [email, whatsapp, and sms] method have one count for each unique alertMethod,
109-
// notificationMethodCounter keeps count for eligible notifications for each alertMethod method
110-
let notificationMethodCounter: Record<string, ActiveAlertMethodCount> = {};
111-
let processedSiteAlerts: string[] = [];
112-
let notificationsToBeCreated: NotificationToBeCreated[] = [];
113-
let sitesToBeUpdated: string[] = [];
114-
115-
// Create a set of unique site IDs from unprocessed alerts
116-
const uniqueSiteIdsForNewSiteAlerts = new Set(unprocessedAlerts.map(alert => alert.siteId));
117-
118-
// Process each siteAlert
119-
for (const siteAlert of unprocessedAlerts) {
120-
const lastMessageCreated = siteAlert.site.lastMessageCreated;
121-
const alertMethods = siteAlert.site.user.alertMethods;
122-
const siteId = siteAlert.siteId;
123-
124-
// Initialize or update notificationMethodCounter for each unique site
125-
if (uniqueSiteIdsForNewSiteAlerts.has(siteId)) {
126-
// Initialize notificationMethodCounter for this site
127-
notificationMethodCounter[siteId] = { sms: 0, whatsapp: 0, email: 0, device: Infinity, webhook: Infinity };
128-
alertMethods.forEach(method => {
129-
if (method.isVerified && method.isEnabled) {
130-
notificationMethodCounter[siteId][method.method] += 1; // Increment the count of each method
131-
}
132-
});
133-
// Remove the siteId from the set
134-
uniqueSiteIdsForNewSiteAlerts.delete(siteId);
135238
}
136-
137-
if (alertMethods && alertMethods.length > 0) {
138-
alertMethods.forEach(alertMethod => {
139-
if (alertMethod.isVerified && alertMethod.isEnabled) {
140-
notificationDataQueue.push({
141-
siteAlertId: siteAlert.id,
142-
siteId: siteAlert.siteId,
143-
lastMessageCreated,
144-
alertMethod: alertMethod.method,
145-
destination: alertMethod.destination
146-
});
147-
}
239+
},
240+
orderBy: [{ siteId: 'asc' }, { eventDate: 'asc' }]
241+
});
242+
const siteAlertsInChunks = createNestedChunksForUnprocessedSiteAlerts(unprocessedAlerts, 30)
243+
for (const siteAlertChunk of siteAlertsInChunks) {
244+
const {processedSiteAlerts, notificationCreateData, sitesToBeUpdated} = await processSiteAlertChunk(siteAlertChunk)
245+
await prisma.$transaction(async (prisma) => {
246+
await prisma.siteAlert.updateMany({
247+
where: { id: { in: processedSiteAlerts } },
248+
data: { isProcessed: true }
249+
});
250+
await prisma.notification.createMany({
251+
data: notificationCreateData
252+
});
253+
if (sitesToBeUpdated.length > 0) {
254+
await prisma.site.updateMany({
255+
where: { id: { in: sitesToBeUpdated } },
256+
data: { lastMessageCreated: new Date() }
148257
});
149258
}
150-
processedSiteAlerts.push(siteAlert.id);
151-
}
152-
153-
// Create notifications based on conditions
154-
for (const notification of notificationDataQueue) {
155-
const siteId = notification.siteId;
156-
const method = notification.alertMethod
157-
158-
// Determine if notification can be created
159-
// Check if the site is active or not, then check if the method count is sufficient
160-
const isSiteActive = !notification.lastMessageCreated || new Date(notification.lastMessageCreated) < new Date(Date.now() - 2 * 60 * 60 * 1000);
161-
const isMethodCountSufficient = notificationMethodCounter[siteId][method] > 0;
162-
const canCreateNotification = isSiteActive && isMethodCountSufficient;
163-
164-
if (canCreateNotification) {
165-
// Prepare createNotificationData object
166-
const createNotificationData: NotificationToBeCreated = {
167-
siteAlertId: notification.siteAlertId,
168-
alertMethod: method,
169-
destination: notification.destination,
170-
isDelivered: false,
171-
};
172-
173-
// Add to notificationsToBeCreated array
174-
notificationsToBeCreated.push(createNotificationData);
175-
176-
// Decrement the method count
177-
notificationMethodCounter[siteId][method] -= 1;
178-
179-
// Add siteId to sitesToBeUpdated for methods other than device and webhook
180-
if (['sms', 'whatsapp', 'email'].includes(method) && !sitesToBeUpdated.includes(siteId)) {
181-
sitesToBeUpdated.push(siteId);
182-
}
183-
}
184-
}
185-
// Bulk update siteAlert to processed
186-
await prisma.siteAlert.updateMany({
187-
where: { id: { in: processedSiteAlerts } },
188-
data: { isProcessed: true }
189-
});
190-
// Bulk create notifications
191-
await prisma.notification.createMany({
192-
data: notificationsToBeCreated.map(n => ({
193-
siteAlertId: n.siteAlertId,
194-
alertMethod: n.alertMethod,
195-
destination: n.destination,
196-
isDelivered: false
197-
}))
198259
});
199-
// Bulk update sites.lastMessageCreated
200-
if (sitesToBeUpdated.length > 0) {
201-
await prisma.site.updateMany({
202-
where: { id: { in: sitesToBeUpdated } },
203-
data: { lastMessageCreated: new Date() }
204-
});
205-
}
206-
// Update totalNotificationsCreated
207-
totalNotificationsCreated = notificationsToBeCreated.length
208-
});
260+
totalNotificationsCreated = totalNotificationsCreated + notificationCreateData.length
261+
}
209262
} catch (error) {
210-
console.log(error);
263+
const { status, message } = error as { status: number; message: string };
264+
CustomError.throw(status, message, 'GeneralError',true);
211265
}
212266
return totalNotificationsCreated;
213267
};

0 commit comments

Comments
 (0)