Skip to content

Commit

Permalink
Merge pull request #196 from kaleido-io/sub-inline-methods
Browse files Browse the repository at this point in the history
Allow passing an array of ABI methods that might emit an event, when creating a sub
  • Loading branch information
peterbroadhurst authored Feb 2, 2022
2 parents 93324e5 + 9b55733 commit f6c7fe5
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 24 deletions.
3 changes: 3 additions & 0 deletions internal/events/logprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type logEntry struct {
Timestamp uint64 `json:"timestamp,omitempty"`
InputMethod string `json:"inputMethod,omitempty"`
InputArgs map[string]interface{} `json:"inputArgs,omitempty"`
InputSigner string `json:"inputSigner,omitempty"`
}

type eventData struct {
Expand All @@ -51,6 +52,7 @@ type eventData struct {
Timestamp string `json:"timestamp,omitempty"`
InputMethod string `json:"inputMethod,omitempty"`
InputArgs map[string]interface{} `json:"inputArgs,omitempty"`
InputSigner string `json:"inputSigner,omitempty"`
// Used for callback handling
batchComplete func(*eventData)
}
Expand Down Expand Up @@ -129,6 +131,7 @@ func (lp *logProcessor) processLogEntry(subInfo string, entry *logEntry, idx int
LogIndex: strconv.Itoa(idx),
InputMethod: entry.InputMethod,
InputArgs: entry.InputArgs,
InputSigner: entry.InputSigner,
batchComplete: lp.batchComplete,
}
if lp.stream.spec.Timestamps {
Expand Down
18 changes: 15 additions & 3 deletions internal/events/submanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,13 @@ func (s *subscriptionMGR) setInitialBlock(i *SubscriptionInfo, initialBlock stri

// AddSubscription adds a new subscription
func (s *subscriptionMGR) AddSubscription(ctx context.Context, addr *ethbinding.Address, abi *contractregistry.ABILocation, event *ethbinding.ABIElementMarshaling, streamID, initialBlock, name string) (*SubscriptionInfo, error) {
return s.addSubscriptionCommon(ctx, abi, &SubscriptionCreateDTO{
var abiRef *ABIRefOrInline
if abi != nil {
abiRef = &ABIRefOrInline{
ABILocation: *abi,
}
}
return s.addSubscriptionCommon(ctx, abiRef, &SubscriptionCreateDTO{
Address: addr,
Name: name,
Event: event,
Expand All @@ -169,10 +175,16 @@ func (s *subscriptionMGR) AddSubscription(ctx context.Context, addr *ethbinding.
}

func (s *subscriptionMGR) AddSubscriptionDirect(ctx context.Context, newSub *SubscriptionCreateDTO) (*SubscriptionInfo, error) {
return s.addSubscriptionCommon(ctx, nil, newSub)
var abiLocation *ABIRefOrInline
if newSub.Methods != nil {
abiLocation = &ABIRefOrInline{
Inline: newSub.Methods,
}
}
return s.addSubscriptionCommon(ctx, abiLocation, newSub)
}

func (s *subscriptionMGR) addSubscriptionCommon(ctx context.Context, abi *contractregistry.ABILocation, newSub *SubscriptionCreateDTO) (*SubscriptionInfo, error) {
func (s *subscriptionMGR) addSubscriptionCommon(ctx context.Context, abi *ABIRefOrInline, newSub *SubscriptionCreateDTO) (*SubscriptionInfo, error) {
i := &SubscriptionInfo{
Name: newSub.Name,
TimeSorted: messages.TimeSorted{
Expand Down
45 changes: 45 additions & 0 deletions internal/events/submanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,51 @@ func TestStreamAndSubscriptionErrors(t *testing.T) {
sm.Close(true)
}

func TestStreamAndSubscriptionInlineMethodArray(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
subscriptionName := "testSub"
defer cleanup(t, dir)
sm := newTestSubscriptionManager()

blockCall := make(chan struct{})
rpc := &ethmocks.RPCClient{}
rpc.On("CallContext", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { <-blockCall }).Return(nil)
sm.rpc = rpc

sm.db, _ = kvstore.NewLDBKeyValueStore(path.Join(dir, "db"))
defer sm.db.Close()

ctx := context.Background()
assert.Equal([]*SubscriptionInfo{}, sm.Subscriptions(ctx))
assert.Equal([]*StreamInfo{}, sm.Streams(ctx))

stream, err := sm.AddStream(ctx, &StreamInfo{
Type: "webhook",
Webhook: &webhookActionInfo{URL: "http://test.invalid"},
})
assert.NoError(err)

sub, err := sm.AddSubscriptionDirect(ctx, &SubscriptionCreateDTO{
Name: subscriptionName,
Stream: stream.ID,
Event: &ethbinding.ABIElementMarshaling{Name: "ping"},
Methods: ethbinding.ABIMarshaling{
{
Type: "function",
Name: "doPing",
},
},
})
assert.NoError(err)

assert.NotNil(sub.ABI.Inline)
assert.Equal("doPing", sub.ABI.Inline[0].Name)

close(blockCall)
sm.Close(true)
}

func TestResetSubscriptionErrors(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
Expand Down
27 changes: 21 additions & 6 deletions internal/events/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,16 @@ type SubscriptionCreateDTO struct {
Name string `json:"name,omitempty"`
Stream string `json:"stream,omitempty"`
Event *ethbinding.ABIElementMarshaling `json:"event,omitempty"`
Methods ethbinding.ABIMarshaling `json:"methods,omitempty"` // an inline set of methods that might emit the event
FromBlock string `json:"fromBlock,omitempty"`
Address *ethbinding.Address `json:"address,omitempty"`
}

type ABIRefOrInline struct {
contractregistry.ABILocation
Inline ethbinding.ABIMarshaling `json:"inline,omitempty"`
}

// SubscriptionInfo is the persisted data for the subscription
type SubscriptionInfo struct {
messages.TimeSorted
Expand All @@ -61,7 +67,7 @@ type SubscriptionInfo struct {
Filter persistedFilter `json:"filter"`
Event *ethbinding.ABIElementMarshaling `json:"event"`
FromBlock string `json:"fromBlock,omitempty"`
ABI *contractregistry.ABILocation `json:"abi,omitempty"`
ABI *ABIRefOrInline `json:"abi,omitempty"`
}

// subscription is the runtime that manages the subscription
Expand Down Expand Up @@ -126,15 +132,21 @@ func (info *SubscriptionInfo) GetID() string {
return info.ID
}

func loadABI(cr contractregistry.ContractResolver, location *contractregistry.ABILocation) (abi *ethbinding.RuntimeABI, err error) {
func loadABI(cr contractregistry.ContractResolver, location *ABIRefOrInline) (abi *ethbinding.RuntimeABI, err error) {
if location == nil {
return nil, nil
}
deployMsg, err := cr.GetABI(*location, false)
if err != nil || deployMsg == nil || deployMsg.Contract == nil {
return nil, err
var abiMarshalling ethbinding.ABIMarshaling
if location.Inline != nil {
abiMarshalling = location.Inline
} else {
deployMsg, err := cr.GetABI(location.ABILocation, false)
if err != nil || deployMsg == nil || deployMsg.Contract == nil {
return nil, err
}
abiMarshalling = deployMsg.Contract.ABI
}
return ethbind.API.ABIMarshalingToABIRuntime(deployMsg.Contract.ABI)
return ethbind.API.ABIMarshalingToABIRuntime(abiMarshalling)
}

func restoreSubscription(sm subscriptionManager, rpc eth.RPCClient, cr contractregistry.ContractResolver, i *SubscriptionInfo) (*subscription, error) {
Expand Down Expand Up @@ -273,6 +285,9 @@ func (s *subscription) getTransactionInputs(ctx context.Context, l *logEntry) {
log.Infof("%s: error querying transaction info", s.logName)
return
}
if info.From != nil {
l.InputSigner = info.From.String()
}
method, err := abi.MethodById(*info.Input)
if err != nil {
log.Infof("%s: could not find matching method", s.logName)
Expand Down
99 changes: 84 additions & 15 deletions internal/events/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,11 @@ func TestGetTransactionInputsLoadABIFail(t *testing.T) {

s := &subscription{
info: &SubscriptionInfo{
ABI: &contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
ABI: &ABIRefOrInline{
ABILocation: contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
},
},
},
rpc: rpc,
Expand Down Expand Up @@ -407,9 +409,11 @@ func TestGetTransactionInputsMissingABI(t *testing.T) {

s := &subscription{
info: &SubscriptionInfo{
ABI: &contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
ABI: &ABIRefOrInline{
ABILocation: contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
},
},
},
rpc: rpc,
Expand Down Expand Up @@ -441,9 +445,11 @@ func TestGetTransactionInputsTxnInfoFail(t *testing.T) {

s := &subscription{
info: &SubscriptionInfo{
ABI: &contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
ABI: &ABIRefOrInline{
ABILocation: contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
},
},
},
rpc: rpc,
Expand Down Expand Up @@ -481,16 +487,20 @@ func TestGetTransactionInputsBadMethod(t *testing.T) {

s := &subscription{
info: &SubscriptionInfo{
ABI: &contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
ABI: &ABIRefOrInline{
ABILocation: contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
},
},
},
rpc: rpc,
cr: cr,
}
fromAddr := ethbind.API.HexToAddress("0x0123456789AbcdeF0123456789abCdef01234567")
l := logEntry{
TransactionHash: [32]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
InputSigner: fromAddr.String(),
}
lCopy := l

Expand All @@ -499,6 +509,7 @@ func TestGetTransactionInputsBadMethod(t *testing.T) {
res := args[1]
*(res.(*eth.TxnInfo)) = eth.TxnInfo{
Input: &ethbinding.HexBytes{},
From: &fromAddr,
}
}).
Return(nil)
Expand Down Expand Up @@ -546,9 +557,67 @@ func TestGetTransactionInputsSuccess(t *testing.T) {

s := &subscription{
info: &SubscriptionInfo{
ABI: &contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
ABI: &ABIRefOrInline{
ABILocation: contractregistry.ABILocation{
ABIType: contractregistry.LocalABI,
Name: "abi1",
},
},
},
rpc: rpc,
cr: cr,
}
l := logEntry{
TransactionHash: [32]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
}
lCopy := l

rpc.On("CallContext", mock.Anything, mock.Anything, "eth_getTransactionByHash", "0x0000000000000000000000000000000000000000000000000000000000000001").
Run(func(args mock.Arguments) {
res := args[1]
*(res.(*eth.TxnInfo)) = eth.TxnInfo{
Input: methodInput,
}
}).
Return(nil)

s.getTransactionInputs(context.Background(), &l)

result, err := json.Marshal(l)
assert.NoError(err)
lCopy.InputMethod = "method1"
lCopy.InputArgs = expectedArgs
defaultLogEntry, err := json.Marshal(lCopy)
assert.NoError(err)
assert.Equal(string(defaultLogEntry), string(result))
}

func TestGetTransactionInputsSuccessInline(t *testing.T) {
assert := assert.New(t)
rpc := &ethmocks.RPCClient{}
cr := &contractregistrymocks.ContractStore{}

methodInput := &ethbinding.HexBytes{
0xf4, 0xe1, 0x3d, 0xc5, // ID of method1
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, // "1" as int32
}
expectedArgs := map[string]interface{}{"arg1": "1"}

s := &subscription{
info: &SubscriptionInfo{
ABI: &ABIRefOrInline{
Inline: ethbinding.ABIMarshaling{
{
Type: "function",
Name: "method1",
Inputs: []ethbinding.ABIArgumentMarshaling{
{
Name: "arg1",
Type: "int32",
},
},
},
},
},
},
rpc: rpc,
Expand Down

0 comments on commit f6c7fe5

Please sign in to comment.