Skip to content

Commit 40c6a83

Browse files
committed
feat(notifications): upgrades pg-boss
1 parent 95fc847 commit 40c6a83

File tree

7 files changed

+25
-36
lines changed

7 files changed

+25
-36
lines changed

apps/api/src/core/services/job-queue/job-queue.service.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ export class JobQueueService implements Disposable {
3737
}
3838
seenJobs.add(queueName);
3939
await this.pgBoss.createQueue(queueName, {
40-
name: queueName,
4140
retryLimit: 5,
4241
retryBackoff: true,
4342
retryDelayMax: 5 * 60

apps/notifications/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
"murmurhash": "^2.0.1",
7474
"nestjs-zod": "^4.3.1",
7575
"pg": "^8.13.0",
76-
"pg-boss": "^10.1.6",
76+
"pg-boss": "^11.0.4",
7777
"pino": "^9.7.0",
7878
"reflect-metadata": "^0.2.2",
7979
"rxjs": "^7.8.1",

apps/notifications/src/infrastructure/broker/providers/pg-boss/pg-boss.provider.spec.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ describe("createPgBoss", () => {
2626
expect(MockPgBoss).toHaveBeenCalledTimes(2);
2727
expect(MockPgBoss.mock.calls[0][0]).toEqual(config["broker.EVENT_BROKER_POSTGRES_URI"]);
2828
expect(MockPgBoss.mock.calls[1][0]).toEqual({
29-
archiveCompletedAfterSeconds: config["broker.EVENT_BROKER_ARCHIVE_COMPLETED_AFTER_SECONDS"],
3029
db: {
3130
executeSql: expect.any(Function)
3231
}

apps/notifications/src/infrastructure/broker/providers/pg-boss/pg-boss.provider.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ export const createPgBossFactory =
1919
executeSql(text: string, values: any[]): Promise<{ rows: any[] }> {
2020
return client.query(text, values);
2121
}
22-
},
23-
archiveCompletedAfterSeconds: config.getOrThrow("broker.EVENT_BROKER_ARCHIVE_COMPLETED_AFTER_SECONDS")
22+
}
2423
});
2524

2625
const logger = new LoggerService({ context: "PgBoss" });

apps/notifications/src/infrastructure/broker/services/broker/broker.service.spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ describe(BrokerService.name, () => {
4646

4747
await service.subscribe(eventName, options, handler);
4848

49-
expect(pgBoss.createQueue).toHaveBeenCalledWith(queueName);
49+
expect(pgBoss.createQueue).toHaveBeenCalledWith(queueName, {
50+
deleteAfterSeconds: configService.getOrThrow("broker.EVENT_BROKER_ARCHIVE_COMPLETED_AFTER_SECONDS")
51+
});
5052
expect(pgBoss.subscribe).toHaveBeenCalledWith(eventName, queueName);
5153
expect(pgBoss.work).toHaveBeenCalledTimes(options.prefetchCount);
5254
expect(pgBoss.work).toHaveBeenCalledWith(queueName, expect.any(Function));

apps/notifications/src/infrastructure/broker/services/broker/broker.service.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ export class BrokerService {
3636

3737
async subscribe<ReqData>(eventName: string, options: { prefetchCount: number }, handler: SingleMsgWorkHandler<ReqData>) {
3838
const queueName = this.toQueueName(eventName);
39-
await this.boss.createQueue(queueName);
39+
await this.boss.createQueue(queueName, {
40+
deleteAfterSeconds: this.configService.getOrThrow("broker.EVENT_BROKER_ARCHIVE_COMPLETED_AFTER_SECONDS")
41+
});
4042
await this.boss.subscribe(eventName, queueName);
4143

4244
await Promise.all(

package-lock.json

Lines changed: 17 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)