feat(concurrency): bullmq based concurrency control system#3605
feat(concurrency): bullmq based concurrency control system#3605icecrasher321 wants to merge 11 commits intostagingfrom
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub. |
PR SummaryHigh Risk Overview Execution entrypoints are updated to enqueue via Adds an in-process Written by Cursor Bugbot for commit 2bf1feb. Configure here. |
Greptile SummaryThis PR introduces a comprehensive BullMQ-based queuing and concurrency control system for workflow, webhook, and schedule executions. It replaces the previous Redis/database job queue backends with BullMQ queues backed by Redis, adds a per-workspace fairness dispatcher (with a Lua-script-based atomic claim mechanism), a lease-based concurrency limit (keyed to billing plan), an in-process admission gate for external API requests, and a standalone worker process to consume jobs. It also adds a buffered SSE stream for non-manual executions that now run asynchronously through the dispatch queue. The change is substantial (~5900 lines added) and represents a foundational infrastructure improvement for reliability and rate-limiting protection. Key changes and issues found:
Confidence Score: 3/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Client
participant RouteHandler as API Route Handler
participant AdmissionGate as Admission Gate (in-process)
participant Dispatcher as Workspace Dispatcher
participant RedisStore as Redis Dispatch Store
participant BullMQ as BullMQ Queue (Redis)
participant Worker as BullMQ Worker Process
participant DispatchWorker as Dispatch Worker (worker.ts)
Client->>RouteHandler: POST /api/workflows/[id]/execute
RouteHandler->>AdmissionGate: tryAdmit()
alt At capacity (inflight >= MAX_INFLIGHT)
AdmissionGate-->>RouteHandler: null
RouteHandler-->>Client: 429 Too Many Requests
else Admitted
AdmissionGate-->>RouteHandler: ticket
RouteHandler->>Dispatcher: enqueueWorkspaceDispatch(input)
Dispatcher->>RedisStore: enqueueWorkspaceDispatchJob()
RedisStore-->>Dispatcher: jobRecord (status=waiting)
Dispatcher->>Dispatcher: runDispatcherLoop() [void]
Dispatcher->>RedisStore: popNextWorkspaceId()
Dispatcher->>RedisStore: claimWorkspaceJob() [Lua script]
RedisStore-->>Dispatcher: {type: admitted, record, leaseId}
Dispatcher->>BullMQ: queue.add(jobName, payload, {jobId})
Dispatcher->>RedisStore: markDispatchJobAdmitted()
Dispatcher-->>RouteHandler: dispatchJobId
RouteHandler->>RouteHandler: waitForDispatchJob(id, timeout) [polls 250ms]
Worker->>BullMQ: picks up job
Worker->>DispatchWorker: runDispatchedJob(metadata, fn)
DispatchWorker->>RedisStore: markDispatchJobRunning()
DispatchWorker->>DispatchWorker: executeQueuedWorkflowJob() / executeWorkflowJob()
DispatchWorker->>RedisStore: markDispatchJobCompleted(output)
DispatchWorker->>RedisStore: releaseWorkspaceLease()
DispatchWorker->>Dispatcher: wakeWorkspaceDispatcher()
RedisStore-->>RouteHandler: record (status=completed) [via poll]
RouteHandler->>AdmissionGate: ticket.release()
RouteHandler-->>Client: 200 JSON result
end
Last reviewed commit: be83c97 |
|
bugbot run |
|
bugbot run |
|
bugbot run |
|
bugbot run |
|
bugbot run |
|
bugbot run |
|
bugbot run |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Summary
BullMQ based concurrency control system for executions currently running in line [manual execs excluded]. Can tune limits based on resources.
Overall admin gates to prevent rate limiting services based crashes.
Type of Change
Testing
Tested manually under different configurations, added extensive test suite
Checklist