@@ -5,7 +5,7 @@ import { and, eq, isNull, or } from 'drizzle-orm'
55import { type NextRequest , NextResponse } from 'next/server'
66import { v4 as uuidv4 } from 'uuid'
77import { checkEnterprisePlan , checkTeamPlan } from '@/lib/billing/subscriptions/utils'
8- import { getJobQueue , shouldExecuteInline } from '@/lib/core/async-jobs'
8+ import { getInlineJobQueue , getJobQueue , shouldExecuteInline } from '@/lib/core/async-jobs'
99import { isProd } from '@/lib/core/config/feature-flags'
1010import { safeCompare } from '@/lib/core/security/encryption'
1111import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
@@ -29,6 +29,7 @@ import {
2929import { executeWebhookJob } from '@/background/webhook-execution'
3030import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
3131import { isConfluencePayloadMatch } from '@/triggers/confluence/utils'
32+ import { isPollingWebhookProvider } from '@/triggers/constants'
3233import { isGitHubEventMatch } from '@/triggers/github/utils'
3334import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils'
3435import { isJiraEventMatch } from '@/triggers/jira/utils'
@@ -1049,7 +1050,7 @@ export async function queueWebhookExecution(
10491050 }
10501051 }
10511052
1052- const headers = Object . fromEntries ( request . headers . entries ( ) )
1053+ const { 'x-sim-idempotency-key' : _ , ... headers } = Object . fromEntries ( request . headers . entries ( ) )
10531054
10541055 // For Microsoft Teams Graph notifications, extract unique identifiers for idempotency
10551056 if (
@@ -1067,9 +1068,20 @@ export async function queueWebhookExecution(
10671068 }
10681069 }
10691070
1070- // Extract credentialId from webhook config
1071- // Note: Each webhook now has its own credentialId (credential sets are fanned out at save time)
10721071 const providerConfig = ( foundWebhook . providerConfig as Record < string , any > ) || { }
1072+
1073+ if ( foundWebhook . provider === 'generic' ) {
1074+ const idempotencyField = providerConfig . idempotencyField as string | undefined
1075+ if ( idempotencyField && body ) {
1076+ const value = idempotencyField
1077+ . split ( '.' )
1078+ . reduce ( ( acc : any , key : string ) => acc ?. [ key ] , body )
1079+ if ( value !== undefined && value !== null && typeof value !== 'object' ) {
1080+ headers [ 'x-sim-idempotency-key' ] = String ( value )
1081+ }
1082+ }
1083+ }
1084+
10731085 const credentialId = providerConfig . credentialId as string | undefined
10741086
10751087 // credentialSetId is a direct field on webhook table, not in providerConfig
@@ -1105,15 +1117,24 @@ export async function queueWebhookExecution(
11051117 ...( credentialId ? { credentialId } : { } ) ,
11061118 }
11071119
1108- const jobQueue = await getJobQueue ( )
1109- const jobId = await jobQueue . enqueue ( 'webhook-execution' , payload , {
1110- metadata : { workflowId : foundWorkflow . id , userId : actorUserId } ,
1111- } )
1112- logger . info (
1113- `[${ options . requestId } ] Queued webhook execution task ${ jobId } for ${ foundWebhook . provider } webhook`
1114- )
1120+ const isPolling = isPollingWebhookProvider ( payload . provider )
11151121
1116- if ( shouldExecuteInline ( ) ) {
1122+ if ( isPolling && ! shouldExecuteInline ( ) ) {
1123+ const jobQueue = await getJobQueue ( )
1124+ const jobId = await jobQueue . enqueue ( 'webhook-execution' , payload , {
1125+ metadata : { workflowId : foundWorkflow . id , userId : actorUserId } ,
1126+ } )
1127+ logger . info (
1128+ `[${ options . requestId } ] Queued polling webhook execution task ${ jobId } for ${ foundWebhook . provider } webhook via job queue`
1129+ )
1130+ } else {
1131+ const jobQueue = await getInlineJobQueue ( )
1132+ const jobId = await jobQueue . enqueue ( 'webhook-execution' , payload , {
1133+ metadata : { workflowId : foundWorkflow . id , userId : actorUserId } ,
1134+ } )
1135+ logger . info (
1136+ `[${ options . requestId } ] Executing ${ foundWebhook . provider } webhook ${ jobId } inline`
1137+ )
11171138 void ( async ( ) => {
11181139 try {
11191140 await jobQueue . startJob ( jobId )
@@ -1193,6 +1214,26 @@ export async function queueWebhookExecution(
11931214 } )
11941215 }
11951216
1217+ if ( foundWebhook . provider === 'generic' && providerConfig . responseMode === 'custom' ) {
1218+ const rawCode = Number ( providerConfig . responseStatusCode ) || 200
1219+ const statusCode = rawCode >= 100 && rawCode <= 599 ? rawCode : 200
1220+ const responseBody = ( providerConfig . responseBody as string | undefined ) ?. trim ( )
1221+
1222+ if ( ! responseBody ) {
1223+ return new NextResponse ( null , { status : statusCode } )
1224+ }
1225+
1226+ try {
1227+ const parsed = JSON . parse ( responseBody )
1228+ return NextResponse . json ( parsed , { status : statusCode } )
1229+ } catch {
1230+ return new NextResponse ( responseBody , {
1231+ status : statusCode ,
1232+ headers : { 'Content-Type' : 'text/plain' } ,
1233+ } )
1234+ }
1235+ }
1236+
11961237 return NextResponse . json ( { message : 'Webhook processed' } )
11971238 } catch ( error : any ) {
11981239 logger . error ( `[${ options . requestId } ] Failed to queue webhook execution:` , error )
0 commit comments