Skip to content

Commit

Permalink
add delay param for fast forward (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaozhou authored Sep 17, 2021
1 parent 1ece7ed commit f5c905f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
14 changes: 10 additions & 4 deletions eth/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Config struct {
Reset bool
CheckInterval uint64
BlockDelay uint64 // if zero, use service.blockDelay by default
ForwardDelay uint64 // if zero, use BlockDelay
}

// Event is the metadata for an event
Expand All @@ -87,6 +88,7 @@ type Event struct {
StartBlock *big.Int
EndBlock *big.Int
BlockDelay uint64
ForwardDelay uint64
CheckInterval uint64
Callback func(CallbackID, types.Log) bool
watch *watcher.Watch
Expand Down Expand Up @@ -212,7 +214,7 @@ func (s *Service) createEventWatch(
if e.CheckInterval == 0 {
e.CheckInterval = defaultCheckInterval
}
return s.watch.NewWatch(e.WatchName, q, e.BlockDelay, e.CheckInterval, reset)
return s.watch.NewWatch(e.WatchName, q, e.BlockDelay, e.ForwardDelay, e.CheckInterval, reset)
}

func (s *Service) Monitor(cfg *Config, callback func(CallbackID, types.Log) bool) (CallbackID, error) {
Expand All @@ -229,12 +231,16 @@ func (s *Service) Monitor(cfg *Config, callback func(CallbackID, types.Log) bool
WatchName: watchName,
StartBlock: cfg.StartBlock,
EndBlock: cfg.EndBlock,
BlockDelay: s.blockDelay,
BlockDelay: cfg.BlockDelay,
ForwardDelay: cfg.ForwardDelay,
CheckInterval: cfg.CheckInterval,
Callback: callback,
}
if cfg.BlockDelay != 0 {
eventToListen.BlockDelay = cfg.BlockDelay
if eventToListen.BlockDelay == 0 {
eventToListen.BlockDelay = s.blockDelay
}
if eventToListen.ForwardDelay == 0 {
eventToListen.ForwardDelay = eventToListen.BlockDelay
}
if eventToListen.CheckInterval == 0 {
eventToListen.CheckInterval = defaultCheckInterval
Expand Down
9 changes: 5 additions & 4 deletions eth/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Watch struct {
ackID LogEventID // ID of log event pending an ACK
lastID *LogEventID // ID of log event for resuming (or nil)
blkDelay uint64 // Log event delay in number of blocks
fwdDelay uint64 // Block delay when fast forward
checkInterval uint64 // Check event every checkInterval * service.polling
fromBlock uint64 // Start a fetch from this block number
query ethereum.FilterQuery // On-chain event log query
Expand Down Expand Up @@ -251,7 +252,7 @@ func (ws *WatchService) MakeFilterQuery(
// If "reset" is enabled, the watcher ignores the previously stored
// position in the subscription which resets the stream to its start.
func (ws *WatchService) NewWatch(
name string, query ethereum.FilterQuery, blkDelay, checkInterval uint64, reset bool) (*Watch, error) {
name string, query ethereum.FilterQuery, blkDelay, fwdDelay, checkInterval uint64, reset bool) (*Watch, error) {

if name == "" {
return nil, fmt.Errorf("watch name not specified")
Expand All @@ -261,6 +262,7 @@ func (ws *WatchService) NewWatch(
name: name,
service: ws,
blkDelay: blkDelay,
fwdDelay: fwdDelay,
checkInterval: checkInterval,
query: query,
logQueue: list.New(),
Expand Down Expand Up @@ -429,9 +431,8 @@ func (w *Watch) fetchLogEvents() {
log.Tracef("added %d logs to queue: %s: next from %d", count, w.name, w.fromBlock)
} else {
// we didn't find any event between fromBlock and toBlock, so we can fast forward.
// add additional block delay to mitigate consistency issues from query nodes,
// may add another parameter later
fromBlock := toBlock - w.blkDelay
// add additional block delay to mitigate consistency issues from query nodes
fromBlock := toBlock - w.fwdDelay
if fromBlock > w.fromBlock {
w.fromBlock = fromBlock
}
Expand Down
20 changes: 10 additions & 10 deletions eth/watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestWatcher(t *testing.T) {

query := ethereum.FilterQuery{}

w, err := ws.NewWatch("foo", query, 2, 1, false)
w, err := ws.NewWatch("foo", query, 2, 2, 1, false)
if err != nil {
t.Fatalf("Cannot create watcher: %v", err)
}
Expand All @@ -266,7 +266,7 @@ func TestWatcher(t *testing.T) {
w.Close()
// test noLog case
client.noLog = true
w2, err := ws.NewWatch("foo", query, 2, 1, true) // reset fromBlock to 0
w2, err := ws.NewWatch("foo", query, 2, 2, 1, true) // reset fromBlock to 0
if w2.fromBlock != 0 {
t.Error("fromBlock isn't 0")
}
Expand Down Expand Up @@ -312,18 +312,18 @@ func TestBadWatcher(t *testing.T) {
FromBlock: big.NewInt(3),
}

w, err := ws.NewWatch("", query, 0, 1, false)
w, err := ws.NewWatch("", query, 0, 0, 1, false)
if err == nil {
w.Close()
t.Errorf("Watcher did not error on empty name")
}

w, err = ws.NewWatch("foo", query, 0, 1, false)
w, err = ws.NewWatch("foo", query, 0, 0, 1, false)
if err != nil {
t.Errorf("Cannot create watcher: %v", err)
}

w2, err := ws.NewWatch("foo", query, 0, 1, false)
w2, err := ws.NewWatch("foo", query, 0, 0, 1, false)
if err == nil {
w2.Close()
t.Errorf("Duplicate watcher did not fail")
Expand Down Expand Up @@ -367,7 +367,7 @@ func TestBadWatcher(t *testing.T) {
// A valid NewWatch() should fail after the watch service is closed.
ws.Close()

w, err = ws.NewWatch("bar", query, 0, 1, false)
w, err = ws.NewWatch("bar", query, 0, 0, 1, false)
if err == nil {
t.Errorf("Created watcher after service closed")
w.Close()
Expand All @@ -386,7 +386,7 @@ func TestWatcherRestart(t *testing.T) {
ws := makeWatchService(client, dal, uint64(polling), 0)

query := ethereum.FilterQuery{}
w, err := ws.NewWatch("foo", query, 2, 1, false)
w, err := ws.NewWatch("foo", query, 2, 2, 1, false)
if err != nil {
t.Fatalf("Cannot create watcher: %v", err)
}
Expand All @@ -403,7 +403,7 @@ func TestWatcherRestart(t *testing.T) {
client = NewFakeClient(blkSleep, true)
ws = makeWatchService(client, dal, uint64(polling), 0)

w, err = ws.NewWatch("foo", query, 2, 1, false)
w, err = ws.NewWatch("foo", query, 2, 2, 1, false)
if err != nil {
t.Fatalf("Cannot create watcher: %v", err)
}
Expand All @@ -419,7 +419,7 @@ func TestWatcherRestart(t *testing.T) {
client = NewFakeClient(blkSleep, true)
ws = makeWatchService(client, dal, uint64(polling), 0)

w, err = ws.NewWatch("foo", query, 2, 1, true) // reset subscription
w, err = ws.NewWatch("foo", query, 2, 2, 1, true) // reset subscription
if err != nil {
t.Fatalf("Cannot create watcher: %v", err)
}
Expand All @@ -441,7 +441,7 @@ func TestWatcherServiceClose(t *testing.T) {
query := ethereum.FilterQuery{
BlockHash: &hash,
}
w, _ := ws.NewWatch("foo", query, 2, 1, false)
w, _ := ws.NewWatch("foo", query, 2, 2, 1, false)

go func() {
// Block reading the next event log.
Expand Down

0 comments on commit f5c905f

Please sign in to comment.