Skip to content

Commit 52b3ad1

Browse files
committed
feat: task updates & jobId
1 parent 43f3a22 commit 52b3ad1

File tree

10 files changed

+52
-25
lines changed

10 files changed

+52
-25
lines changed

app/api/articles/[id]/similars/route.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { randomUUID } from 'crypto';
12
import { NextRouteFunction } from '@/lib/route-validator.server';
23
import * as similarityService from '@/lib/article-metadata/similarity.server';
34
import * as redis from '@/lib/redis';
@@ -10,7 +11,10 @@ const generateEmbeddingAsync = redis.cacheableFunction<string, { id: string }>(
1011
redis.asyncEmbeddingGenerationSchema,
1112
{ ex: 60 * 60 }
1213
)(async (metadataId) =>
13-
generateEmbeddingsFromPdf.trigger({ id: metadataId }, { idempotencyKey: `similarity-${metadataId}` })
14+
generateEmbeddingsFromPdf.trigger(
15+
{ id: metadataId, jobId: randomUUID() },
16+
{ idempotencyKey: `similarity-${metadataId}` }
17+
)
1418
);
1519

1620
const getSimilarArticlesByMetadataId: NextRouteFunction<Params> = async (_, { params }) => {

trigger/ai/embedding-from-pdf.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@ export const generateEmbeddingsFromPdf = task({
99
const payload = metadataIdPayloadSchema.parse(_payload);
1010
logger.info(`Generate Embedding from pdf - Id: ${payload.id}`, { time: new Date().toISOString() });
1111

12-
const pdfDocs = await loadPdf.triggerAndWait(payload.id);
12+
const pdfDocs = await loadPdf.triggerAndWait(payload.id, {
13+
idempotencyKey: `load-pdf-${payload.jobId}-${payload.id}`,
14+
});
1315

1416
if (!pdfDocs.ok || !pdfDocs.output) {
1517
throw new Error(`PDF not found for metadata id: ${payload.id}`);
1618
}
1719

1820
await generateEmbedding.trigger(
19-
{ itemId: payload.id, doc: pdfDocs.output.doc },
20-
{ idempotencyKey: `generate-metadata-embedding-${payload.id}` }
21+
{ itemId: payload.id, doc: pdfDocs.output.doc, jobId: payload.jobId },
22+
{ idempotencyKey: `generate-metadata-embedding-${payload.jobId}-${payload.id}` }
2123
);
2224

2325
logger.info(`Add Embeddings - Done`, { time: new Date().toISOString() });

trigger/ai/generate-ai-content.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,26 @@
11
import { logger, task } from '@trigger.dev/sdk/v3';
22
import * as articleMetadataService from '@/lib/article-metadata/metadata.server';
3-
import { GenerateAiContent, generateAiContentSchema } from '../schema';
3+
import { GenerateAiContentPayload, generateAiContentPayloadSchema } from '../schema';
44
import { generateEmbeddingsFromPdf } from './embedding-from-pdf';
55

66
export const generateAIContent = task({
77
id: 'generate-ai-content',
8-
run: async (_payload: GenerateAiContent) => {
9-
const payload = generateAiContentSchema.parse(_payload);
8+
run: async (_payload: GenerateAiContentPayload) => {
9+
const payload = generateAiContentPayloadSchema.parse(_payload);
1010

1111
const metadataList = await articleMetadataService.getArticleMetadataIdsWithZeroEmbeddingsByIds(
12-
payload.map((payloadItem) => payloadItem.externalId)
12+
payload.data.map((payloadItem) => payloadItem.externalId)
1313
);
1414

1515
logger.info(`Fetched metadata count: ${metadataList.length} - Done`, {
1616
time: new Date().toISOString(),
1717
});
1818

1919
await generateEmbeddingsFromPdf.batchTrigger(
20-
metadataList.map((item) => ({ payload: { id: item.id, externalId: item.external_id } }))
20+
metadataList.map((item) => ({
21+
payload: { id: item.id, externalId: item.external_id, jobId: payload.jobId },
22+
options: { idempotencyKey: `generate-embedding-from-pdf-${payload.jobId}-${item.id}` },
23+
}))
2124
);
2225
},
2326
});

trigger/research-sync/add-article-metadata-batch.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ export const addArticleMetadaBatch = task({
2020
return;
2121
}
2222

23-
await generateAIContent.trigger(newIds.map((id) => ({ externalId: id })));
23+
await generateAIContent.trigger(
24+
{ data: newIds.map((id) => ({ externalId: id })), jobId: payload.jobId },
25+
{ idempotencyKey: `generate-ai-content-${payload.jobId}-${payload.batchIndex}` }
26+
);
2427

2528
logger.info(`Add Article Metadata Batch - Done`, { time: new Date().toISOString() });
2629
},

trigger/research-sync/scheduler.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@ export const researchSync = schedules.task({
99
const startDate = new Date();
1010
startDate.setDate(startDate.getDate() - 1);
1111

12-
const jobId = randomUUID();
13-
1412
await syncMetadata.trigger({
13+
jobId: payload.externalId ?? randomUUID(),
1514
startDate: startDate.toISOString().split('T')[0],
1615
});
1716

@@ -23,6 +22,7 @@ export const researchSync = schedules.task({
2322

2423
export const researchSyncSchedule = schedules.create({
2524
task: researchSync.id,
25+
externalId: randomUUID(),
2626
deduplicationKey: 'research-sync-scheduler',
2727
// At minute 0 past every 6th hour
2828
cron: '0 */8 * * *',

trigger/research-sync/sync-metadata.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ export const syncMetadata = task({
6868
throw new Error(`Error fetching data from OAI-PMH server for ${payload.startDate} - ${payload.untilDate}`);
6969
}
7070

71-
const parsedData = await parseMetadata.triggerAndWait(responseTextData);
71+
const parsedData = await parseMetadata.triggerAndWait(responseTextData, {
72+
idempotencyKey: `parse-metadata-${payload.jobId}-${payload.startDate}-${payload.untilDate}`,
73+
});
7274

7375
if (!parsedData.ok) {
7476
throw new Error('Error parsing data');
@@ -83,7 +85,7 @@ export const syncMetadata = task({
8385
// send the batch to the add_article_metadata_batch event
8486
// no need to wait for the result
8587
await addArticleMetadaBatch.batchTrigger(
86-
batches.map((batch, index) => ({ payload: { batch, batchIndex: index } }))
88+
batches.map((batch, index) => ({ payload: { batch, batchIndex: index, jobId: payload.jobId } }))
8789
);
8890

8991
const token = getResumptionToken(parsedData.output.resumptionToken, parsedData.output.records.length);
@@ -93,11 +95,17 @@ export const syncMetadata = task({
9395
if (token) {
9496
// send the resumption token to the same event
9597
// no need to wait for the result
96-
await syncMetadata.trigger({
97-
startDate: payload.startDate,
98-
untilDate: payload.untilDate,
99-
resumptionToken: token,
100-
});
98+
await syncMetadata.trigger(
99+
{
100+
jobId: payload.jobId,
101+
startDate: payload.startDate,
102+
untilDate: payload.untilDate,
103+
resumptionToken: token,
104+
},
105+
{
106+
idempotencyKey: `sync-metadata-${payload.jobId}-${payload.startDate}-${payload.untilDate}`,
107+
}
108+
);
101109
}
102110
},
103111
});

trigger/schema/article-metadata.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ const metadataBatchSchema = z
4646
}));
4747

4848
export const addArticleMetadaBatchPayloadSchema = z.object({
49+
jobId: z.string(),
4950
batchIndex: z.number(),
5051
batch: z.array(metadataBatchSchema),
5152
});
5253

5354
export const metadataIdPayloadSchema = z.object({
55+
jobId: z.string(),
5456
id: z.string(),
5557
});
5658

trigger/schema/generate-ai-content.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import { z } from 'zod';
22

3-
export const generateAiContentSchema = z.array(
4-
z.object({
5-
externalId: z.string(),
6-
})
7-
);
3+
export const generateAiContentPayloadSchema = z.object({
4+
jobId: z.string(),
5+
data: z.array(
6+
z.object({
7+
externalId: z.string(),
8+
})
9+
),
10+
});
811

9-
export type GenerateAiContent = z.input<typeof generateAiContentSchema>;
12+
export type GenerateAiContentPayload = z.input<typeof generateAiContentPayloadSchema>;

trigger/schema/generate-embedding.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { z } from 'zod';
22

33
export const generateEmbeddingPayloadSchema = z.object({
4+
jobId: z.string(),
45
itemId: z.string(),
56
// Document<PDFMetadata<{ metadata_id: string }>>[]
67
doc: z.array(

trigger/schema/research-sync.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { z } from 'zod';
22

33
export const researchSyncPayloadSchema = z.object({
4+
jobId: z.string(),
45
startDate: z.string().regex(/^[0-9]{4}-[0-9]{2}-[0-9]{2}$/),
56
untilDate: z
67
.string()

0 commit comments

Comments
 (0)