Skip to content

Commit dcebe3a

Browse files
authored
improvement(triggers): add tags to all trigger.dev task invocations (#3878)
* improvement(triggers): add tags to all trigger.dev task invocations * fix(triggers): prefix unused type param in buildTags * fix(triggers): remove unused type param from buildTags
1 parent e39c534 commit dcebe3a

File tree

12 files changed

+98
-17
lines changed

12 files changed

+98
-17
lines changed

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,11 @@ export async function GET(request: NextRequest) {
146146
})
147147
} else {
148148
jobId = await jobQueue.enqueue('schedule-execution', payload, {
149-
metadata: { workflowId: schedule.workflowId ?? undefined, correlation },
149+
metadata: {
150+
workflowId: schedule.workflowId ?? undefined,
151+
workspaceId: resolvedWorkspaceId ?? undefined,
152+
correlation,
153+
},
150154
})
151155
}
152156
logger.info(

apps/sim/app/api/webhooks/agentmail/route.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,13 @@ export async function POST(req: Request) {
162162

163163
if (isTriggerDevEnabled) {
164164
try {
165-
const handle = await tasks.trigger('mothership-inbox-execution', { taskId })
165+
const handle = await tasks.trigger(
166+
'mothership-inbox-execution',
167+
{ taskId },
168+
{
169+
tags: [`workspaceId:${result.id}`, `taskId:${taskId}`],
170+
}
171+
)
166172
await db
167173
.update(mothershipInboxTask)
168174
.set({ triggerJobId: handle.id })

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
237237
},
238238
})
239239
: await jobQueue!.enqueue('workflow-execution', payload, {
240-
metadata: { workflowId, userId, correlation },
240+
metadata: { workflowId, workspaceId, userId, correlation },
241241
})
242242

243243
asyncLogger.info('Queued async workflow execution', { jobId })

apps/sim/app/workspace/[workspaceId]/knowledge/components/edit-knowledge-base-modal/edit-knowledge-base-modal.tsx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,34 +146,34 @@ export const EditKnowledgeBaseModal = memo(function EditKnowledgeBaseModal({
146146
<Label>Chunking Configuration</Label>
147147
<div className='grid grid-cols-3 gap-2'>
148148
<div className='rounded-sm border border-[var(--border-1)] bg-[var(--surface-2)] px-2.5 py-2'>
149-
<p className='text-[var(--text-tertiary)] text-[11px] leading-tight'>
149+
<p className='text-[11px] text-[var(--text-tertiary)] leading-tight'>
150150
Max Size
151151
</p>
152152
<p className='font-medium text-[var(--text-primary)] text-sm'>
153153
{chunkingConfig.maxSize.toLocaleString()}
154-
<span className='ml-0.5 font-normal text-[var(--text-tertiary)] text-[11px]'>
154+
<span className='ml-0.5 font-normal text-[11px] text-[var(--text-tertiary)]'>
155155
tokens
156156
</span>
157157
</p>
158158
</div>
159159
<div className='rounded-sm border border-[var(--border-1)] bg-[var(--surface-2)] px-2.5 py-2'>
160-
<p className='text-[var(--text-tertiary)] text-[11px] leading-tight'>
160+
<p className='text-[11px] text-[var(--text-tertiary)] leading-tight'>
161161
Min Size
162162
</p>
163163
<p className='font-medium text-[var(--text-primary)] text-sm'>
164164
{chunkingConfig.minSize.toLocaleString()}
165-
<span className='ml-0.5 font-normal text-[var(--text-tertiary)] text-[11px]'>
165+
<span className='ml-0.5 font-normal text-[11px] text-[var(--text-tertiary)]'>
166166
chars
167167
</span>
168168
</p>
169169
</div>
170170
<div className='rounded-sm border border-[var(--border-1)] bg-[var(--surface-2)] px-2.5 py-2'>
171-
<p className='text-[var(--text-tertiary)] text-[11px] leading-tight'>
171+
<p className='text-[11px] text-[var(--text-tertiary)] leading-tight'>
172172
Overlap
173173
</p>
174174
<p className='font-medium text-[var(--text-primary)] text-sm'>
175175
{chunkingConfig.overlap.toLocaleString()}
176-
<span className='ml-0.5 font-normal text-[var(--text-tertiary)] text-[11px]'>
176+
<span className='ml-0.5 font-normal text-[11px] text-[var(--text-tertiary)]'>
177177
tokens
178178
</span>
179179
</p>

apps/sim/lib/a2a/push-notifications.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,12 @@ export async function notifyTaskStateChange(taskId: string, state: TaskState): P
114114
const { a2aPushNotificationTask } = await import(
115115
'@/background/a2a-push-notification-delivery'
116116
)
117-
await a2aPushNotificationTask.trigger({ taskId, state })
117+
await a2aPushNotificationTask.trigger(
118+
{ taskId, state },
119+
{
120+
tags: [`taskId:${taskId}`],
121+
}
122+
)
118123
logger.info('Push notification queued to trigger.dev', { taskId, state })
119124
} catch (error) {
120125
logger.warn('Failed to queue push notification, falling back to inline delivery', {

apps/sim/lib/core/async-jobs/backends/trigger-dev.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ export class TriggerDevJobQueue implements JobQueueBackend {
6666
? { ...payload, ...options.metadata }
6767
: payload
6868

69-
const handle = await tasks.trigger(taskId, enrichedPayload)
69+
const tags = buildTags(options)
70+
const handle = await tasks.trigger(taskId, enrichedPayload, tags.length > 0 ? { tags } : {})
7071

71-
logger.debug('Enqueued job via trigger.dev', { jobId: handle.id, type, taskId })
72+
logger.debug('Enqueued job via trigger.dev', { jobId: handle.id, type, taskId, tags })
7273
return handle.id
7374
}
7475

@@ -121,3 +122,33 @@ export class TriggerDevJobQueue implements JobQueueBackend {
121122

122123
async markJobFailed(_jobId: string, _error: string): Promise<void> {}
123124
}
125+
126+
/**
127+
* Derives trigger.dev tags from job type, metadata, and explicit tags.
128+
* Tags follow the `namespace:value` convention for consistent filtering.
129+
* Max 10 tags per run, each max 128 chars.
130+
*/
131+
function buildTags(options?: EnqueueOptions): string[] {
132+
const tags: string[] = []
133+
const meta = options?.metadata
134+
135+
if (meta?.workspaceId) tags.push(`workspaceId:${meta.workspaceId}`)
136+
if (meta?.workflowId) tags.push(`workflowId:${meta.workflowId}`)
137+
if (meta?.userId) tags.push(`userId:${meta.userId}`)
138+
139+
if (meta?.correlation) {
140+
const c = meta.correlation
141+
tags.push(`source:${c.source}`)
142+
if (c.webhookId) tags.push(`webhookId:${c.webhookId}`)
143+
if (c.scheduleId) tags.push(`scheduleId:${c.scheduleId}`)
144+
if (c.provider) tags.push(`provider:${c.provider}`)
145+
}
146+
147+
if (options?.tags) {
148+
for (const tag of options.tags) {
149+
if (!tags.includes(tag)) tags.push(tag)
150+
}
151+
}
152+
153+
return tags.slice(0, 10)
154+
}

apps/sim/lib/core/async-jobs/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export interface Job<TPayload = unknown, TOutput = unknown> {
5454

5555
export interface JobMetadata {
5656
workflowId?: string
57+
workspaceId?: string
5758
userId?: string
5859
correlation?: AsyncExecutionCorrelation
5960
[key: string]: unknown
@@ -66,6 +67,7 @@ export interface EnqueueOptions {
6667
priority?: number
6768
name?: string
6869
delayMs?: number
70+
tags?: string[]
6971
}
7072

7173
/**

apps/sim/lib/knowledge/connectors/sync-engine.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,30 @@ export async function dispatchSync(
152152
const requestId = options?.requestId ?? crypto.randomUUID()
153153

154154
if (isTriggerAvailable()) {
155+
const connectorRows = await db
156+
.select({
157+
knowledgeBaseId: knowledgeConnector.knowledgeBaseId,
158+
workspaceId: knowledgeBase.workspaceId,
159+
userId: knowledgeBase.userId,
160+
})
161+
.from(knowledgeConnector)
162+
.innerJoin(knowledgeBase, eq(knowledgeBase.id, knowledgeConnector.knowledgeBaseId))
163+
.where(eq(knowledgeConnector.id, connectorId))
164+
.limit(1)
165+
166+
const row = connectorRows[0]
167+
const tags = [`connectorId:${connectorId}`]
168+
if (row?.knowledgeBaseId) tags.push(`knowledgeBaseId:${row.knowledgeBaseId}`)
169+
if (row?.workspaceId) tags.push(`workspaceId:${row.workspaceId}`)
170+
if (row?.userId) tags.push(`userId:${row.userId}`)
171+
155172
await knowledgeConnectorSync.trigger(
156173
{
157174
connectorId,
158175
fullSync: options?.fullSync,
159176
requestId,
160177
},
161-
{ tags: [`connector:${connectorId}`] }
178+
{ tags }
162179
)
163180
logger.info(`Dispatched connector sync to Trigger.dev`, { connectorId, requestId })
164181
} else if (isBullMQEnabled()) {

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ export interface DocumentJobData {
120120

121121
export async function dispatchDocumentProcessingJob(payload: DocumentJobData): Promise<void> {
122122
if (isTriggerAvailable()) {
123-
await tasks.trigger('knowledge-process-document', payload)
123+
await tasks.trigger('knowledge-process-document', payload, {
124+
tags: [`knowledgeBaseId:${payload.knowledgeBaseId}`, `documentId:${payload.documentId}`],
125+
})
124126
return
125127
}
126128

@@ -692,7 +694,7 @@ export async function processDocumentsWithTrigger(
692694
payload: doc,
693695
options: {
694696
idempotencyKey: `doc-process-${doc.documentId}-${requestId}`,
695-
tags: [`kb:${doc.knowledgeBaseId}`, `doc:${doc.documentId}`],
697+
tags: [`knowledgeBaseId:${doc.knowledgeBaseId}`, `documentId:${doc.documentId}`],
696698
},
697699
}))
698700
)

apps/sim/lib/logs/events.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,13 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog):
139139
}
140140

141141
if (isTriggerDevEnabled) {
142-
await workspaceNotificationDeliveryTask.trigger(payload)
142+
await workspaceNotificationDeliveryTask.trigger(payload, {
143+
tags: [
144+
`workspaceId:${workspaceId}`,
145+
`workflowId:${log.workflowId}`,
146+
`notificationType:${subscription.notificationType}`,
147+
],
148+
})
143149
logger.info(
144150
`Enqueued ${subscription.notificationType} notification ${deliveryId} via Trigger.dev`
145151
)

0 commit comments

Comments
 (0)