Skip to content

Commit 68d207d

Browse files
improvement(webhooks): move non-polling executions off trigger.dev (#3527)
* improvement(webhooks): move non-polling off trigger.dev * restore constants file * improve comment * add unit test to prevent drift
1 parent d5502d6 commit 68d207d

File tree

12 files changed

+110
-16
lines changed

12 files changed

+110
-16
lines changed

apps/sim/app/api/webhooks/route.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -367,9 +367,7 @@ export async function POST(request: NextRequest) {
367367
)
368368
}
369369

370-
// Configure each new webhook (for providers that need configuration)
371-
const pollingProviders = ['gmail', 'outlook']
372-
const needsConfiguration = pollingProviders.includes(provider)
370+
const needsConfiguration = provider === 'gmail' || provider === 'outlook'
373371

374372
if (needsConfiguration) {
375373
const configureFunc =

apps/sim/lib/core/async-jobs/config.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const logger = createLogger('AsyncJobsConfig')
77

88
let cachedBackend: JobQueueBackend | null = null
99
let cachedBackendType: AsyncBackendType | null = null
10+
let cachedInlineBackend: JobQueueBackend | null = null
1011

1112
/**
1213
* Determines which async backend to use based on environment configuration.
@@ -71,6 +72,31 @@ export function getCurrentBackendType(): AsyncBackendType | null {
7172
return cachedBackendType
7273
}
7374

75+
/**
76+
* Gets a job queue backend that bypasses Trigger.dev (Redis -> Database).
77+
* Used for non-polling webhooks that should always execute inline.
78+
*/
79+
export async function getInlineJobQueue(): Promise<JobQueueBackend> {
80+
if (cachedInlineBackend) {
81+
return cachedInlineBackend
82+
}
83+
84+
const redis = getRedisClient()
85+
let type: string
86+
if (redis) {
87+
const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis')
88+
cachedInlineBackend = new RedisJobQueue(redis)
89+
type = 'redis'
90+
} else {
91+
const { DatabaseJobQueue } = await import('@/lib/core/async-jobs/backends/database')
92+
cachedInlineBackend = new DatabaseJobQueue()
93+
type = 'database'
94+
}
95+
96+
logger.info(`Inline job backend initialized: ${type}`)
97+
return cachedInlineBackend
98+
}
99+
74100
/**
75101
* Checks if jobs should be executed inline (fire-and-forget).
76102
* For Redis/DB backends, we execute inline. Trigger.dev handles execution itself.
@@ -85,4 +111,5 @@ export function shouldExecuteInline(): boolean {
85111
export function resetJobQueueCache(): void {
86112
cachedBackend = null
87113
cachedBackendType = null
114+
cachedInlineBackend = null
88115
}

apps/sim/lib/core/async-jobs/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export {
22
getAsyncBackendType,
33
getCurrentBackendType,
4+
getInlineJobQueue,
45
getJobQueue,
56
resetJobQueueCache,
67
shouldExecuteInline,

apps/sim/lib/webhooks/processor.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { and, eq, isNull, or } from 'drizzle-orm'
55
import { type NextRequest, NextResponse } from 'next/server'
66
import { v4 as uuidv4 } from 'uuid'
77
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
8-
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
8+
import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
99
import { isProd } from '@/lib/core/config/feature-flags'
1010
import { safeCompare } from '@/lib/core/security/encryption'
1111
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
@@ -29,6 +29,7 @@ import {
2929
import { executeWebhookJob } from '@/background/webhook-execution'
3030
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
3131
import { isConfluencePayloadMatch } from '@/triggers/confluence/utils'
32+
import { isPollingWebhookProvider } from '@/triggers/constants'
3233
import { isGitHubEventMatch } from '@/triggers/github/utils'
3334
import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils'
3435
import { isJiraEventMatch } from '@/triggers/jira/utils'
@@ -1116,15 +1117,24 @@ export async function queueWebhookExecution(
11161117
...(credentialId ? { credentialId } : {}),
11171118
}
11181119

1119-
const jobQueue = await getJobQueue()
1120-
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
1121-
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
1122-
})
1123-
logger.info(
1124-
`[${options.requestId}] Queued webhook execution task ${jobId} for ${foundWebhook.provider} webhook`
1125-
)
1120+
const isPolling = isPollingWebhookProvider(payload.provider)
11261121

1127-
if (shouldExecuteInline()) {
1122+
if (isPolling && !shouldExecuteInline()) {
1123+
const jobQueue = await getJobQueue()
1124+
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
1125+
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
1126+
})
1127+
logger.info(
1128+
`[${options.requestId}] Queued polling webhook execution task ${jobId} for ${foundWebhook.provider} webhook via job queue`
1129+
)
1130+
} else {
1131+
const jobQueue = await getInlineJobQueue()
1132+
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
1133+
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
1134+
})
1135+
logger.info(
1136+
`[${options.requestId}] Executing ${foundWebhook.provider} webhook ${jobId} inline`
1137+
)
11281138
void (async () => {
11291139
try {
11301140
await jobQueue.startJob(jobId)

apps/sim/lib/webhooks/utils.server.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
refreshAccessTokenIfNeeded,
2020
resolveOAuthAccountId,
2121
} from '@/app/api/auth/oauth/utils'
22+
import { isPollingWebhookProvider } from '@/triggers/constants'
2223

2324
const logger = createLogger('WebhookUtils')
2425

@@ -2222,10 +2223,7 @@ export async function syncWebhooksForCredentialSet(params: {
22222223
`[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}`
22232224
)
22242225

2225-
// Polling providers get unique paths per credential (for independent state)
2226-
// External webhook providers share the same path (external service sends to one URL)
2227-
const pollingProviders = ['gmail', 'outlook', 'rss', 'imap']
2228-
const useUniquePaths = pollingProviders.includes(provider)
2226+
const useUniquePaths = isPollingWebhookProvider(provider)
22292227

22302228
const credentials = await getCredentialsForCredentialSet(credentialSetId, oauthProviderId)
22312229

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { describe, expect, it } from 'vitest'
5+
import { POLLING_PROVIDERS } from '@/triggers/constants'
6+
import { TRIGGER_REGISTRY } from '@/triggers/registry'
7+
8+
describe('POLLING_PROVIDERS sync with TriggerConfig.polling', () => {
9+
it('matches every trigger with polling: true in the registry', () => {
10+
const registryPollingProviders = new Set(
11+
Object.values(TRIGGER_REGISTRY)
12+
.filter((t) => t.polling === true)
13+
.map((t) => t.provider)
14+
)
15+
16+
expect(POLLING_PROVIDERS).toEqual(registryPollingProviders)
17+
})
18+
19+
it('no trigger with polling: true is missing from POLLING_PROVIDERS', () => {
20+
const missing: string[] = []
21+
for (const trigger of Object.values(TRIGGER_REGISTRY)) {
22+
if (trigger.polling && !POLLING_PROVIDERS.has(trigger.provider)) {
23+
missing.push(`${trigger.id} (provider: ${trigger.provider})`)
24+
}
25+
}
26+
expect(missing, `Triggers with polling: true missing from POLLING_PROVIDERS`).toEqual([])
27+
})
28+
29+
it('no POLLING_PROVIDERS entry lacks a polling: true trigger in the registry', () => {
30+
const extra: string[] = []
31+
for (const provider of POLLING_PROVIDERS) {
32+
const hasTrigger = Object.values(TRIGGER_REGISTRY).some(
33+
(t) => t.provider === provider && t.polling === true
34+
)
35+
if (!hasTrigger) {
36+
extra.push(provider)
37+
}
38+
}
39+
expect(extra, `POLLING_PROVIDERS entries with no matching polling trigger`).toEqual([])
40+
})
41+
})

apps/sim/triggers/constants.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,15 @@ export const TRIGGER_RUNTIME_SUBBLOCK_IDS: string[] = [
3535
* This prevents runaway errors from continuously executing failing workflows.
3636
*/
3737
export const MAX_CONSECUTIVE_FAILURES = 100
38+
39+
/**
40+
* Set of webhook provider names that use polling-based triggers.
41+
* Mirrors the `polling: true` flag on TriggerConfig entries.
42+
* Used to route execution: polling providers use the full job queue
43+
* (Trigger.dev), non-polling providers execute inline.
44+
*/
45+
export const POLLING_PROVIDERS = new Set(['gmail', 'outlook', 'rss', 'imap'])
46+
47+
export function isPollingWebhookProvider(provider: string): boolean {
48+
return POLLING_PROVIDERS.has(provider)
49+
}

apps/sim/triggers/gmail/poller.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export const gmailPollingTrigger: TriggerConfig = {
3030
description: 'Triggers when new emails are received in Gmail (requires Gmail credentials)',
3131
version: '1.0.0',
3232
icon: GmailIcon,
33+
polling: true,
3334

3435
subBlocks: [
3536
{

apps/sim/triggers/imap/poller.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export const imapPollingTrigger: TriggerConfig = {
1212
description: 'Triggers when new emails are received via IMAP (works with any email provider)',
1313
version: '1.0.0',
1414
icon: MailServerIcon,
15+
polling: true,
1516

1617
subBlocks: [
1718
// Connection settings

apps/sim/triggers/outlook/poller.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export const outlookPollingTrigger: TriggerConfig = {
2424
description: 'Triggers when new emails are received in Outlook (requires Microsoft credentials)',
2525
version: '1.0.0',
2626
icon: OutlookIcon,
27+
polling: true,
2728

2829
subBlocks: [
2930
{

0 commit comments

Comments
 (0)