From 58faf2155c1ddb32633aa1a9713c59fe6c4a6447 Mon Sep 17 00:00:00 2001
From: i9 <i9@users.noreply.github.com>
Date: Tue, 19 Apr 2022 22:52:08 -0700
Subject: [PATCH] If FilterLogs rpc return error, retry same fromBlk in next
 ticker (#59)

* fix FilterLog rpc err case and add test
* fix merge lost code
---
 eth/mon2/mon_test.go | 34 ++++++++++++++++++++++++++++++----
 eth/mon2/monitor.go  | 16 ++++++++++------
 eth/mon2/types.go    |  2 +-
 3 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/eth/mon2/mon_test.go b/eth/mon2/mon_test.go
index 02b92fe..fd36a70 100644
--- a/eth/mon2/mon_test.go
+++ b/eth/mon2/mon_test.go
@@ -2,6 +2,7 @@ package mon2
 
 import (
 	"context"
+	"errors"
 	"math/big"
 	"testing"
 	"time"
@@ -36,7 +37,7 @@ func TestDoOneQuery(t *testing.T) {
 	ec.addLog(50, 2)
 	ec.addLog(50, 10)
 	ec.addLog(60, 1)
-	gotLogs := m.doOneQuery(q, saved)
+	gotLogs, _ := m.doOneQuery(q, saved)
 	chkEq(len(gotLogs), 1, t)
 	chkEq(gotLogs[0].BlockNumber, uint64(60), t)
 	chkEq(gotLogs[0].Index, uint(1), t)
@@ -129,7 +130,6 @@ func TestFilterQuery(t *testing.T) {
 		BlkIntv:  time.Minute, // we'll manually call updateBlkNum
 		BlkDelay: 5,
 	})
-	defer m.Close()
 	chkIntv := time.Millisecond // increase this if test err on slow/busy machine
 	go m.MonAddr(PerAddrCfg{
 		ChkIntv: chkIntv,
@@ -144,12 +144,33 @@ func TestFilterQuery(t *testing.T) {
 
 	// enough time to ensure MonAddr ticker triggers several times
 	time.Sleep(time.Duration(len(ec.expFrom)+1) * chkIntv)
-	// make sure expFrom/expTo are empty now, meaning FilterLogs has been calld
+	// make sure expFrom/expTo are empty now, meaning FilterLogs has been called
 	chkEq(len(ec.expFrom), 0, t)
 	chkEq(len(ec.expTo), 0, t)
 	// check dal has correct fromblk/index, as last query has no logs, it should be CalcNextFromBlkNum
 	chkEq(dal[zeroKey].BlkNum, uint64(95), t)
 	chkEq(dal[zeroKey].Index, int64(-1), t)
+	m.Close() // done w/ first test case
+
+	// new monitor for FilterLog err case
+	ec.blkNum = 200
+	ec.flErr = errors.New("some FilterLog rpc error")
+	ec.expFrom = append(ec.expFrom, 95, 95)
+	ec.expTo = append(ec.expTo, 195, 195)
+	m, _ = NewMonitor(ec, dal, PerChainCfg{
+		BlkIntv:  time.Minute, // we'll manually call updateBlkNum
+		BlkDelay: 5,
+	})
+	go m.MonAddr(PerAddrCfg{
+		ChkIntv: chkIntv,
+	}, func(string, types.Log) {})
+	time.Sleep(time.Duration(len(ec.expFrom)+1) * chkIntv)
+	chkEq(len(ec.expFrom), 0, t)
+	chkEq(len(ec.expTo), 0, t)
+	// because FilterLogs returns err, there must be no update in db
+	chkEq(dal[zeroKey].BlkNum, uint64(95), t)
+	chkEq(dal[zeroKey].Index, int64(-1), t)
+	m.Close() // done w/ FilterLogs err test case
 }
 
 // mock eth client
@@ -159,8 +180,10 @@ type MockEc struct {
 	// when FilterLogs is called, expected value for q.FromBlock and q.ToBlock
 	// will be popped in each call
 	expFrom, expTo []uint64
-	// logs to be returned in next FilterLogs call
+	// logs to be returned in next FilterLogs call (if flErr is nil)
 	logs []types.Log
+	// FilterLogs api err, if set non-nil, FilterLogs will return (nil, flErr) directly
+	flErr error
 }
 
 func (ec *MockEc) ChainID(ctx context.Context) (*big.Int, error) {
@@ -187,6 +210,9 @@ func (ec *MockEc) chkFromTo(qfrom, qto uint64) {
 
 func (ec *MockEc) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
 	ec.chkFromTo(q.FromBlock.Uint64(), q.ToBlock.Uint64())
+	if ec.flErr != nil {
+		return nil, ec.flErr
+	}
 	var ret []types.Log
 	keep := ec.logs[:0] // share same backing array and capacity as ec.logs so can modify it in-place
 	for _, elog := range ec.logs {
diff --git a/eth/mon2/monitor.go b/eth/mon2/monitor.go
index 8e3dcf9..ecc499f 100644
--- a/eth/mon2/monitor.go
+++ b/eth/mon2/monitor.go
@@ -61,7 +61,10 @@ func (m *Monitor) MonAddr(cfg PerAddrCfg, cbfn EventCallback) {
 			}
 			q.ToBlock = toBigInt(toBlk)
 			// call m.ec.FilterLogs and skip already logs before savedLogID, if savedLogID is nil, return all received logs
-			todoLogs := m.doOneQuery(q, savedLogID)
+			todoLogs, err := m.doOneQuery(q, savedLogID)
+			if err != nil {
+				continue // keep same fromBlk and try again on next tick; toBlk may be different by then
+			}
 			// now go over todoLogs and call callback func
 			// it's possible all have been skipped so we don't do anything
 			for _, elog := range todoLogs {
@@ -118,19 +121,20 @@ func (m *Monitor) initFromInQ(q *ethereum.FilterQuery, key string) *LogEventID {
 	return nil
 }
 
-// calls FilterLogs and skip already processed
-func (m *Monitor) doOneQuery(q *ethereum.FilterQuery, savedLogID *LogEventID) []types.Log {
+// doOneQuery calls FilterLogs and skips already processed logs (by blk/idx). If FilterLogs returns a non-nil err, it returns nil logs and that err directly.
+func (m *Monitor) doOneQuery(q *ethereum.FilterQuery, savedLogID *LogEventID) ([]types.Log, error) {
 	logs, err := m.ec.FilterLogs(context.TODO(), *q)
 	if err != nil {
-		log.Warnf("%d-%x getLogs fromBlk %s toBlk %s failed. err: %s", m.chainId, q.Addresses[0], q.FromBlock, q.ToBlock, err)
+		log.Warnf("%d-%x getLogs fromBlk %s toBlk %s failed. err: %v", m.chainId, q.Addresses[0], q.FromBlock, q.ToBlock, err)
+		return nil, err
 	}
 	if len(logs) == 0 {
-		return logs
+		return logs, nil
 	}
 	// if resume from db and on first ticker, as fromblock is same as db, we may get same events again
 	// how many logs should be skipped, only could be non-zero if savedLogID isn't nil
 	// if savedLogID is nil, return 0 directly
-	return logs[savedLogID.CountSkip(logs):]
+	return logs[savedLogID.CountSkip(logs):], nil
 }
 
 // parse abi and return map from event.ID to its name eg. Deposited
diff --git a/eth/mon2/types.go b/eth/mon2/types.go
index 1a86612..99186f9 100644
--- a/eth/mon2/types.go
+++ b/eth/mon2/types.go
@@ -143,7 +143,7 @@ func (m *Monitor) updateBlkNum() {
 	// don't lock upfront in case rpc takes long time
 	blkNum, err := m.ec.BlockNumber(context.Background())
 	if err != nil {
-		log.Warnf("chain %d get blknum err: %s", m.chainId, err)
+		log.Warnf("chain %d get blknum err: %v", m.chainId, err)
 		// todo: switch to backup ec
 		return
 	}