Skip to content

Commit 147be17

Browse files
fix(async): harden execution finalization guards
Prevent leaked core finalization markers from accumulating while keeping outer recovery paths idempotent. Preserve best-effort logging completion by reusing settled completion promises instead of reopening duplicate terminal writes.
1 parent ad2c4aa commit 147be17

File tree

4 files changed

+132
-41
lines changed

4 files changed

+132
-41
lines changed

apps/sim/lib/logs/execution/logging-session.test.ts

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import { describe, expect, it, vi } from 'vitest'
1+
import { beforeEach, describe, expect, it, vi } from 'vitest'
2+
3+
const { completeWorkflowExecutionMock } = vi.hoisted(() => ({
4+
completeWorkflowExecutionMock: vi.fn(),
5+
}))
26

37
vi.mock('@sim/db', () => ({
48
db: {},
@@ -25,7 +29,7 @@ vi.mock('drizzle-orm', () => ({
2529
vi.mock('@/lib/logs/execution/logger', () => ({
2630
executionLogger: {
2731
startWorkflowExecution: vi.fn(),
28-
completeWorkflowExecution: vi.fn(),
32+
completeWorkflowExecution: completeWorkflowExecutionMock,
2933
},
3034
}))
3135

@@ -50,26 +54,39 @@ vi.mock('@/lib/logs/execution/logging-factory', () => ({
5054
import { LoggingSession } from './logging-session'
5155

5256
describe('LoggingSession completion retries', () => {
53-
it('clears failed completion promise so error finalization can retry', async () => {
54-
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any
57+
beforeEach(() => {
58+
vi.clearAllMocks()
59+
})
5560

56-
const successFinalizeError = new Error('success finalize failed')
57-
session.complete = vi.fn().mockRejectedValue(successFinalizeError)
58-
session.completeWithCostOnlyLog = vi.fn().mockRejectedValue(successFinalizeError)
59-
session.completeWithError = vi.fn().mockResolvedValue(undefined)
61+
it('keeps completion best-effort when full completion and fallback both fail', async () => {
62+
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1')
6063

61-
await expect(session.safeComplete({ finalOutput: { ok: true } })).rejects.toThrow(
62-
'success finalize failed'
63-
)
64+
completeWorkflowExecutionMock
65+
.mockRejectedValueOnce(new Error('success finalize failed'))
66+
.mockRejectedValueOnce(new Error('cost only failed'))
67+
68+
await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined()
6469

6570
await expect(
6671
session.safeCompleteWithError({
6772
error: { message: 'fallback error finalize' },
6873
})
6974
).resolves.toBeUndefined()
7075

71-
expect(session.complete).toHaveBeenCalledTimes(1)
72-
expect(session.completeWithCostOnlyLog).toHaveBeenCalledTimes(1)
73-
expect(session.completeWithError).toHaveBeenCalledTimes(1)
76+
expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2)
77+
})
78+
79+
it('reuses the settled completion promise for repeated completion attempts', async () => {
80+
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1')
81+
82+
completeWorkflowExecutionMock
83+
.mockRejectedValueOnce(new Error('success finalize failed'))
84+
.mockRejectedValueOnce(new Error('cost only failed'))
85+
86+
await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined()
87+
88+
await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined()
89+
90+
expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2)
7491
})
7592
})

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ type TriggerData = Record<string, unknown> & {
2525

2626
const logger = createLogger('LoggingSession')
2727

28+
type CompletionAttempt = 'complete' | 'error' | 'cancelled' | 'paused'
29+
2830
export interface SessionStartParams {
2931
userId?: string
3032
workspaceId: string
@@ -98,6 +100,8 @@ export class LoggingSession {
98100
private completing = false
99101
/** Tracks the in-flight completion promise so callers can await it */
100102
private completionPromise: Promise<void> | null = null
103+
private completionAttempt: CompletionAttempt | null = null
104+
private completionAttemptFailed = false
101105
private accumulatedCost: AccumulatedCost = {
102106
total: BASE_EXECUTION_CHARGE,
103107
input: 0,
@@ -695,15 +699,32 @@ export class LoggingSession {
695699
}
696700
}
697701

698-
async safeComplete(params: SessionCompleteParams = {}): Promise<void> {
699-
if (this.completionPromise) return this.completionPromise
700-
this.completionPromise = this._safeCompleteImpl(params).catch((error) => {
701-
this.completionPromise = null
702+
private shouldStartNewCompletionAttempt(attempt: CompletionAttempt): boolean {
703+
return this.completionAttemptFailed && this.completionAttempt !== 'error' && attempt === 'error'
704+
}
705+
706+
private runCompletionAttempt(
707+
attempt: CompletionAttempt,
708+
run: () => Promise<void>
709+
): Promise<void> {
710+
if (this.completionPromise && !this.shouldStartNewCompletionAttempt(attempt)) {
711+
return this.completionPromise
712+
}
713+
714+
this.completionAttempt = attempt
715+
this.completionAttemptFailed = false
716+
this.completionPromise = run().catch((error) => {
717+
this.completionAttemptFailed = true
702718
throw error
703719
})
720+
704721
return this.completionPromise
705722
}
706723

724+
async safeComplete(params: SessionCompleteParams = {}): Promise<void> {
725+
return this.runCompletionAttempt('complete', () => this._safeCompleteImpl(params))
726+
}
727+
707728
private async _safeCompleteImpl(params: SessionCompleteParams = {}): Promise<void> {
708729
try {
709730
await this.complete(params)
@@ -724,12 +745,7 @@ export class LoggingSession {
724745
}
725746

726747
async safeCompleteWithError(params?: SessionErrorCompleteParams): Promise<void> {
727-
if (this.completionPromise) return this.completionPromise
728-
this.completionPromise = this._safeCompleteWithErrorImpl(params).catch((error) => {
729-
this.completionPromise = null
730-
throw error
731-
})
732-
return this.completionPromise
748+
return this.runCompletionAttempt('error', () => this._safeCompleteWithErrorImpl(params))
733749
}
734750

735751
private async _safeCompleteWithErrorImpl(params?: SessionErrorCompleteParams): Promise<void> {
@@ -754,12 +770,9 @@ export class LoggingSession {
754770
}
755771

756772
async safeCompleteWithCancellation(params?: SessionCancelledParams): Promise<void> {
757-
if (this.completionPromise) return this.completionPromise
758-
this.completionPromise = this._safeCompleteWithCancellationImpl(params).catch((error) => {
759-
this.completionPromise = null
760-
throw error
761-
})
762-
return this.completionPromise
773+
return this.runCompletionAttempt('cancelled', () =>
774+
this._safeCompleteWithCancellationImpl(params)
775+
)
763776
}
764777

765778
private async _safeCompleteWithCancellationImpl(params?: SessionCancelledParams): Promise<void> {
@@ -783,12 +796,7 @@ export class LoggingSession {
783796
}
784797

785798
async safeCompleteWithPause(params?: SessionPausedParams): Promise<void> {
786-
if (this.completionPromise) return this.completionPromise
787-
this.completionPromise = this._safeCompleteWithPauseImpl(params).catch((error) => {
788-
this.completionPromise = null
789-
throw error
790-
})
791-
return this.completionPromise
799+
return this.runCompletionAttempt('paused', () => this._safeCompleteWithPauseImpl(params))
792800
}
793801

794802
private async _safeCompleteWithPauseImpl(params?: SessionPausedParams): Promise<void> {

apps/sim/lib/workflows/executor/execution-core.test.ts

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,11 @@ vi.mock('@/serializer', () => ({
8989
})),
9090
}))
9191

92-
import { executeWorkflowCore, wasExecutionFinalizedByCore } from './execution-core'
92+
import {
93+
executeWorkflowCore,
94+
FINALIZED_EXECUTION_ID_TTL_MS,
95+
wasExecutionFinalizedByCore,
96+
} from './execution-core'
9397

9498
describe('executeWorkflowCore terminal finalization sequencing', () => {
9599
const loggingSession = {
@@ -127,6 +131,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {
127131

128132
beforeEach(() => {
129133
vi.clearAllMocks()
134+
vi.useRealTimers()
130135

131136
loadWorkflowFromNormalizedTablesMock.mockResolvedValue({
132137
blocks: {
@@ -330,7 +335,47 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {
330335

331336
expect(safeCompleteWithErrorMock).toHaveBeenCalledTimes(1)
332337
expect(wasExecutionFinalizedByCore('engine failed', 'execution-1')).toBe(true)
333-
expect(wasExecutionFinalizedByCore('engine failed', 'execution-1')).toBe(false)
338+
expect(wasExecutionFinalizedByCore('engine failed', 'execution-1')).toBe(true)
339+
})
340+
341+
it('expires stale finalized execution ids for callers that never consume the guard', async () => {
342+
vi.useFakeTimers()
343+
vi.setSystemTime(new Date('2026-03-13T00:00:00.000Z'))
344+
345+
executorExecuteMock.mockRejectedValue('engine failed')
346+
347+
await expect(
348+
executeWorkflowCore({
349+
snapshot: {
350+
...createSnapshot(),
351+
metadata: {
352+
...createSnapshot().metadata,
353+
executionId: 'execution-stale',
354+
},
355+
} as any,
356+
callbacks: {},
357+
loggingSession: loggingSession as any,
358+
})
359+
).rejects.toBe('engine failed')
360+
361+
vi.setSystemTime(new Date(Date.now() + FINALIZED_EXECUTION_ID_TTL_MS + 1))
362+
363+
await expect(
364+
executeWorkflowCore({
365+
snapshot: {
366+
...createSnapshot(),
367+
metadata: {
368+
...createSnapshot().metadata,
369+
executionId: 'execution-fresh',
370+
},
371+
} as any,
372+
callbacks: {},
373+
loggingSession: loggingSession as any,
374+
})
375+
).rejects.toBe('engine failed')
376+
377+
expect(wasExecutionFinalizedByCore('engine failed', 'execution-stale')).toBe(false)
378+
expect(wasExecutionFinalizedByCore('engine failed', 'execution-fresh')).toBe(true)
334379
})
335380

336381
it('falls back to error finalization when success finalization rejects', async () => {

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,39 @@ type ExecutionErrorWithFinalizationFlag = Error & {
116116
executionFinalizedByCore?: boolean
117117
}
118118

119-
const finalizedExecutionIds = new Set<string>()
119+
export const FINALIZED_EXECUTION_ID_TTL_MS = 5 * 60 * 1000
120+
121+
const finalizedExecutionIds = new Map<string, number>()
122+
123+
function cleanupExpiredFinalizedExecutionIds(now = Date.now()): void {
124+
for (const [executionId, expiresAt] of finalizedExecutionIds.entries()) {
125+
if (expiresAt > now) {
126+
break
127+
}
128+
129+
finalizedExecutionIds.delete(executionId)
130+
}
131+
}
132+
133+
function rememberFinalizedExecutionId(executionId: string): void {
134+
const now = Date.now()
135+
136+
cleanupExpiredFinalizedExecutionIds(now)
137+
finalizedExecutionIds.set(executionId, now + FINALIZED_EXECUTION_ID_TTL_MS)
138+
}
120139

121140
function markExecutionFinalizedByCore(error: unknown, executionId: string): void {
122-
finalizedExecutionIds.add(executionId)
141+
rememberFinalizedExecutionId(executionId)
123142

124143
if (error instanceof Error) {
125144
;(error as ExecutionErrorWithFinalizationFlag).executionFinalizedByCore = true
126145
}
127146
}
128147

129148
export function wasExecutionFinalizedByCore(error: unknown, executionId?: string): boolean {
130-
if (executionId && finalizedExecutionIds.delete(executionId)) {
149+
cleanupExpiredFinalizedExecutionIds()
150+
151+
if (executionId && finalizedExecutionIds.has(executionId)) {
131152
return true
132153
}
133154

0 commit comments

Comments
 (0)