Skip to content

Commit 90352fe

Browse files
committed
Fix subagent abort
1 parent 7073eab commit 90352fe

File tree

5 files changed

+107
-37
lines changed

5 files changed

+107
-37
lines changed

apps/sim/lib/copilot/orchestrator/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ export async function orchestrateCopilotStream(
112112
loopOptions
113113
)
114114

115+
if (options.abortSignal?.aborted || context.wasAborted) {
116+
context.awaitingAsyncContinuation = undefined
117+
break
118+
}
119+
115120
const continuation = context.awaitingAsyncContinuation
116121
if (!continuation) break
117122

@@ -145,7 +150,7 @@ export async function orchestrateCopilotStream(
145150
}
146151

147152
const result: OrchestratorResult = {
148-
success: context.errors.length === 0,
153+
success: context.errors.length === 0 && !context.wasAborted,
149154
content: context.accumulatedContent,
150155
contentBlocks: context.contentBlocks,
151156
toolCalls: buildToolCallSummaries(context),

apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,41 @@ describe('sse-handlers tool lifecycle', () => {
102102
expect(executeToolServerSide).toHaveBeenCalledTimes(1)
103103
expect(markToolComplete).toHaveBeenCalledTimes(1)
104104
})
105+
106+
it('marks an in-flight tool as cancelled when aborted mid-execution', async () => {
107+
const abortController = new AbortController()
108+
execContext.abortSignal = abortController.signal
109+
110+
executeToolServerSide.mockImplementationOnce(
111+
() =>
112+
new Promise((resolve) => {
113+
setTimeout(() => resolve({ success: true, output: { ok: true } }), 0)
114+
})
115+
)
116+
markToolComplete.mockResolvedValue(true)
117+
118+
await sseHandlers.tool_call(
119+
{
120+
type: 'tool_call',
121+
data: { id: 'tool-cancel', name: 'read', arguments: { workflowId: 'workflow-1' } },
122+
} as any,
123+
context,
124+
execContext,
125+
{ interactive: false, timeout: 1000, abortSignal: abortController.signal }
126+
)
127+
128+
abortController.abort()
129+
await new Promise((resolve) => setTimeout(resolve, 10))
130+
131+
expect(markToolComplete).toHaveBeenCalledWith(
132+
'tool-cancel',
133+
'read',
134+
499,
135+
'Request aborted during tool execution',
136+
{ cancelled: true }
137+
)
138+
139+
const updated = context.toolCalls.get('tool-cancel')
140+
expect(updated?.status).toBe('cancelled')
141+
})
105142
})

apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,20 @@ function cancelledCompletion(message: string): AsyncToolCompletion {
228228
}
229229
}
230230

231+
function reportCancelledTool(
232+
toolCall: { id: string; name: string },
233+
message: string,
234+
data: Record<string, unknown> = { cancelled: true }
235+
): void {
236+
markToolComplete(toolCall.id, toolCall.name, 499, message, data).catch((err) => {
237+
logger.error('markToolComplete failed (cancelled)', {
238+
toolCallId: toolCall.id,
239+
toolName: toolCall.name,
240+
error: err instanceof Error ? err.message : String(err),
241+
})
242+
})
243+
}
244+
231245
async function maybeWriteOutputToTable(
232246
toolName: string,
233247
params: Record<string, unknown> | undefined,
@@ -472,14 +486,7 @@ export async function executeToolAndReport(
472486
result: { cancelled: true },
473487
error: 'Request aborted before tool execution',
474488
}).catch(() => {})
475-
markToolComplete(toolCall.id, toolCall.name, 499, 'Request aborted before tool execution', {
476-
cancelled: true,
477-
}).catch((err) => {
478-
logger.error('markToolComplete failed (aborted before execute)', {
479-
toolCallId: toolCall.id,
480-
error: err instanceof Error ? err.message : String(err),
481-
})
482-
})
489+
reportCancelledTool(toolCall, 'Request aborted before tool execution')
483490
return cancelledCompletion('Request aborted before tool execution')
484491
}
485492

@@ -504,6 +511,7 @@ export async function executeToolAndReport(
504511
result: { cancelled: true },
505512
error: 'Request aborted during tool execution',
506513
}).catch(() => {})
514+
reportCancelledTool(toolCall, 'Request aborted during tool execution')
507515
return cancelledCompletion('Request aborted during tool execution')
508516
}
509517
result = await maybeWriteOutputToFile(toolCall.name, toolCall.params, result, execContext)
@@ -517,6 +525,7 @@ export async function executeToolAndReport(
517525
result: { cancelled: true },
518526
error: 'Request aborted during tool post-processing',
519527
}).catch(() => {})
528+
reportCancelledTool(toolCall, 'Request aborted during tool post-processing')
520529
return cancelledCompletion('Request aborted during tool post-processing')
521530
}
522531
result = await maybeWriteOutputToTable(toolCall.name, toolCall.params, result, execContext)
@@ -530,6 +539,7 @@ export async function executeToolAndReport(
530539
result: { cancelled: true },
531540
error: 'Request aborted during tool post-processing',
532541
}).catch(() => {})
542+
reportCancelledTool(toolCall, 'Request aborted during tool post-processing')
533543
return cancelledCompletion('Request aborted during tool post-processing')
534544
}
535545
result = await maybeWriteReadCsvToTable(toolCall.name, toolCall.params, result, execContext)
@@ -543,6 +553,7 @@ export async function executeToolAndReport(
543553
result: { cancelled: true },
544554
error: 'Request aborted during tool post-processing',
545555
}).catch(() => {})
556+
reportCancelledTool(toolCall, 'Request aborted during tool post-processing')
546557
return cancelledCompletion('Request aborted during tool post-processing')
547558
}
548559
toolCall.status = result.success ? 'success' : 'error'
@@ -590,6 +601,7 @@ export async function executeToolAndReport(
590601

591602
if (abortRequested(context, execContext, options)) {
592603
toolCall.status = 'cancelled'
604+
reportCancelledTool(toolCall, 'Request aborted before tool result delivery')
593605
return cancelledCompletion('Request aborted before tool result delivery')
594606
}
595607

@@ -701,6 +713,7 @@ export async function executeToolAndReport(
701713
result: { cancelled: true },
702714
error: 'Request aborted during tool execution',
703715
}).catch(() => {})
716+
reportCancelledTool(toolCall, 'Request aborted during tool execution')
704717
return cancelledCompletion('Request aborted during tool execution')
705718
}
706719
toolCall.status = 'error'

apps/sim/lib/copilot/orchestrator/sse/parser.ts

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,42 +14,52 @@ export async function* parseSSEStream(
1414
let buffer = ''
1515

1616
try {
17-
while (true) {
18-
if (abortSignal?.aborted) {
19-
logger.info('SSE stream aborted by signal')
20-
break
21-
}
17+
try {
18+
while (true) {
19+
if (abortSignal?.aborted) {
20+
logger.info('SSE stream aborted by signal')
21+
break
22+
}
2223

23-
const { done, value } = await reader.read()
24-
if (done) break
24+
const { done, value } = await reader.read()
25+
if (done) break
2526

26-
buffer += decoder.decode(value, { stream: true })
27-
const lines = buffer.split('\n')
28-
buffer = lines.pop() || ''
27+
buffer += decoder.decode(value, { stream: true })
28+
const lines = buffer.split('\n')
29+
buffer = lines.pop() || ''
2930

30-
for (const line of lines) {
31-
if (abortSignal?.aborted) {
32-
logger.info('SSE stream aborted mid-chunk (between events)')
33-
return
34-
}
35-
if (!line.trim()) continue
36-
if (!line.startsWith('data: ')) continue
31+
for (const line of lines) {
32+
if (abortSignal?.aborted) {
33+
logger.info('SSE stream aborted mid-chunk (between events)')
34+
return
35+
}
36+
if (!line.trim()) continue
37+
if (!line.startsWith('data: ')) continue
3738

38-
const jsonStr = line.slice(6)
39-
if (jsonStr === '[DONE]') continue
39+
const jsonStr = line.slice(6)
40+
if (jsonStr === '[DONE]') continue
4041

41-
try {
42-
const event = JSON.parse(jsonStr) as SSEEvent
43-
if (event?.type) {
44-
yield event
42+
try {
43+
const event = JSON.parse(jsonStr) as SSEEvent
44+
if (event?.type) {
45+
yield event
46+
}
47+
} catch (error) {
48+
logger.warn('Failed to parse SSE event', {
49+
preview: jsonStr.slice(0, 200),
50+
error: error instanceof Error ? error.message : String(error),
51+
})
4552
}
46-
} catch (error) {
47-
logger.warn('Failed to parse SSE event', {
48-
preview: jsonStr.slice(0, 200),
49-
error: error instanceof Error ? error.message : String(error),
50-
})
5153
}
5254
}
55+
} catch (error) {
56+
const aborted =
57+
abortSignal?.aborted || (error instanceof DOMException && error.name === 'AbortError')
58+
if (aborted) {
59+
logger.info('SSE stream read aborted')
60+
return
61+
}
62+
throw error
5363
}
5464

5565
if (buffer.trim() && buffer.startsWith('data: ')) {

apps/sim/lib/copilot/orchestrator/stream/core.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ export async function runStreamLoop(
147147
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
148148
if (abortSignal?.aborted) {
149149
context.wasAborted = true
150+
await reader.cancel().catch(() => {})
150151
break
151152
}
152153

@@ -227,6 +228,10 @@ export async function runStreamLoop(
227228
if (context.streamComplete) break
228229
}
229230
} finally {
231+
if (abortSignal?.aborted) {
232+
context.wasAborted = true
233+
await reader.cancel().catch(() => {})
234+
}
230235
clearTimeout(timeoutId)
231236
}
232237
}

0 commit comments

Comments
 (0)