Skip to content

Commit 9a6886c

Browse files
committed
optimize reconciler scan
1 parent 7bf9526 commit 9a6886c

File tree

5 files changed

+40
-16
lines changed

5 files changed

+40
-16
lines changed

apps/sim/lib/core/workspace-dispatch/adapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export interface WorkspaceDispatchStorageAdapter {
3434
lanes: readonly WorkspaceDispatchLane[]
3535
): Promise<number>
3636
getGlobalQueueDepth(): Promise<number>
37-
reconcileGlobalQueueDepth(): Promise<void>
37+
reconcileGlobalQueueDepth(knownCount: number): Promise<void>
3838
popNextWorkspaceId(): Promise<string | null>
3939
getQueuedWorkspaceCount(): Promise<number>
4040
hasActiveWorkspace(workspaceId: string): Promise<boolean>

apps/sim/lib/core/workspace-dispatch/memory-store.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ export class MemoryWorkspaceDispatchStorage implements WorkspaceDispatchStorageA
290290
return count
291291
}
292292

293-
async reconcileGlobalQueueDepth(): Promise<void> {
293+
async reconcileGlobalQueueDepth(_knownCount: number): Promise<void> {
294294
// no-op: memory store computes depth on the fly
295295
}
296296

apps/sim/lib/core/workspace-dispatch/reconciler.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,39 @@ async function reconcileWaitingWorkspaceTracking(
153153
}
154154

155155
export async function reconcileWorkspaceDispatchState(): Promise<void> {
156-
const activeJobs = await listDispatchJobsByStatuses(['admitting', 'admitted', 'running'])
157-
const waitingJobs = await listDispatchJobsByStatuses(['waiting'])
158-
const terminalJobs = await listDispatchJobsByStatuses(['completed', 'failed'])
156+
const allJobs = await listDispatchJobsByStatuses([
157+
'waiting',
158+
'admitting',
159+
'admitted',
160+
'running',
161+
'completed',
162+
'failed',
163+
])
164+
165+
const activeJobs: WorkspaceDispatchJobRecord[] = []
166+
const waitingJobs: WorkspaceDispatchJobRecord[] = []
167+
const terminalJobs: WorkspaceDispatchJobRecord[] = []
168+
let nonTerminalCount = 0
169+
170+
for (const job of allJobs) {
171+
switch (job.status) {
172+
case 'admitting':
173+
case 'admitted':
174+
case 'running':
175+
activeJobs.push(job)
176+
nonTerminalCount++
177+
break
178+
case 'waiting':
179+
waitingJobs.push(job)
180+
nonTerminalCount++
181+
break
182+
case 'completed':
183+
case 'failed':
184+
terminalJobs.push(job)
185+
break
186+
}
187+
}
188+
159189
let changed = false
160190

161191
for (const record of activeJobs) {
@@ -181,7 +211,7 @@ export async function reconcileWorkspaceDispatchState(): Promise<void> {
181211
}
182212
}
183213

184-
await reconcileGlobalQueueDepth().catch((error) => {
214+
await reconcileGlobalQueueDepth(nonTerminalCount).catch((error) => {
185215
logger.error('Failed to reconcile global queue depth', { error })
186216
})
187217

apps/sim/lib/core/workspace-dispatch/redis-store.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -363,14 +363,8 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd
363363
return count ? Math.max(0, Number.parseInt(count, 10)) : 0
364364
}
365365

366-
async reconcileGlobalQueueDepth(): Promise<void> {
367-
const allJobs = await this.listDispatchJobsByStatuses([
368-
'waiting',
369-
'admitting',
370-
'admitted',
371-
'running',
372-
])
373-
await this.redis.set(GLOBAL_DEPTH_KEY, allJobs.length)
366+
async reconcileGlobalQueueDepth(knownCount: number): Promise<void> {
367+
await this.redis.set(GLOBAL_DEPTH_KEY, knownCount)
374368
}
375369

376370
async popNextWorkspaceId(): Promise<string | null> {

apps/sim/lib/core/workspace-dispatch/store.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ export async function getGlobalQueueDepth(): Promise<number> {
7777
return getAdapter().getGlobalQueueDepth()
7878
}
7979

80-
export async function reconcileGlobalQueueDepth(): Promise<void> {
81-
return getAdapter().reconcileGlobalQueueDepth()
80+
export async function reconcileGlobalQueueDepth(knownCount: number): Promise<void> {
81+
return getAdapter().reconcileGlobalQueueDepth(knownCount)
8282
}
8383

8484
export async function popNextWorkspaceId(): Promise<string | null> {

0 commit comments

Comments
 (0)