Skip to content

Commit 7ee6ee4

Browse files
committed
DB backed tools
1 parent 715dfdb commit 7ee6ee4

File tree

17 files changed

+15855
-29
lines changed

17 files changed

+15855
-29
lines changed

apps/sim/app/api/copilot/chat/route.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,14 @@ export async function POST(req: NextRequest) {
317317
}
318318

319319
if (stream) {
320+
const executionId = crypto.randomUUID()
321+
const runId = crypto.randomUUID()
320322
const sseStream = createSSEStream({
321323
requestPayload,
322324
userId: authenticatedUserId,
323325
streamId: userMessageIdToUse,
326+
executionId,
327+
runId,
324328
chatId: actualChatId,
325329
currentChat,
326330
isNewChat: conversationHistory.length === 0,
@@ -333,6 +337,8 @@ export async function POST(req: NextRequest) {
333337
userId: authenticatedUserId,
334338
workflowId,
335339
chatId: actualChatId,
340+
executionId,
341+
runId,
336342
goRoute: '/api/copilot',
337343
autoExecuteTools: true,
338344
interactive: true,

apps/sim/app/api/copilot/chat/stream/route.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ export async function GET(request: NextRequest) {
6969
success: true,
7070
events: filteredEvents,
7171
status: meta.status,
72+
executionId: meta.executionId,
73+
runId: meta.runId,
7274
})
7375
}
7476

@@ -77,6 +79,7 @@ export async function GET(request: NextRequest) {
7779
const stream = new ReadableStream({
7880
async start(controller) {
7981
let lastEventId = Number.isFinite(fromEventId) ? fromEventId : 0
82+
let latestMeta = meta
8083

8184
const flushEvents = async () => {
8285
const events = await readStreamEvents(streamId, lastEventId)
@@ -93,6 +96,8 @@ export async function GET(request: NextRequest) {
9396
...entry.event,
9497
eventId: entry.eventId,
9598
streamId: entry.streamId,
99+
executionId: latestMeta?.executionId,
100+
runId: latestMeta?.runId,
96101
}
97102
controller.enqueue(encodeEvent(payload))
98103
}
@@ -104,6 +109,7 @@ export async function GET(request: NextRequest) {
104109
while (Date.now() - startTime < MAX_STREAM_MS) {
105110
const currentMeta = await getStreamMeta(streamId)
106111
if (!currentMeta) break
112+
latestMeta = currentMeta
107113

108114
await flushEvents()
109115

apps/sim/app/api/copilot/confirm/route.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { z } from 'zod'
4+
import { completeAsyncToolCall, upsertAsyncToolCall } from '@/lib/copilot/async-runs/repository'
45
import { REDIS_TOOL_CALL_PREFIX, REDIS_TOOL_CALL_TTL_SECONDS } from '@/lib/copilot/constants'
56
import {
67
authenticateCopilotRequestSessionOnly,
@@ -34,10 +35,34 @@ async function updateToolCallStatus(
3435
message?: string,
3536
data?: Record<string, unknown>
3637
): Promise<boolean> {
38+
const durableStatus =
39+
status === 'success'
40+
? 'completed'
41+
: status === 'cancelled'
42+
? 'cancelled'
43+
: status === 'error' || status === 'rejected'
44+
? 'failed'
45+
: 'pending'
46+
await upsertAsyncToolCall({
47+
runId: crypto.randomUUID(),
48+
toolCallId,
49+
toolName: 'client_tool',
50+
args: {},
51+
status: durableStatus,
52+
}).catch(() => {})
53+
if (durableStatus === 'completed' || durableStatus === 'failed' || durableStatus === 'cancelled') {
54+
await completeAsyncToolCall({
55+
toolCallId,
56+
status: durableStatus,
57+
result: data ?? null,
58+
error: status === 'success' ? null : message || status,
59+
}).catch(() => {})
60+
}
61+
3762
const redis = getRedisClient()
3863
if (!redis) {
39-
logger.warn('Redis client not available for tool confirmation')
40-
return false
64+
logger.warn('Redis client not available for tool confirmation; durable DB mirror only')
65+
return true
4166
}
4267

4368
try {

apps/sim/app/api/mothership/chat/route.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,14 @@ export async function POST(req: NextRequest) {
256256
await waitForPendingChatStream(actualChatId)
257257
}
258258

259+
const executionId = crypto.randomUUID()
260+
const runId = crypto.randomUUID()
259261
const stream = createSSEStream({
260262
requestPayload,
261263
userId: authenticatedUserId,
262264
streamId: userMessageId,
265+
executionId,
266+
runId,
263267
chatId: actualChatId,
264268
currentChat,
265269
isNewChat: conversationHistory.length === 0,
@@ -271,6 +275,8 @@ export async function POST(req: NextRequest) {
271275
userId: authenticatedUserId,
272276
workspaceId,
273277
chatId: actualChatId,
278+
executionId,
279+
runId,
274280
goRoute: '/api/mothership',
275281
autoExecuteTools: true,
276282
interactive: true,
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
import { db } from '@sim/db'
2+
import {
3+
copilotAsyncToolCalls,
4+
copilotRunCheckpoints,
5+
copilotRuns,
6+
type CopilotAsyncToolStatus,
7+
type CopilotRunStatus,
8+
} from '@sim/db/schema'
9+
import { createLogger } from '@sim/logger'
10+
import { and, desc, eq, inArray, isNull } from 'drizzle-orm'
11+
12+
const logger = createLogger('CopilotAsyncRunsRepo')
13+
14+
export interface CreateRunSegmentInput {
15+
id?: string
16+
executionId: string
17+
parentRunId?: string | null
18+
chatId: string
19+
userId: string
20+
workflowId?: string | null
21+
workspaceId?: string | null
22+
streamId: string
23+
agent?: string | null
24+
model?: string | null
25+
provider?: string | null
26+
requestContext?: Record<string, unknown>
27+
status?: CopilotRunStatus
28+
}
29+
30+
export async function createRunSegment(input: CreateRunSegmentInput) {
31+
const [run] = await db
32+
.insert(copilotRuns)
33+
.values({
34+
...(input.id ? { id: input.id } : {}),
35+
executionId: input.executionId,
36+
parentRunId: input.parentRunId ?? null,
37+
chatId: input.chatId,
38+
userId: input.userId,
39+
workflowId: input.workflowId ?? null,
40+
workspaceId: input.workspaceId ?? null,
41+
streamId: input.streamId,
42+
agent: input.agent ?? null,
43+
model: input.model ?? null,
44+
provider: input.provider ?? null,
45+
requestContext: input.requestContext ?? {},
46+
status: input.status ?? 'active',
47+
})
48+
.returning()
49+
50+
return run
51+
}
52+
53+
export async function updateRunStatus(
54+
runId: string,
55+
status: CopilotRunStatus,
56+
updates: {
57+
completedAt?: Date | null
58+
error?: string | null
59+
requestContext?: Record<string, unknown>
60+
} = {}
61+
) {
62+
const [run] = await db
63+
.update(copilotRuns)
64+
.set({
65+
status,
66+
completedAt: updates.completedAt,
67+
error: updates.error,
68+
requestContext: updates.requestContext,
69+
updatedAt: new Date(),
70+
})
71+
.where(eq(copilotRuns.id, runId))
72+
.returning()
73+
74+
return run ?? null
75+
}
76+
77+
export async function getLatestRunForExecution(executionId: string) {
78+
const [run] = await db
79+
.select()
80+
.from(copilotRuns)
81+
.where(eq(copilotRuns.executionId, executionId))
82+
.orderBy(desc(copilotRuns.startedAt))
83+
.limit(1)
84+
85+
return run ?? null
86+
}
87+
88+
export async function createRunCheckpoint(input: {
89+
runId: string
90+
pendingToolCallId: string
91+
conversationSnapshot: Record<string, unknown>
92+
agentState: Record<string, unknown>
93+
providerRequest: Record<string, unknown>
94+
}) {
95+
const [checkpoint] = await db
96+
.insert(copilotRunCheckpoints)
97+
.values({
98+
runId: input.runId,
99+
pendingToolCallId: input.pendingToolCallId,
100+
conversationSnapshot: input.conversationSnapshot,
101+
agentState: input.agentState,
102+
providerRequest: input.providerRequest,
103+
})
104+
.returning()
105+
106+
return checkpoint
107+
}
108+
109+
export async function upsertAsyncToolCall(input: {
110+
runId: string
111+
checkpointId?: string | null
112+
toolCallId: string
113+
toolName: string
114+
args?: Record<string, unknown>
115+
status?: CopilotAsyncToolStatus
116+
}) {
117+
const now = new Date()
118+
const [row] = await db
119+
.insert(copilotAsyncToolCalls)
120+
.values({
121+
runId: input.runId,
122+
checkpointId: input.checkpointId ?? null,
123+
toolCallId: input.toolCallId,
124+
toolName: input.toolName,
125+
args: input.args ?? {},
126+
status: input.status ?? 'pending',
127+
updatedAt: now,
128+
})
129+
.onConflictDoUpdate({
130+
target: copilotAsyncToolCalls.toolCallId,
131+
set: {
132+
runId: input.runId,
133+
checkpointId: input.checkpointId ?? null,
134+
toolName: input.toolName,
135+
args: input.args ?? {},
136+
status: input.status ?? 'pending',
137+
updatedAt: now,
138+
},
139+
})
140+
.returning()
141+
142+
return row
143+
}
144+
145+
export async function markAsyncToolStatus(
146+
toolCallId: string,
147+
status: CopilotAsyncToolStatus,
148+
updates: {
149+
claimedBy?: string | null
150+
result?: Record<string, unknown> | null
151+
error?: string | null
152+
completedAt?: Date | null
153+
} = {}
154+
) {
155+
const claimedAt =
156+
status === 'running' && updates.claimedBy ? (updates.completedAt ? undefined : new Date()) : undefined
157+
158+
const [row] = await db
159+
.update(copilotAsyncToolCalls)
160+
.set({
161+
status,
162+
claimedBy: updates.claimedBy,
163+
claimedAt,
164+
result: updates.result,
165+
error: updates.error,
166+
completedAt: updates.completedAt,
167+
updatedAt: new Date(),
168+
})
169+
.where(eq(copilotAsyncToolCalls.toolCallId, toolCallId))
170+
.returning()
171+
172+
return row ?? null
173+
}
174+
175+
export async function markAsyncToolRunning(toolCallId: string, claimedBy: string) {
176+
return markAsyncToolStatus(toolCallId, 'running', { claimedBy })
177+
}
178+
179+
export async function completeAsyncToolCall(input: {
180+
toolCallId: string
181+
status: Extract<CopilotAsyncToolStatus, 'completed' | 'failed' | 'cancelled'>
182+
result?: Record<string, unknown> | null
183+
error?: string | null
184+
}) {
185+
const [existing] = await db
186+
.select()
187+
.from(copilotAsyncToolCalls)
188+
.where(eq(copilotAsyncToolCalls.toolCallId, input.toolCallId))
189+
.limit(1)
190+
191+
if (!existing) {
192+
logger.warn('completeAsyncToolCall called before pending row existed', {
193+
toolCallId: input.toolCallId,
194+
status: input.status,
195+
})
196+
return null
197+
}
198+
199+
if (existing.status === 'completed' || existing.status === 'failed' || existing.status === 'cancelled') {
200+
return existing
201+
}
202+
203+
return markAsyncToolStatus(input.toolCallId, input.status, {
204+
result: input.result ?? null,
205+
error: input.error ?? null,
206+
completedAt: new Date(),
207+
})
208+
}
209+
210+
export async function enqueueAsyncToolResume(toolCallId: string) {
211+
return markAsyncToolStatus(toolCallId, 'resume_enqueued')
212+
}
213+
214+
export async function markAsyncToolResumed(toolCallId: string) {
215+
return markAsyncToolStatus(toolCallId, 'resumed')
216+
}
217+
218+
export async function listAsyncToolCallsForRun(runId: string) {
219+
return db
220+
.select()
221+
.from(copilotAsyncToolCalls)
222+
.where(eq(copilotAsyncToolCalls.runId, runId))
223+
.orderBy(desc(copilotAsyncToolCalls.createdAt))
224+
}
225+
226+
export async function getAsyncToolCalls(toolCallIds: string[]) {
227+
if (toolCallIds.length === 0) return []
228+
return db
229+
.select()
230+
.from(copilotAsyncToolCalls)
231+
.where(inArray(copilotAsyncToolCalls.toolCallId, toolCallIds))
232+
}
233+
234+
export async function claimCompletedAsyncToolCall(toolCallId: string, workerId: string) {
235+
const [row] = await db
236+
.update(copilotAsyncToolCalls)
237+
.set({
238+
status: 'resume_enqueued',
239+
claimedBy: workerId,
240+
claimedAt: new Date(),
241+
updatedAt: new Date(),
242+
})
243+
.where(
244+
and(
245+
eq(copilotAsyncToolCalls.toolCallId, toolCallId),
246+
inArray(copilotAsyncToolCalls.status, ['completed', 'failed', 'cancelled']),
247+
isNull(copilotAsyncToolCalls.claimedBy)
248+
)
249+
)
250+
.returning()
251+
252+
return row ?? null
253+
}

0 commit comments

Comments
 (0)