Skip to content

Commit dde1356

Browse files
fix: harden batch add block enrichment fallback
1 parent a3a9918 commit dde1356

File tree

2 files changed

+167
-21
lines changed

2 files changed

+167
-21
lines changed

apps/sim/socket/database/operations.test.ts

Lines changed: 143 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,39 @@
22
* @vitest-environment node
33
*/
44

5-
import { describe, expect, it, vi } from 'vitest'
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
66
import {
77
filterBatchAddBlocksByProtection,
88
getBatchUpdateParentCoMovingKey,
9+
persistWorkflowOperation,
910
} from '@/socket/database/operations'
1011

12+
const {
13+
mockTransaction,
14+
mockGetActiveWorkflowContext,
15+
mockMergeSubBlockValues,
16+
mockGetBlock,
17+
mockWorkflowTable,
18+
mockWorkflowBlocksTable,
19+
mockWorkflowEdgesTable,
20+
mockWorkflowSubflowsTable,
21+
} = vi.hoisted(() => ({
22+
mockTransaction: vi.fn(),
23+
mockGetActiveWorkflowContext: vi.fn(),
24+
mockMergeSubBlockValues: vi.fn(),
25+
mockGetBlock: vi.fn(),
26+
mockWorkflowTable: {},
27+
mockWorkflowBlocksTable: {},
28+
mockWorkflowEdgesTable: {},
29+
mockWorkflowSubflowsTable: {},
30+
}))
31+
1132
vi.mock('@sim/db', () => ({
1233
webhook: {},
13-
workflow: {},
14-
workflowBlocks: {},
15-
workflowEdges: {},
16-
workflowSubflows: {},
34+
workflow: mockWorkflowTable,
35+
workflowBlocks: mockWorkflowBlocksTable,
36+
workflowEdges: mockWorkflowEdgesTable,
37+
workflowSubflows: mockWorkflowSubflowsTable,
1738
}))
1839

1940
vi.mock('@sim/logger', () => ({
@@ -35,7 +56,7 @@ vi.mock('drizzle-orm', () => ({
3556
}))
3657

3758
vi.mock('drizzle-orm/postgres-js', () => ({
38-
drizzle: vi.fn(() => ({ transaction: vi.fn() })),
59+
drizzle: vi.fn(() => ({ transaction: mockTransaction })),
3960
}))
4061

4162
vi.mock('postgres', () => ({
@@ -59,26 +80,36 @@ vi.mock('@/lib/webhooks/provider-subscriptions', () => ({
5980
}))
6081

6182
vi.mock('@/lib/workflows/active-context', () => ({
62-
getActiveWorkflowContext: vi.fn(),
83+
getActiveWorkflowContext: mockGetActiveWorkflowContext,
6384
}))
6485

6586
vi.mock('@/lib/workflows/persistence/utils', () => ({
6687
loadWorkflowFromNormalizedTables: vi.fn(),
6788
}))
6889

6990
vi.mock('@/lib/workflows/subblocks', () => ({
70-
mergeSubBlockValues: vi.fn(),
91+
mergeSubBlockValues: mockMergeSubBlockValues,
7192
}))
7293

7394
vi.mock('@/blocks/registry', () => ({
74-
getBlock: vi.fn(),
95+
getBlock: mockGetBlock,
7596
}))
7697

7798
vi.mock('@/triggers', () => ({
7899
getTrigger: vi.fn(),
79100
isTriggerValid: vi.fn(() => false),
80101
}))
81102

103+
beforeEach(() => {
104+
vi.clearAllMocks()
105+
mockGetActiveWorkflowContext.mockResolvedValue({ id: 'workflow-1' })
106+
mockMergeSubBlockValues.mockImplementation((subBlocks, values) => ({
107+
...((subBlocks as Record<string, unknown>) ?? {}),
108+
...((values as Record<string, unknown>) ?? {}),
109+
}))
110+
mockGetBlock.mockReturnValue(undefined)
111+
})
112+
82113
describe('getBatchUpdateParentCoMovingKey', () => {
83114
it('groups blocks entering the same destination container together', () => {
84115
expect(getBatchUpdateParentCoMovingKey('old-parent-a', 'loop-1')).toBe(
@@ -161,3 +192,106 @@ describe('filterBatchAddBlocksByProtection', () => {
161192
expect(allowedBlocks.map((block) => block.id)).toEqual(['new-container', 'new-child'])
162193
})
163194
})
195+
196+
describe('persistWorkflowOperation', () => {
197+
it('defensively enriches batch-added blocks before persistence and broadcast payloads', async () => {
198+
const insertedBlockValues: Array<Record<string, unknown>> = []
199+
200+
const tx = {
201+
update: vi.fn(() => ({
202+
set: vi.fn(() => ({ where: vi.fn().mockResolvedValue(undefined) })),
203+
})),
204+
select: vi.fn(() => ({
205+
from: vi.fn(() => ({
206+
where: vi.fn().mockResolvedValue([]),
207+
})),
208+
})),
209+
insert: vi.fn((table) => {
210+
if (table === mockWorkflowBlocksTable) {
211+
return {
212+
values: vi.fn((values) => {
213+
insertedBlockValues.push(...(values as Array<Record<string, unknown>>))
214+
return {
215+
onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
216+
}
217+
}),
218+
}
219+
}
220+
221+
return {
222+
values: vi.fn(() => ({
223+
onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
224+
})),
225+
}
226+
}),
227+
}
228+
229+
mockTransaction.mockImplementation(async (callback) => callback(tx))
230+
mockGetBlock.mockReturnValue({
231+
subBlocks: [
232+
{ id: 'prompt', type: 'short-input', defaultValue: 'registry-default' },
233+
{ id: 'temperature', type: 'slider', defaultValue: 0.7 },
234+
],
235+
outputs: {
236+
result: { type: 'string' },
237+
},
238+
})
239+
240+
const result = await persistWorkflowOperation('workflow-1', {
241+
operation: 'batch-add-blocks',
242+
target: 'blocks',
243+
payload: {
244+
blocks: [
245+
{
246+
id: 'block-1',
247+
type: 'agent',
248+
name: 'Agent',
249+
position: { x: 10, y: 20 },
250+
data: {},
251+
},
252+
],
253+
edges: [],
254+
loops: {},
255+
parallels: {},
256+
subBlockValues: {
257+
'block-1': {
258+
prompt: { value: 'user value' },
259+
},
260+
},
261+
},
262+
timestamp: Date.now(),
263+
userId: 'user-1',
264+
})
265+
266+
expect(insertedBlockValues).toHaveLength(1)
267+
expect(insertedBlockValues[0]).toMatchObject({
268+
id: 'block-1',
269+
subBlocks: {
270+
prompt: { value: 'user value' },
271+
temperature: { id: 'temperature', type: 'slider', value: 0.7 },
272+
},
273+
outputs: {
274+
result: { type: 'string' },
275+
},
276+
})
277+
expect(result.appliedPayload).toMatchObject({
278+
blocks: [
279+
{
280+
id: 'block-1',
281+
subBlocks: {
282+
prompt: { value: 'user value' },
283+
temperature: { id: 'temperature', type: 'slider', value: 0.7 },
284+
},
285+
outputs: {
286+
result: { type: 'string' },
287+
},
288+
},
289+
],
290+
subBlockValues: {
291+
'block-1': {
292+
prompt: { value: 'user value' },
293+
},
294+
},
295+
})
296+
})
297+
})

apps/sim/socket/database/operations.ts

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,6 +1130,9 @@ async function handleBlocksOperationTx(
11301130
parallelCount: Object.keys(parallels || {}).length,
11311131
})
11321132

1133+
let allowedSubBlockValues: Record<string, unknown> = {}
1134+
let enrichedAllowedBlocks: Array<Record<string, unknown>> = []
1135+
11331136
if (blocks && blocks.length > 0) {
11341137
// Fetch existing blocks to check for locked parents
11351138
const existingBlocks = await tx
@@ -1167,9 +1170,23 @@ async function handleBlocksOperationTx(
11671170
}
11681171
}
11691172

1170-
// Blocks are pre-enriched by enrichBatchAddBlocksPayload() upstream,
1171-
// so subBlocks, outputs, and triggerMode are already populated.
1172-
const blockValues = allowedBlocks.map((block: Record<string, unknown>) => {
1173+
const allowedBlockIds = new Set(allowedBlocks.map((block) => block.id as string))
1174+
allowedSubBlockValues = Object.fromEntries(
1175+
Object.entries((subBlockValues as Record<string, unknown> | undefined) ?? {}).filter(
1176+
([id]) => allowedBlockIds.has(id)
1177+
)
1178+
)
1179+
1180+
const enrichedAllowedPayload = enrichBatchAddBlocksPayload({
1181+
...payload,
1182+
blocks: allowedBlocks,
1183+
subBlockValues: allowedSubBlockValues,
1184+
})
1185+
1186+
enrichedAllowedBlocks =
1187+
(enrichedAllowedPayload.blocks as Array<Record<string, unknown>> | undefined) ?? []
1188+
1189+
const blockValues = enrichedAllowedBlocks.map((block: Record<string, unknown>) => {
11731190
return {
11741191
id: block.id as string,
11751192
workflowId,
@@ -1215,7 +1232,7 @@ async function handleBlocksOperationTx(
12151232
// Create subflow entries for loop/parallel blocks (skip if already in payload)
12161233
const loopIds = new Set(loops ? Object.keys(loops) : [])
12171234
const parallelIds = new Set(parallels ? Object.keys(parallels) : [])
1218-
for (const block of allowedBlocks) {
1235+
for (const block of enrichedAllowedBlocks) {
12191236
const blockId = block.id as string
12201237
if (block.type === 'loop' && !loopIds.has(blockId)) {
12211238
await tx
@@ -1262,7 +1279,7 @@ async function handleBlocksOperationTx(
12621279

12631280
// Update parent subflow node lists
12641281
const parentIds = new Set<string>()
1265-
for (const block of allowedBlocks) {
1282+
for (const block of enrichedAllowedBlocks) {
12661283
const parentId = (block.data as Record<string, unknown>)?.parentId as string | undefined
12671284
if (parentId) {
12681285
parentIds.add(parentId)
@@ -1273,7 +1290,7 @@ async function handleBlocksOperationTx(
12731290
}
12741291
}
12751292

1276-
const allowedBlockIds = new Set(allowedBlocks.map((block) => block.id as string))
1293+
const allowedBlockIds = new Set(enrichedAllowedBlocks.map((block) => block.id as string))
12771294
const validEdgeEndpointIds = new Set([...existingBlockIds, ...allowedBlockIds])
12781295
const allowedEdges = ((edges as Array<Record<string, unknown>> | undefined) ?? []).filter(
12791296
(edge) => {
@@ -1297,11 +1314,6 @@ async function handleBlocksOperationTx(
12971314
allowedBlockIds.has(id)
12981315
)
12991316
)
1300-
const allowedSubBlockValues = Object.fromEntries(
1301-
Object.entries((subBlockValues as Record<string, unknown> | undefined) ?? {}).filter(
1302-
([id]) => allowedBlockIds.has(id)
1303-
)
1304-
)
13051317

13061318
if (allowedEdges.length > 0) {
13071319
const edgeValues = allowedEdges.map((edge: Record<string, unknown>) => ({
@@ -1372,7 +1384,7 @@ async function handleBlocksOperationTx(
13721384
return {
13731385
appliedPayload: {
13741386
...payload,
1375-
blocks: allowedBlocks,
1387+
blocks: enrichedAllowedBlocks,
13761388
edges: allowedEdges,
13771389
loops: allowedLoops,
13781390
parallels: allowedParallels,

0 commit comments

Comments
 (0)