Skip to content

Commit f52de5d

Browse files
authored
Feature/api (#82)
* my test changes for branch protection * feat(api): introduced 'deploy as an API' button and updated workflows db to include status of deployment * feat(api): added 'trigger' column for logs table to indicate source of workflow run, persist logs from API executions, removed session validation in favor of API key * fix(bug): cleanup old reference to JSX element in favor of ReactElement * feat(api): added persistent notification for one-click deployment with copy boxes for url, keys, & ex curl * fix(ui/notifications): cleaned up deploy with one-click button ui
1 parent 48b6095 commit f52de5d

File tree

27 files changed

+2356
-1652
lines changed

27 files changed

+2356
-1652
lines changed

app/api/scheduled/execute/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ export async function GET(req: NextRequest) {
290290
log.error || `Completed successfully`
291291
}`,
292292
duration: log.success ? `${log.durationMs}ms` : 'NA',
293+
trigger: 'schedule',
293294
createdAt: new Date(log.endedAt || log.startedAt),
294295
})
295296
}
@@ -309,6 +310,7 @@ export async function GET(req: NextRequest) {
309310
? 'Scheduled workflow executed successfully'
310311
: `Scheduled workflow execution failed: ${result.error}`,
311312
duration: result.success ? `${totalDuration}ms` : 'NA',
313+
trigger: 'schedule',
312314
createdAt: new Date(),
313315
})
314316

@@ -353,6 +355,7 @@ export async function GET(req: NextRequest) {
353355
level: 'error',
354356
message: error.message || 'Unknown error during scheduled workflow execution',
355357
createdAt: new Date(),
358+
trigger: 'schedule',
356359
})
357360

358361
// On error, increment next_run_at by a small delay to prevent immediate retries
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { NextRequest } from 'next/server'
2+
import { eq } from 'drizzle-orm'
3+
import { v4 as uuidv4 } from 'uuid'
4+
import { db } from '@/db'
5+
import { workflow } from '@/db/schema'
6+
import { validateWorkflowAccess } from '../../middleware'
7+
import { createErrorResponse, createSuccessResponse } from '../../utils'
8+
9+
export const dynamic = 'force-dynamic'
10+
export const runtime = 'nodejs'
11+
12+
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
13+
const { id } = await params
14+
15+
try {
16+
const validation = await validateWorkflowAccess(request, id, false)
17+
18+
if (validation.error) {
19+
return createErrorResponse(validation.error.message, validation.error.status)
20+
}
21+
22+
// Generate a new API key
23+
const apiKey = `wf_${uuidv4().replace(/-/g, '')}`
24+
25+
// Update the workflow with the API key and deployment status
26+
await db
27+
.update(workflow)
28+
.set({
29+
apiKey,
30+
isDeployed: true,
31+
deployedAt: new Date(),
32+
})
33+
.where(eq(workflow.id, id))
34+
35+
return createSuccessResponse({ apiKey })
36+
} catch (error: any) {
37+
console.error('Error deploying workflow:', error)
38+
return createErrorResponse(error.message || 'Failed to deploy workflow', 500)
39+
}
40+
}
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import { NextRequest } from 'next/server'
2+
import { eq } from 'drizzle-orm'
3+
import { v4 as uuidv4 } from 'uuid'
4+
import { z } from 'zod'
5+
import { persistLog } from '@/lib/logging'
6+
import { decryptSecret } from '@/lib/utils'
7+
import { WorkflowState } from '@/stores/workflow/types'
8+
import { mergeSubblockState } from '@/stores/workflow/utils'
9+
import { db } from '@/db'
10+
import { environment } from '@/db/schema'
11+
import { Executor } from '@/executor'
12+
import { Serializer } from '@/serializer'
13+
import { validateWorkflowAccess } from '../../middleware'
14+
import { createErrorResponse, createSuccessResponse } from '../../utils'
15+
16+
export const dynamic = 'force-dynamic'
17+
export const runtime = 'nodejs'
18+
19+
// Define the schema for environment variables
20+
const EnvVarsSchema = z.record(z.string())
21+
22+
// Keep track of running executions to prevent overlap
23+
const runningExecutions = new Set<string>()
24+
25+
async function executeWorkflow(workflow: any, input?: any) {
26+
const workflowId = workflow.id
27+
const executionId = uuidv4()
28+
29+
// Skip if this workflow is already running
30+
if (runningExecutions.has(workflowId)) {
31+
throw new Error('Workflow is already running')
32+
}
33+
34+
try {
35+
runningExecutions.add(workflowId)
36+
37+
// Get the workflow state
38+
const state = workflow.state as WorkflowState
39+
const { blocks, edges, loops } = state
40+
41+
// Use the same execution flow as in scheduled executions
42+
const mergedStates = mergeSubblockState(blocks)
43+
44+
// Retrieve environment variables for this user
45+
const [userEnv] = await db
46+
.select()
47+
.from(environment)
48+
.where(eq(environment.userId, workflow.userId))
49+
.limit(1)
50+
51+
if (!userEnv) {
52+
throw new Error('No environment variables found for this user')
53+
}
54+
55+
// Parse and validate environment variables
56+
const variables = EnvVarsSchema.parse(userEnv.variables)
57+
58+
// Replace environment variables in the block states
59+
const currentBlockStates = await Object.entries(mergedStates).reduce(
60+
async (accPromise, [id, block]) => {
61+
const acc = await accPromise
62+
acc[id] = await Object.entries(block.subBlocks).reduce(
63+
async (subAccPromise, [key, subBlock]) => {
64+
const subAcc = await subAccPromise
65+
let value = subBlock.value
66+
67+
// If the value is a string and contains environment variable syntax
68+
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
69+
const matches = value.match(/{{([^}]+)}}/g)
70+
if (matches) {
71+
// Process all matches sequentially
72+
for (const match of matches) {
73+
const varName = match.slice(2, -2) // Remove {{ and }}
74+
const encryptedValue = variables[varName]
75+
if (!encryptedValue) {
76+
throw new Error(`Environment variable "${varName}" was not found`)
77+
}
78+
79+
try {
80+
const { decrypted } = await decryptSecret(encryptedValue)
81+
value = (value as string).replace(match, decrypted)
82+
} catch (error: any) {
83+
console.error('Error decrypting value:', error)
84+
throw new Error(
85+
`Failed to decrypt environment variable "${varName}": ${error.message}`
86+
)
87+
}
88+
}
89+
}
90+
}
91+
92+
subAcc[key] = value
93+
return subAcc
94+
},
95+
Promise.resolve({} as Record<string, any>)
96+
)
97+
return acc
98+
},
99+
Promise.resolve({} as Record<string, Record<string, any>>)
100+
)
101+
102+
// Create a map of decrypted environment variables
103+
const decryptedEnvVars: Record<string, string> = {}
104+
for (const [key, encryptedValue] of Object.entries(variables)) {
105+
try {
106+
const { decrypted } = await decryptSecret(encryptedValue)
107+
decryptedEnvVars[key] = decrypted
108+
} catch (error: any) {
109+
console.error(`Failed to decrypt ${key}:`, error)
110+
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
111+
}
112+
}
113+
114+
// Serialize and execute the workflow
115+
const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops)
116+
const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars)
117+
const result = await executor.execute(workflowId)
118+
119+
// Log each execution step
120+
for (const log of result.logs || []) {
121+
await persistLog({
122+
id: uuidv4(),
123+
workflowId,
124+
executionId,
125+
level: log.success ? 'info' : 'error',
126+
message: `Block ${log.blockName || log.blockId} (${log.blockType}): ${
127+
log.error || 'Completed successfully'
128+
}`,
129+
duration: log.success ? `${log.durationMs}ms` : 'NA',
130+
trigger: 'api',
131+
createdAt: new Date(log.endedAt || log.startedAt),
132+
})
133+
}
134+
135+
// Calculate total duration from successful block logs
136+
const totalDuration = (result.logs || [])
137+
.filter((log) => log.success)
138+
.reduce((sum, log) => sum + log.durationMs, 0)
139+
140+
// Log the final execution result
141+
await persistLog({
142+
id: uuidv4(),
143+
workflowId,
144+
executionId,
145+
level: result.success ? 'info' : 'error',
146+
message: result.success
147+
? 'API workflow executed successfully'
148+
: `API workflow execution failed: ${result.error}`,
149+
duration: result.success ? `${totalDuration}ms` : 'NA',
150+
trigger: 'api',
151+
createdAt: new Date(),
152+
})
153+
154+
return result
155+
} catch (error: any) {
156+
// Log the error
157+
await persistLog({
158+
id: uuidv4(),
159+
workflowId,
160+
executionId,
161+
level: 'error',
162+
message: `API workflow execution failed: ${error.message}`,
163+
duration: 'NA',
164+
trigger: 'api',
165+
createdAt: new Date(),
166+
})
167+
throw error
168+
} finally {
169+
runningExecutions.delete(workflowId)
170+
}
171+
}
172+
173+
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
174+
const { id } = await params
175+
176+
try {
177+
const validation = await validateWorkflowAccess(request, id)
178+
if (validation.error) {
179+
return createErrorResponse(validation.error.message, validation.error.status)
180+
}
181+
182+
const result = await executeWorkflow(validation.workflow)
183+
return createSuccessResponse(result)
184+
} catch (error: any) {
185+
console.error('Error executing workflow:', error)
186+
return createErrorResponse(
187+
error.message || 'Failed to execute workflow',
188+
500,
189+
'EXECUTION_ERROR'
190+
)
191+
}
192+
}
193+
194+
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
195+
const { id } = await params
196+
197+
try {
198+
const validation = await validateWorkflowAccess(request, id)
199+
if (validation.error) {
200+
return createErrorResponse(validation.error.message, validation.error.status)
201+
}
202+
203+
const body = await request.json().catch(() => ({}))
204+
const result = await executeWorkflow(validation.workflow, body)
205+
return createSuccessResponse(result)
206+
} catch (error: any) {
207+
console.error('Error executing workflow:', error)
208+
return createErrorResponse(
209+
error.message || 'Failed to execute workflow',
210+
500,
211+
'EXECUTION_ERROR'
212+
)
213+
}
214+
}

app/api/workflow/[id]/route.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { NextRequest } from 'next/server'
2+
import { Executor } from '@/executor'
3+
import { SerializedWorkflow } from '@/serializer/types'
4+
import { validateWorkflowAccess } from '../middleware'
5+
import { createErrorResponse, createSuccessResponse } from '../utils'
6+
7+
export const dynamic = 'force-dynamic'
8+
9+
async function executeWorkflow(workflow: any, input?: any) {
10+
try {
11+
const executor = new Executor(workflow.state as SerializedWorkflow, input)
12+
const result = await executor.execute(workflow.id)
13+
return result
14+
} catch (error: any) {
15+
console.error('Workflow execution failed:', { workflowId: workflow.id, error })
16+
throw new Error(`Execution failed: ${error.message}`)
17+
}
18+
}
19+
20+
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
21+
try {
22+
const { id } = await params
23+
24+
const validation = await validateWorkflowAccess(request, id)
25+
26+
if (validation.error) {
27+
return createErrorResponse(validation.error.message, validation.error.status)
28+
}
29+
30+
const result = await executeWorkflow(validation.workflow)
31+
return createSuccessResponse(result)
32+
} catch (error: any) {
33+
console.error('Error executing workflow:', error)
34+
return createErrorResponse('Failed to execute workflow', 500, 'EXECUTION_ERROR')
35+
}
36+
}
37+
38+
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
39+
try {
40+
const { id } = await params
41+
const validation = await validateWorkflowAccess(request, id)
42+
43+
if (validation.error) {
44+
return createErrorResponse(validation.error.message, validation.error.status)
45+
}
46+
47+
const body = await request.json().catch(() => ({}))
48+
const result = await executeWorkflow(validation.workflow, body)
49+
return createSuccessResponse(result)
50+
} catch (error: any) {
51+
console.error('Error executing workflow:', error)
52+
return createErrorResponse('Failed to execute workflow', 500, 'EXECUTION_ERROR')
53+
}
54+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { NextRequest } from 'next/server'
2+
import { validateWorkflowAccess } from '../../middleware'
3+
import { createErrorResponse, createSuccessResponse } from '../../utils'
4+
5+
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
6+
try {
7+
const { id } = await params
8+
const validation = await validateWorkflowAccess(request, id, false)
9+
if (validation.error) {
10+
return createErrorResponse(validation.error.message, validation.error.status)
11+
}
12+
13+
return createSuccessResponse({
14+
isDeployed: validation.workflow.isDeployed,
15+
deployedAt: validation.workflow.deployedAt,
16+
})
17+
} catch (error) {
18+
return createErrorResponse('Failed to get status', 500)
19+
}
20+
}

0 commit comments

Comments
 (0)