Skip to content

Commit 5f7217d

Browse files
will-dzchrisli30
authored andcommitted
fix: updated graphQL node implementation (#365)
* Updated graphQL node implementation * Fixed execution output of deployed workflow for ContractWRite * Make sure metadata uses original address but not tokenContract * Fixed variables shared across iteration in parallel execution from LoopNode
1 parent ab8029b commit 5f7217d

9 files changed

+473
-112
lines changed

core/taskengine/run_node_immediately.go

Lines changed: 74 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,14 @@ func (n *Engine) runBlockTriggerImmediately(triggerConfig map[string]interface{}
7272

7373
// Check if a specific block number is requested
7474
if configBlockNumber, ok := triggerConfig["blockNumber"]; ok {
75-
if blockNum, err := n.parseUint64(configBlockNumber); err == nil {
75+
blockNum, err := n.parseUint64(configBlockNumber)
76+
if err != nil {
77+
if n.logger != nil {
78+
n.logger.Debug("Failed to parse blockNumber from trigger config, using latest block",
79+
"blockNumber", configBlockNumber,
80+
"error", err)
81+
}
82+
} else {
7683
blockNumber = blockNum
7784
}
7885
}
@@ -351,7 +358,7 @@ func (n *Engine) runEventTriggerWithTenderlySimulation(ctx context.Context, quer
351358
}
352359

353360
metadata := map[string]interface{}{
354-
"tokenContract": simulatedLog.Address.Hex(), // Renamed for clarity
361+
"address": simulatedLog.Address.Hex(), // Original contract address
355362
"topics": topicsMetadata, // Now protobuf-compatible
356363
"data": "0x" + common.Bytes2Hex(simulatedLog.Data),
357364
"blockNumber": simulatedLog.BlockNumber,
@@ -1080,7 +1087,7 @@ func (n *Engine) runEventTriggerWithHistoricalSearch(ctx context.Context, querie
10801087

10811088
// Build raw metadata (the original blockchain event data)
10821089
metadata := map[string]interface{}{
1083-
"tokenContract": mostRecentEvent.Address.Hex(), // Renamed for clarity
1090+
"address": mostRecentEvent.Address.Hex(), // Original contract address
10841091
"topics": topics,
10851092
"data": "0x" + common.Bytes2Hex(mostRecentEvent.Data),
10861093
"blockNumber": mostRecentEvent.BlockNumber,
@@ -1501,7 +1508,7 @@ func (n *Engine) runProcessingNodeWithInputs(nodeType string, nodeConfig map[str
15011508
secrets = make(map[string]string)
15021509
}
15031510

1504-
// Create a clean VM for isolated execution with proper secrets
1511+
// Create a clean VM for isolated execution with proper secrets (no task needed for immediate execution)
15051512
vm, err := NewVMWithData(nil, nil, n.smartWalletConfig, secrets)
15061513
if err != nil {
15071514
return nil, fmt.Errorf("failed to create VM: %w", err)
@@ -1679,16 +1686,32 @@ func (n *Engine) extractExecutionResult(executionStep *avsproto.Execution_Step)
16791686
result["success"] = true
16801687
} else if contractWrite := executionStep.GetContractWrite(); contractWrite != nil {
16811688
// ContractWrite output now contains enhanced results structure
1689+
// Data contains decoded events (flattened by method name), Metadata contains method results
1690+
1691+
// Extract data if available (decoded events organized by method name)
16821692
if contractWrite.GetData() != nil {
1683-
// Extract results using helper function
1684-
allResults := ExtractResultsFromProtobufValue(contractWrite.GetData())
1693+
iface := contractWrite.GetData().AsInterface()
1694+
if m, ok := iface.(map[string]interface{}); ok {
1695+
result["data"] = m
1696+
} else {
1697+
result["data"] = iface
1698+
}
1699+
}
16851700

1686-
// Return results array directly without backward compatibility
1701+
// Extract metadata if available (method results array)
1702+
if contractWrite.GetMetadata() != nil {
1703+
// Extract results from metadata (method results array)
1704+
allResults := ExtractResultsFromProtobufValue(contractWrite.GetMetadata())
16871705
result["results"] = allResults
16881706

1689-
return result, nil
1707+
if metadataArray := gow.ValueToSlice(contractWrite.GetMetadata()); metadataArray != nil {
1708+
result["metadata"] = metadataArray
1709+
} else {
1710+
result["metadata"] = contractWrite.GetMetadata().AsInterface()
1711+
}
16901712
}
1691-
return map[string]interface{}{"status": "success"}, nil
1713+
1714+
return result, nil
16921715
} else if loop := executionStep.GetLoop(); loop != nil {
16931716
// Loop output contains the array of iteration results
16941717
if loop.GetData() != nil {
@@ -1709,6 +1732,19 @@ func (n *Engine) extractExecutionResult(executionStep *avsproto.Execution_Step)
17091732
result["data"] = iface
17101733
}
17111734
}
1735+
} else if graphql := executionStep.GetGraphql(); graphql != nil {
1736+
// GraphQL output contains the query results
1737+
if graphql.GetData() != nil {
1738+
// Extract the actual GraphQL response data
1739+
iface := graphql.GetData().AsInterface()
1740+
// Return the GraphQL data directly (it should already be in the correct format)
1741+
if graphqlData, ok := iface.(map[string]interface{}); ok {
1742+
return graphqlData, nil
1743+
} else {
1744+
// Fallback: wrap in data field
1745+
result["data"] = iface
1746+
}
1747+
}
17121748
}
17131749

17141750
// If no specific data was extracted, include basic execution info
@@ -1976,10 +2012,15 @@ func (n *Engine) RunNodeImmediatelyRPC(user *model.User, req *avsproto.RunNodeWi
19762012
case NodeTypeContractWrite:
19772013
// For contract write nodes - always set output structure to avoid OUTPUT_DATA_NOT_SET
19782014
contractWriteOutput := &avsproto.ContractWriteNode_Output{}
2015+
// Always create the data structure, even if result is nil/empty
2016+
var resultsArray []interface{}
2017+
var decodedEventsData = make(map[string]interface{})
2018+
19792019
if result != nil && len(result) > 0 {
1980-
// Convert result to the new data structure
1981-
var resultsArray []interface{}
1982-
var decodedEventsData = make(map[string]interface{})
2020+
// First, try to extract data directly (flattened events organized by method name)
2021+
if dataFromVM, ok := result["data"].(map[string]interface{}); ok {
2022+
decodedEventsData = dataFromVM
2023+
}
19832024

19842025
// Check if we have the new results array format (from VM execution)
19852026
if resultsFromVM, ok := result["results"].([]interface{}); ok {
@@ -2012,12 +2053,13 @@ func (n *Engine) RunNodeImmediatelyRPC(user *model.User, req *avsproto.RunNodeWi
20122053

20132054
resultsArray = append(resultsArray, convertedResult)
20142055

2015-
// 🚀 NEW: Decode event logs from transaction receipt
2056+
// 🚀 NEW: Parse events for this specific method and store by method name
2057+
methodEvents := make(map[string]interface{})
20162058
if methodResult.Receipt != nil {
20172059
receiptData := methodResult.Receipt.AsInterface()
20182060
if receiptMap, ok := receiptData.(map[string]interface{}); ok {
20192061
if logs, hasLogs := receiptMap["logs"]; hasLogs {
2020-
if logsArray, ok := logs.([]interface{}); ok {
2062+
if logsArray, ok := logs.([]interface{}); ok && len(logsArray) > 0 {
20212063
// Get contract ABI for event decoding
20222064
var contractABI *abi.ABI
20232065
if methodResult.MethodAbi != nil {
@@ -2034,7 +2076,7 @@ func (n *Engine) RunNodeImmediatelyRPC(user *model.User, req *avsproto.RunNodeWi
20342076
}
20352077
}
20362078

2037-
// Decode each event log
2079+
// Decode each event log for this method
20382080
for _, logInterface := range logsArray {
20392081
if logMap, ok := logInterface.(map[string]interface{}); ok {
20402082
if contractABI != nil {
@@ -2070,39 +2112,12 @@ func (n *Engine) RunNodeImmediatelyRPC(user *model.User, req *avsproto.RunNodeWi
20702112

20712113
// Decode the event using existing logic
20722114
if decodedEvent, err := n.parseEventWithParsedABI(eventLog, contractABI, nil); err == nil {
2073-
// Add decoded event data to flattened result
2115+
// Flatten event fields into methodEvents
20742116
for key, value := range decodedEvent {
2075-
// Use eventName.fieldName format with dot notation to avoid conflicts
2076-
if eventName, hasEventName := decodedEvent["eventName"]; hasEventName {
2077-
if eventNameStr, ok := eventName.(string); ok && key != "eventName" {
2078-
flatKey := fmt.Sprintf("%s.%s", eventNameStr, key)
2079-
decodedEventsData[flatKey] = value
2080-
}
2117+
if key != "eventName" { // Skip meta field
2118+
methodEvents[key] = value
20812119
}
20822120
}
2083-
2084-
// Also add the complete event object with a descriptive key for multiple events
2085-
var eventKey string
2086-
if eventName, hasEventName := decodedEvent["eventName"]; hasEventName {
2087-
if eventNameStr, ok := eventName.(string); ok {
2088-
// Try to get transaction hash or log index for uniqueness
2089-
var uniqueSuffix string
2090-
if txHash, hasTxHash := logMap["transactionHash"]; hasTxHash {
2091-
if txHashStr, ok := txHash.(string); ok {
2092-
uniqueSuffix = txHashStr
2093-
}
2094-
} else if logIndex, hasLogIndex := logMap["logIndex"]; hasLogIndex {
2095-
uniqueSuffix = fmt.Sprintf("%x", logIndex)
2096-
} else {
2097-
// Fallback to eventLog.Index if logMap doesn't have logIndex
2098-
uniqueSuffix = fmt.Sprintf("%d", eventLog.Index)
2099-
}
2100-
eventKey = fmt.Sprintf("%s_%s", eventNameStr, uniqueSuffix)
2101-
}
2102-
} else {
2103-
eventKey = fmt.Sprintf("event_%d", eventLog.Index)
2104-
}
2105-
decodedEventsData[eventKey] = decodedEvent
21062121
}
21072122
}
21082123
}
@@ -2111,6 +2126,9 @@ func (n *Engine) RunNodeImmediatelyRPC(user *model.User, req *avsproto.RunNodeWi
21112126
}
21122127
}
21132128
}
2129+
2130+
// Store events for this method (empty object if no events)
2131+
decodedEventsData[methodResult.MethodName] = methodEvents
21142132
} else if methodResultMap, ok := resultInterface.(map[string]interface{}); ok {
21152133
// Already in map format
21162134
resultsArray = append(resultsArray, methodResultMap)
@@ -2139,18 +2157,18 @@ func (n *Engine) RunNodeImmediatelyRPC(user *model.User, req *avsproto.RunNodeWi
21392157
}
21402158
}
21412159

2142-
// 🚀 NEW: Set data field with decoded events (flattened format like ContractRead)
2143-
if len(decodedEventsData) > 0 {
2144-
if dataValue, err := structpb.NewValue(decodedEventsData); err == nil {
2145-
contractWriteOutput.Data = dataValue
2146-
}
2147-
}
2160+
}
21482161

2149-
// 🚀 NEW: Set metadata field with detailed method information
2150-
if len(resultsArray) > 0 {
2151-
if metadataValue, err := structpb.NewValue(resultsArray); err == nil {
2152-
contractWriteOutput.Metadata = metadataValue
2153-
}
2162+
// 🚀 NEW: Set data field with decoded events (flattened format like ContractRead)
2163+
// Always set data field - empty object if no events, flattened events if present
2164+
if dataValue, err := structpb.NewValue(decodedEventsData); err == nil {
2165+
contractWriteOutput.Data = dataValue
2166+
}
2167+
2168+
// 🚀 NEW: Set metadata field with detailed method information
2169+
if len(resultsArray) > 0 {
2170+
if metadataValue, err := structpb.NewValue(resultsArray); err == nil {
2171+
contractWriteOutput.Metadata = metadataValue
21542172
}
21552173
}
21562174
resp.OutputData = &avsproto.RunNodeWithInputsResp_ContractWrite{

core/taskengine/shared_event_enrichment.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ func parseEventWithABIShared(eventLog *types.Log, contractABI *abi.ABI, query *a
138138
parsedData := make(map[string]interface{})
139139
parsedData["eventName"] = eventName
140140

141+
// Create ABI value converter for proper type conversion
142+
converter := NewABIValueConverter(nil, nil)
143+
141144
// Add indexed parameters from topics (skip topic[0] which is event signature)
142145
indexedCount := 0
143146
nonIndexedCount := 0
@@ -151,14 +154,17 @@ func parseEventWithABIShared(eventLog *types.Log, contractABI *abi.ABI, query *a
151154
// Convert hash to address for indexed address parameters
152155
parsedData[input.Name] = common.HexToAddress(eventLog.Topics[topicIndex].Hex()).Hex()
153156
} else {
154-
parsedData[input.Name] = eventLog.Topics[topicIndex]
157+
// Convert common.Hash to hex string for structpb compatibility
158+
parsedData[input.Name] = eventLog.Topics[topicIndex].Hex()
155159
}
156160
}
157161
indexedCount++
158162
} else {
159-
// Get from decoded data
163+
// Get from decoded data and convert to structpb-compatible type
160164
if nonIndexedCount < len(decodedData) {
161-
parsedData[input.Name] = decodedData[nonIndexedCount]
165+
rawValue := decodedData[nonIndexedCount]
166+
// Use ABI converter to ensure structpb compatibility
167+
parsedData[input.Name] = converter.ConvertABIValueToInterface(rawValue, input.Type, input.Name)
162168
}
163169
nonIndexedCount++
164170
}
@@ -368,13 +374,19 @@ func findCommonAddress(fromAddresses, toAddresses []string) string {
368374

369375
// createBasicEventMetadata creates basic metadata structure for events without ABI
370376
func createBasicEventMetadata(eventLog *types.Log) map[string]interface{} {
377+
// Convert topics from []common.Hash to []string for structpb compatibility
378+
topicsHex := make([]string, len(eventLog.Topics))
379+
for i, topic := range eventLog.Topics {
380+
topicsHex[i] = topic.Hex()
381+
}
382+
371383
return map[string]interface{}{
372384
"tokenContract": eventLog.Address.Hex(), // Renamed from "address" for clarity
373385
"blockNumber": eventLog.BlockNumber,
374386
"transactionHash": eventLog.TxHash.Hex(),
375387
"logIndex": eventLog.Index,
376388
"blockHash": eventLog.BlockHash.Hex(),
377-
"topics": eventLog.Topics,
389+
"topics": topicsHex,
378390
"data": common.Bytes2Hex(eventLog.Data),
379391
"removed": eventLog.Removed,
380392
}

core/taskengine/tenderly_client.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type RPCError struct {
5050
// Call parameters for eth_call
5151
type CallParams struct {
5252
To string `json:"to"`
53+
From string `json:"from,omitempty"`
5354
Data string `json:"data"`
5455
}
5556

@@ -538,16 +539,18 @@ func (tc *TenderlyClient) createMockTransferLog(contractAddress string, from, to
538539
}
539540

540541
// SimulateContractWrite simulates a contract write operation using Tenderly
541-
func (tc *TenderlyClient) SimulateContractWrite(ctx context.Context, contractAddress string, callData string, contractABI string, methodName string, chainID int64) (*ContractWriteSimulationResult, error) {
542+
func (tc *TenderlyClient) SimulateContractWrite(ctx context.Context, contractAddress string, callData string, contractABI string, methodName string, chainID int64, fromAddress string) (*ContractWriteSimulationResult, error) {
542543
tc.logger.Info("🔮 Simulating contract write via Tenderly",
543544
"contract", contractAddress,
544545
"method", methodName,
546+
"from", fromAddress,
545547
"chain_id", chainID)
546548

547549
// For simulation, we use eth_call to see what would happen without actually executing
548550
// This gives us the return data and potential revert reasons
549551
callParams := CallParams{
550552
To: contractAddress,
553+
From: fromAddress,
551554
Data: callData,
552555
}
553556

@@ -612,7 +615,7 @@ func (tc *TenderlyClient) SimulateContractWrite(ctx context.Context, contractAdd
612615
result.Transaction = &ContractWriteTransactionData{
613616
Hash: fmt.Sprintf("0x%064x", time.Now().UnixNano()), // Mock transaction hash
614617
Status: "simulated",
615-
From: "0x0000000000000000000000000000000000000001", // Mock sender
618+
From: fromAddress, // Use the actual user's wallet address
616619
To: contractAddress,
617620
Value: "0",
618621
Timestamp: time.Now().Unix(),

core/taskengine/tenderly_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1516,7 +1516,7 @@ func TestEventTriggerImmediately_TenderlySimulation_Unit(t *testing.T) {
15161516
require.True(t, ok, "metadata should be a map[string]interface{}")
15171517
require.NotNil(t, metadata, "Should have metadata")
15181518

1519-
assert.NotNil(t, metadata["tokenContract"], "Should have address in metadata")
1519+
assert.NotNil(t, metadata["address"], "Should have address in metadata")
15201520
assert.NotNil(t, metadata["blockNumber"], "Should have blockNumber in metadata")
15211521

15221522
t.Logf("✅ Tenderly simulation successful!")

0 commit comments

Comments
 (0)