diff --git a/internal/events/logprocessor.go b/internal/events/logprocessor.go index d90a079e..8f80d511 100644 --- a/internal/events/logprocessor.go +++ b/internal/events/logprocessor.go @@ -37,6 +37,7 @@ type logEntry struct { Timestamp uint64 `json:"timestamp,omitempty"` InputMethod string `json:"inputMethod,omitempty"` InputArgs map[string]interface{} `json:"inputArgs,omitempty"` + InputSigner string `json:"inputSigner,omitempty"` } type eventData struct { @@ -51,6 +52,7 @@ type eventData struct { Timestamp string `json:"timestamp,omitempty"` InputMethod string `json:"inputMethod,omitempty"` InputArgs map[string]interface{} `json:"inputArgs,omitempty"` + InputSigner string `json:"inputSigner,omitempty"` // Used for callback handling batchComplete func(*eventData) } @@ -129,6 +131,7 @@ func (lp *logProcessor) processLogEntry(subInfo string, entry *logEntry, idx int LogIndex: strconv.Itoa(idx), InputMethod: entry.InputMethod, InputArgs: entry.InputArgs, + InputSigner: entry.InputSigner, batchComplete: lp.batchComplete, } if lp.stream.spec.Timestamps { diff --git a/internal/events/submanager.go b/internal/events/submanager.go index b20255f9..3fd095b5 100644 --- a/internal/events/submanager.go +++ b/internal/events/submanager.go @@ -159,7 +159,13 @@ func (s *subscriptionMGR) setInitialBlock(i *SubscriptionInfo, initialBlock stri // AddSubscription adds a new subscription func (s *subscriptionMGR) AddSubscription(ctx context.Context, addr *ethbinding.Address, abi *contractregistry.ABILocation, event *ethbinding.ABIElementMarshaling, streamID, initialBlock, name string) (*SubscriptionInfo, error) { - return s.addSubscriptionCommon(ctx, abi, &SubscriptionCreateDTO{ + var abiRef *ABIRefOrInline + if abi != nil { + abiRef = &ABIRefOrInline{ + ABILocation: *abi, + } + } + return s.addSubscriptionCommon(ctx, abiRef, &SubscriptionCreateDTO{ Address: addr, Name: name, Event: event, @@ -169,10 +175,16 @@ func (s *subscriptionMGR) AddSubscription(ctx context.Context, addr *ethbinding. } func (s *subscriptionMGR) AddSubscriptionDirect(ctx context.Context, newSub *SubscriptionCreateDTO) (*SubscriptionInfo, error) { - return s.addSubscriptionCommon(ctx, nil, newSub) + var abiLocation *ABIRefOrInline + if newSub.Methods != nil { + abiLocation = &ABIRefOrInline{ + Inline: newSub.Methods, + } + } + return s.addSubscriptionCommon(ctx, abiLocation, newSub) } -func (s *subscriptionMGR) addSubscriptionCommon(ctx context.Context, abi *contractregistry.ABILocation, newSub *SubscriptionCreateDTO) (*SubscriptionInfo, error) { +func (s *subscriptionMGR) addSubscriptionCommon(ctx context.Context, abi *ABIRefOrInline, newSub *SubscriptionCreateDTO) (*SubscriptionInfo, error) { i := &SubscriptionInfo{ Name: newSub.Name, TimeSorted: messages.TimeSorted{ diff --git a/internal/events/submanager_test.go b/internal/events/submanager_test.go index 435909b4..e4893be4 100644 --- a/internal/events/submanager_test.go +++ b/internal/events/submanager_test.go @@ -277,6 +277,51 @@ func TestStreamAndSubscriptionErrors(t *testing.T) { sm.Close(true) } +func TestStreamAndSubscriptionInlineMethodArray(t *testing.T) { + assert := assert.New(t) + dir := tempdir(t) + subscriptionName := "testSub" + defer cleanup(t, dir) + sm := newTestSubscriptionManager() + + blockCall := make(chan struct{}) + rpc := ðmocks.RPCClient{} + rpc.On("CallContext", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { <-blockCall }).Return(nil) + sm.rpc = rpc + + sm.db, _ = kvstore.NewLDBKeyValueStore(path.Join(dir, "db")) + defer sm.db.Close() + + ctx := context.Background() + assert.Equal([]*SubscriptionInfo{}, sm.Subscriptions(ctx)) + assert.Equal([]*StreamInfo{}, sm.Streams(ctx)) + + stream, err := sm.AddStream(ctx, &StreamInfo{ + Type: "webhook", + Webhook: &webhookActionInfo{URL: "http://test.invalid"}, + }) + assert.NoError(err) + + sub, err := sm.AddSubscriptionDirect(ctx, &SubscriptionCreateDTO{ + Name: subscriptionName, + Stream: stream.ID, + Event: ðbinding.ABIElementMarshaling{Name: "ping"}, + Methods: ethbinding.ABIMarshaling{ + { + Type: "function", + Name: "doPing", + }, + }, + }) + assert.NoError(err) + + assert.NotNil(sub.ABI.Inline) + assert.Equal("doPing", sub.ABI.Inline[0].Name) + + close(blockCall) + sm.Close(true) +} + func TestResetSubscriptionErrors(t *testing.T) { assert := assert.New(t) dir := tempdir(t) diff --git a/internal/events/subscription.go b/internal/events/subscription.go index 158a2a27..d44581d9 100644 --- a/internal/events/subscription.go +++ b/internal/events/subscription.go @@ -46,10 +46,16 @@ type SubscriptionCreateDTO struct { Name string `json:"name,omitempty"` Stream string `json:"stream,omitempty"` Event *ethbinding.ABIElementMarshaling `json:"event,omitempty"` + Methods ethbinding.ABIMarshaling `json:"methods,omitempty"` // an inline set of methods that might emit the event FromBlock string `json:"fromBlock,omitempty"` Address *ethbinding.Address `json:"address,omitempty"` } +type ABIRefOrInline struct { + contractregistry.ABILocation + Inline ethbinding.ABIMarshaling `json:"inline,omitempty"` +} + // SubscriptionInfo is the persisted data for the subscription type SubscriptionInfo struct { messages.TimeSorted @@ -61,7 +67,7 @@ type SubscriptionInfo struct { Filter persistedFilter `json:"filter"` Event *ethbinding.ABIElementMarshaling `json:"event"` FromBlock string `json:"fromBlock,omitempty"` - ABI *contractregistry.ABILocation `json:"abi,omitempty"` + ABI *ABIRefOrInline `json:"abi,omitempty"` } // subscription is the runtime that manages the subscription @@ -126,15 +132,21 @@ func (info *SubscriptionInfo) GetID() string { return info.ID } -func loadABI(cr contractregistry.ContractResolver, location *contractregistry.ABILocation) (abi *ethbinding.RuntimeABI, err error) { +func loadABI(cr contractregistry.ContractResolver, location *ABIRefOrInline) (abi *ethbinding.RuntimeABI, err error) { if location == nil { return nil, nil } - deployMsg, err := cr.GetABI(*location, false) - if err != nil || deployMsg == nil || deployMsg.Contract == nil { - return nil, err + var abiMarshalling ethbinding.ABIMarshaling + if location.Inline != nil { + abiMarshalling = location.Inline + } else { + deployMsg, err := cr.GetABI(location.ABILocation, false) + if err != nil || deployMsg == nil || deployMsg.Contract == nil { + return nil, err + } + abiMarshalling = deployMsg.Contract.ABI } - return ethbind.API.ABIMarshalingToABIRuntime(deployMsg.Contract.ABI) + return ethbind.API.ABIMarshalingToABIRuntime(abiMarshalling) } func restoreSubscription(sm subscriptionManager, rpc eth.RPCClient, cr contractregistry.ContractResolver, i *SubscriptionInfo) (*subscription, error) { @@ -273,6 +285,9 @@ func (s *subscription) getTransactionInputs(ctx context.Context, l *logEntry) { log.Infof("%s: error querying transaction info", s.logName) return } + if info.From != nil { + l.InputSigner = info.From.String() + } method, err := abi.MethodById(*info.Input) if err != nil { log.Infof("%s: could not find matching method", s.logName) diff --git a/internal/events/subscription_test.go b/internal/events/subscription_test.go index 0e464632..7c1e539b 100644 --- a/internal/events/subscription_test.go +++ b/internal/events/subscription_test.go @@ -377,9 +377,11 @@ func TestGetTransactionInputsLoadABIFail(t *testing.T) { s := &subscription{ info: &SubscriptionInfo{ - ABI: &contractregistry.ABILocation{ - ABIType: contractregistry.LocalABI, - Name: "abi1", + ABI: &ABIRefOrInline{ + ABILocation: contractregistry.ABILocation{ + ABIType: contractregistry.LocalABI, + Name: "abi1", + }, }, }, rpc: rpc, @@ -407,9 +409,11 @@ func TestGetTransactionInputsMissingABI(t *testing.T) { s := &subscription{ info: &SubscriptionInfo{ - ABI: &contractregistry.ABILocation{ - ABIType: contractregistry.LocalABI, - Name: "abi1", + ABI: &ABIRefOrInline{ + ABILocation: contractregistry.ABILocation{ + ABIType: contractregistry.LocalABI, + Name: "abi1", + }, }, }, rpc: rpc, @@ -441,9 +445,11 @@ func TestGetTransactionInputsTxnInfoFail(t *testing.T) { s := &subscription{ info: &SubscriptionInfo{ - ABI: &contractregistry.ABILocation{ - ABIType: contractregistry.LocalABI, - Name: "abi1", + ABI: &ABIRefOrInline{ + ABILocation: contractregistry.ABILocation{ + ABIType: contractregistry.LocalABI, + Name: "abi1", + }, }, }, rpc: rpc, @@ -481,16 +487,20 @@ func TestGetTransactionInputsBadMethod(t *testing.T) { s := &subscription{ info: &SubscriptionInfo{ - ABI: &contractregistry.ABILocation{ - ABIType: contractregistry.LocalABI, - Name: "abi1", + ABI: &ABIRefOrInline{ + ABILocation: contractregistry.ABILocation{ + ABIType: contractregistry.LocalABI, + Name: "abi1", + }, }, }, rpc: rpc, cr: cr, } + fromAddr := ethbind.API.HexToAddress("0x0123456789AbcdeF0123456789abCdef01234567") l := logEntry{ TransactionHash: [32]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, + InputSigner: fromAddr.String(), } lCopy := l @@ -499,6 +509,7 @@ func TestGetTransactionInputsBadMethod(t *testing.T) { res := args[1] *(res.(*eth.TxnInfo)) = eth.TxnInfo{ Input: ðbinding.HexBytes{}, + From: &fromAddr, } }). Return(nil) @@ -546,9 +557,67 @@ func TestGetTransactionInputsSuccess(t *testing.T) { s := &subscription{ info: &SubscriptionInfo{ - ABI: &contractregistry.ABILocation{ - ABIType: contractregistry.LocalABI, - Name: "abi1", + ABI: &ABIRefOrInline{ + ABILocation: contractregistry.ABILocation{ + ABIType: contractregistry.LocalABI, + Name: "abi1", + }, + }, + }, + rpc: rpc, + cr: cr, + } + l := logEntry{ + TransactionHash: [32]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, + } + lCopy := l + + rpc.On("CallContext", mock.Anything, mock.Anything, "eth_getTransactionByHash", "0x0000000000000000000000000000000000000000000000000000000000000001"). + Run(func(args mock.Arguments) { + res := args[1] + *(res.(*eth.TxnInfo)) = eth.TxnInfo{ + Input: methodInput, + } + }). + Return(nil) + + s.getTransactionInputs(context.Background(), &l) + + result, err := json.Marshal(l) + assert.NoError(err) + lCopy.InputMethod = "method1" + lCopy.InputArgs = expectedArgs + defaultLogEntry, err := json.Marshal(lCopy) + assert.NoError(err) + assert.Equal(string(defaultLogEntry), string(result)) +} + +func TestGetTransactionInputsSuccessInline(t *testing.T) { + assert := assert.New(t) + rpc := ðmocks.RPCClient{} + cr := &contractregistrymocks.ContractStore{} + + methodInput := ðbinding.HexBytes{ + 0xf4, 0xe1, 0x3d, 0xc5, // ID of method1 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, // "1" as int32 + } + expectedArgs := map[string]interface{}{"arg1": "1"} + + s := &subscription{ + info: &SubscriptionInfo{ + ABI: &ABIRefOrInline{ + Inline: ethbinding.ABIMarshaling{ + { + Type: "function", + Name: "method1", + Inputs: []ethbinding.ABIArgumentMarshaling{ + { + Name: "arg1", + Type: "int32", + }, + }, + }, + }, }, }, rpc: rpc,