If FilterLogs rpc returns error, retry same fromBlk in next ticker (#59)
* fix FilterLogs rpc err case and add test
* fix code lost in merge
i9 authored Apr 20, 2022
1 parent 6f0dc05 commit 58faf21
Showing 3 changed files with 41 additions and 11 deletions.
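
In short, the per-address ticker loop now treats a FilterLogs failure as a no-op for the saved cursor, so the same block window is re-queried on the next tick instead of being skipped. A minimal sketch of that behavior, with hypothetical names and stubbed-out helpers rather than the repo's actual code:

package mon2sketch

import (
    "time"

    "github.com/ethereum/go-ethereum/core/types"
)

// pollLogs is a hypothetical helper (not the repo's actual code) showing the
// retry behavior this commit introduces: on a query error the cursor is left
// untouched, so the next tick re-queries the same fromBlk window.
func pollLogs(
    chkIntv time.Duration,
    fromBlk uint64,
    latestBlk func() uint64,
    filterLogs func(from, to uint64) ([]types.Log, error),
    handle func(types.Log),
) {
    ticker := time.NewTicker(chkIntv)
    defer ticker.Stop()
    for range ticker.C {
        toBlk := latestBlk()
        if toBlk < fromBlk {
            continue // no new confirmed blocks yet
        }
        logs, err := filterLogs(fromBlk, toBlk)
        if err != nil {
            continue // keep the same fromBlk; toBlk may differ on the next tick
        }
        for _, elog := range logs {
            handle(elog)
        }
        fromBlk = toBlk + 1 // advance the cursor only after a successful query
    }
}

Not advancing the cursor is the conservative choice: at worst a window is queried again, which is the same situation the existing savedLogID skip logic already handles when resuming from the DB.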
34 changes: 30 additions & 4 deletions eth/mon2/mon_test.go
@@ -2,6 +2,7 @@ package mon2
 
 import (
     "context"
+    "errors"
     "math/big"
     "testing"
     "time"
@@ -36,7 +37,7 @@ func TestDoOneQuery(t *testing.T) {
     ec.addLog(50, 2)
     ec.addLog(50, 10)
     ec.addLog(60, 1)
-    gotLogs := m.doOneQuery(q, saved)
+    gotLogs, _ := m.doOneQuery(q, saved)
     chkEq(len(gotLogs), 1, t)
     chkEq(gotLogs[0].BlockNumber, uint64(60), t)
     chkEq(gotLogs[0].Index, uint(1), t)
@@ -129,7 +130,6 @@ func TestFilterQuery(t *testing.T) {
         BlkIntv:  time.Minute, // we'll manually call updateBlkNum
         BlkDelay: 5,
     })
-    defer m.Close()
     chkIntv := time.Millisecond // increase this if test err on slow/busy machine
     go m.MonAddr(PerAddrCfg{
         ChkIntv: chkIntv,
@@ -144,12 +144,33 @@ func TestFilterQuery(t *testing.T) {
 
     // enough time to ensure MonAddr ticker triggers several times
     time.Sleep(time.Duration(len(ec.expFrom)+1) * chkIntv)
-    // make sure expFrom/expTo are empty now, meaning FilterLogs has been calld
+    // make sure expFrom/expTo are empty now, meaning FilterLogs has been called
     chkEq(len(ec.expFrom), 0, t)
     chkEq(len(ec.expTo), 0, t)
     // check dal has correct fromblk/index, as last query has no logs, it should be CalcNextFromBlkNum
     chkEq(dal[zeroKey].BlkNum, uint64(95), t)
     chkEq(dal[zeroKey].Index, int64(-1), t)
+    m.Close() // done w/ first test case
+
+    // new monitor for FilterLogs err case
+    ec.blkNum = 200
+    ec.flErr = errors.New("some FilterLogs rpc error")
+    ec.expFrom = append(ec.expFrom, 95, 95)
+    ec.expTo = append(ec.expTo, 195, 195)
+    m, _ = NewMonitor(ec, dal, PerChainCfg{
+        BlkIntv:  time.Minute, // we'll manually call updateBlkNum
+        BlkDelay: 5,
+    })
+    go m.MonAddr(PerAddrCfg{
+        ChkIntv: chkIntv,
+    }, func(string, types.Log) {})
+    time.Sleep(time.Duration(len(ec.expFrom)+1) * chkIntv)
+    chkEq(len(ec.expFrom), 0, t)
+    chkEq(len(ec.expTo), 0, t)
+    // because FilterLogs returned err, no update in db
+    chkEq(dal[zeroKey].BlkNum, uint64(95), t)
+    chkEq(dal[zeroKey].Index, int64(-1), t)
+    m.Close() // done w/ 2nd test case
 }
 
 // mock eth client
@@ -159,8 +180,10 @@ type MockEc struct {
     // when FilterLogs is called, expected value for q.FromBlock and q.ToBlock
     // will be popped in each call
     expFrom, expTo []uint64
-    // logs to be returned in next FilterLogs call
+    // logs to be returned in next FilterLogs call (if flErr is nil)
     logs []types.Log
+    // FilterLogs api err, if set non-nil, FilterLogs will return (nil, flErr) directly
+    flErr error
 }
 
 func (ec *MockEc) ChainID(ctx context.Context) (*big.Int, error) {
@@ -187,6 +210,9 @@ func (ec *MockEc) chkFromTo(qfrom, qto uint64) {
 
 func (ec *MockEc) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
     ec.chkFromTo(q.FromBlock.Uint64(), q.ToBlock.Uint64())
+    if ec.flErr != nil {
+        return nil, ec.flErr
+    }
     var ret []types.Log
     keep := ec.logs[:0] // share same backing array and capacity as ec.logs so can modify it in-place
     for _, elog := range ec.logs {
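
The new test case is driven entirely by the mock: setting flErr makes every FilterLogs call fail, and the assertions then check that the persisted fromBlk/index never move. The fault-injection idea in isolation, as a small self-contained sketch (hypothetical names, not the repo's code):

package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/core/types"
)

// fakeEc mirrors the MockEc idea: set flErr and every FilterLogs call fails.
type fakeEc struct {
    flErr error
    logs  []types.Log
}

func (f *fakeEc) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
    if f.flErr != nil {
        return nil, f.flErr // fault injection: fail the whole query
    }
    return f.logs, nil
}

func main() {
    ec := &fakeEc{flErr: errors.New("some FilterLogs rpc error")}
    savedBlk := uint64(95) // pretend this is the cursor persisted in the DAL

    if _, err := ec.FilterLogs(context.Background(), ethereum.FilterQuery{}); err != nil {
        // the monitor must leave savedBlk untouched so the same window is retried
        fmt.Println("FilterLogs failed, cursor stays at", savedBlk)
    }
}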
16 changes: 10 additions & 6 deletions eth/mon2/monitor.go
@@ -61,7 +61,10 @@ func (m *Monitor) MonAddr(cfg PerAddrCfg, cbfn EventCallback) {
         }
         q.ToBlock = toBigInt(toBlk)
         // call m.ec.FilterLogs and skip already logs before savedLogID, if savedLogID is nil, return all received logs
-        todoLogs := m.doOneQuery(q, savedLogID)
+        todoLogs, err := m.doOneQuery(q, savedLogID)
+        if err != nil {
+            continue // keep same fromBlk and try again in next ticker, next toBlk may be different
+        }
         // now go over todoLogs and call callback func
         // it's possible all have been skipped so we don't do anything
         for _, elog := range todoLogs {
@@ -118,19 +121,20 @@ func (m *Monitor) initFromInQ(q *ethereum.FilterQuery, key string) *LogEventID {
     return nil
 }
 
-// calls FilterLogs and skip already processed
-func (m *Monitor) doOneQuery(q *ethereum.FilterQuery, savedLogID *LogEventID) []types.Log {
+// calls FilterLogs and skip already processed log blk/idx, if FilterLogs returns non-nil err, return nil logs and err directly
+func (m *Monitor) doOneQuery(q *ethereum.FilterQuery, savedLogID *LogEventID) ([]types.Log, error) {
     logs, err := m.ec.FilterLogs(context.TODO(), *q)
     if err != nil {
-        log.Warnf("%d-%x getLogs fromBlk %s toBlk %s failed. err: %s", m.chainId, q.Addresses[0], q.FromBlock, q.ToBlock, err)
+        log.Warnf("%d-%x getLogs fromBlk %s toBlk %s failed. err: %v", m.chainId, q.Addresses[0], q.FromBlock, q.ToBlock, err)
+        return nil, err
     }
     if len(logs) == 0 {
-        return logs
+        return logs, nil
     }
     // if resume from db and on first ticker, as fromblock is same as db, we may get same events again
     // how many logs should be skipped, only could be non-zero if savedLogID isn't nil
     // if savedLogID is nil, return 0 directly
-    return logs[savedLogID.CountSkip(logs):]
+    return logs[savedLogID.CountSkip(logs):], nil
 }
 
 // parse abi and return map from event.ID to its name eg. Deposited
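
With the two-value return, callers of doOneQuery can tell "the RPC failed" (nil slice, non-nil error: keep the cursor and retry) apart from "the query succeeded but matched nothing" (empty slice, nil error: still safe to advance). A caller-side sketch of that distinction, using hypothetical names rather than the repo's code:

package mon2sketch

import "github.com/ethereum/go-ethereum/core/types"

// nextFrom is a hypothetical caller-side helper, not part of the repo:
// it keeps the old cursor on RPC error and advances it on any success,
// including a successful query that returned zero logs.
func nextFrom(cur, toBlk uint64, query func() ([]types.Log, error), handle func(types.Log)) uint64 {
    logs, err := query()
    if err != nil {
        return cur // RPC failed: retry the same window on the next tick
    }
    for _, elog := range logs {
        handle(elog)
    }
    return toBlk + 1 // success, even with zero logs: safe to move forward
}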
2 changes: 1 addition & 1 deletion eth/mon2/types.go
@@ -143,7 +143,7 @@ func (m *Monitor) updateBlkNum() {
     // don't lock upfront in case rpc takes long time
     blkNum, err := m.ec.BlockNumber(context.Background())
     if err != nil {
-        log.Warnf("chain %d get blknum err: %s", m.chainId, err)
+        log.Warnf("chain %d get blknum err: %v", m.chainId, err)
         // todo: switch to backup ec
         return
     }
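
The %s-to-%v changes here and in monitor.go are cosmetic in these paths, since err is always non-nil inside the if blocks, but %v is the conventional fmt verb for errors and degrades more gracefully if a nil error ever reaches the format call. A tiny standalone illustration:

package main

import "fmt"

func main() {
    var err error // deliberately nil

    // %v prints "<nil>" for a nil error; %s produces the noisier "%!s(<nil>)".
    // For non-nil errors both verbs end up calling Error(), so output is identical.
    fmt.Printf("with %%v: %v\n", err) // with %v: <nil>
    fmt.Printf("with %%s: %s\n", err) // with %s: %!s(<nil>)
}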
