Skip to content

Commit

Permalink
Protect against misconfiguring catchupModeBlockGap < catchupModePageSize and …
Browse files Browse the repository at this point in the history
…fix catchup (#199)

* Protect users from misconfiguring catchupModeBlockGap < catchupModePageSize

Signed-off-by: Peter Broadhurst <[email protected]>

* Redress fix for writing HWM checkpoints in catchup mode

Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst authored Feb 14, 2022
1 parent f6c7fe5 commit f67b840
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 14 deletions.
13 changes: 6 additions & 7 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ func (a *eventStream) eventPoller() {

ctx := auth.NewSystemAuthContext()
var checkpoint map[string]*big.Int
blockUpdatedFilterStale := false
for !a.suspendOrStop() {
var err error
// Load the checkpoint (should only be first time round)
Expand All @@ -428,14 +427,12 @@ func (a *eventStream) eventPoller() {
blockHeight, exists := checkpoint[sub.info.ID]
if !exists || blockHeight.Cmp(big.NewInt(0)) <= 0 {
blockHeight, err = sub.setInitialBlockHeight(ctx)
} else {
} else if !sub.inCatchupMode() {
sub.setCheckpointBlockHeight(blockHeight)
}
if err == nil {
err = sub.restartFilter(ctx, blockHeight)
}
blockUpdatedFilterStale = true
log.Debugf("%s: Checkpoint updated due to stale sub filter: %s", sub.info.ID, blockHeight.String())
}
if err == nil {
err = sub.processNewEvents(ctx)
Expand All @@ -453,15 +450,17 @@ func (a *eventStream) eventPoller() {
i1 := checkpoint[sub.info.ID]
i2 := sub.blockHWM()

changed = changed || blockUpdatedFilterStale || i1 == nil || i1.Cmp(&i2) != 0
subChanged := i1 == nil || i1.Cmp(&i2) != 0
if subChanged {
log.Debugf("%s: New checkpoint HWM: %s", a.spec.ID, i2.String())
}
changed = changed || subChanged
checkpoint[sub.info.ID] = new(big.Int).Set(&i2)
}
if changed {
if err = a.sm.storeCheckpoint(a.spec.ID, checkpoint); err != nil {
log.Errorf("%s: Failed to store checkpoint: %s", a.spec.ID, err)
}
// set this to false before the next evaluation for stale subscriptions
blockUpdatedFilterStale = false
}
}
// the event poller reacts to notification about a stream update, else it starts
Expand Down
9 changes: 5 additions & 4 deletions internal/events/logprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ type logProcessor struct {
}

func newLogProcessor(subID string, event *ethbinding.ABIEvent, stream *eventStream) *logProcessor {
return &logProcessor{
lp := &logProcessor{
subID: subID,
event: event,
stream: stream,
}
lp.highestDispatched.SetInt64(-1)
return lp
}

func (lp *logProcessor) batchComplete(newestEvent *eventData) {
Expand All @@ -96,9 +98,8 @@ func (lp *logProcessor) getBlockHWM() big.Int {
func (lp *logProcessor) markNoEvents(blockNumber *big.Int) {
lp.hwnSync.Lock()
if lp.highestDispatched.Cmp(&lp.blockHWM) < 0 {
// Nothing in-flight, its safe to update the HWM
lp.blockHWM.Set(blockNumber)
log.Debugf("%s: HWM: %s", lp.subID, lp.blockHWM.String())
// Nothing in-flight, its safe to update the HWM - to one after the block we're up to
lp.blockHWM.Set(new(big.Int).Add(blockNumber, big.NewInt(1)))
}
lp.hwnSync.Unlock()
}
Expand Down
6 changes: 5 additions & 1 deletion internal/events/submanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func NewSubscriptionManager(conf *SubscriptionManagerConf, rpc eth.RPCClient, cr
if conf.CatchupModePageSize <= 0 {
conf.CatchupModePageSize = defaultCatchupModePageSize
}
if conf.CatchupModeBlockGap < conf.CatchupModePageSize {
log.Warnf("catchupModeBlockGap=%d must be >= catchupModePageSize=%d - setting to %d", conf.CatchupModeBlockGap, conf.CatchupModePageSize, conf.CatchupModePageSize)
conf.CatchupModeBlockGap = conf.CatchupModePageSize
}
return sm
}

Expand Down Expand Up @@ -410,7 +414,7 @@ func (s *subscriptionMGR) loadCheckpoint(streamID string) (map[string]*big.Int,
func (s *subscriptionMGR) storeCheckpoint(streamID string, checkpoint map[string]*big.Int) error {
cpID := checkpointIDPrefix + streamID
b, _ := json.MarshalIndent(&checkpoint, "", " ")
log.Debugf("Storing checkpoint %s: %s", cpID, string(b))
log.Tracef("Storing checkpoint %s: %s", cpID, string(b))
return s.db.Put(cpID, b)
}

Expand Down
11 changes: 11 additions & 0 deletions internal/events/submanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ func newTestSubscriptionManager() *subscriptionMGR {
return sm
}

// TestNestSubscriptionManagerBlockGapValidation builds a subscription manager
// from a config whose CatchupModeBlockGap (10) is smaller than its
// CatchupModePageSize (1000), and asserts that the manager's config ends up
// with CatchupModeBlockGap raised to 1000.
func TestNestSubscriptionManagerBlockGapValidation(t *testing.T) {
	misconfigured := &SubscriptionManagerConf{
		CatchupModeBlockGap: 10,
		CatchupModePageSize: 1000,
	}

	// Mocks are only needed to satisfy the constructor; no calls are expected on them here.
	mgr := NewSubscriptionManager(
		misconfigured,
		&ethmocks.RPCClient{},
		&contractregistrymocks.ContractStore{},
		newMockWebSocket(),
	).(*subscriptionMGR)

	assert.Equal(t, int64(1000), mgr.conf.CatchupModeBlockGap)
}

func TestCobraInitSubscriptionManager(t *testing.T) {
assert := assert.New(t)
cmd := cobra.Command{}
Expand Down
8 changes: 6 additions & 2 deletions internal/events/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (s *subscription) restartFilter(ctx context.Context, checkpoint *big.Int) e
defer cancel()

since := checkpoint
if s.catchupBlock != nil {
if s.inCatchupMode() {
// If we're already in catchup mode, we need to look at the current catchupBlock,
// not the checkpoint.
since = s.catchupBlock
Expand All @@ -238,7 +238,7 @@ func (s *subscription) restartFilter(ctx context.Context, checkpoint *big.Int) e
}

blockGap := new(big.Int).Sub(blockNumber.ToInt(), since).Int64()
log.Debugf("%s: restarting. Head=%s Position=%s Gap=%d (catchup threshold: %d)", s.logName, blockNumber.ToInt().String(), since.String(), blockGap, s.catchupModeBlockGap)
log.Debugf("%s: new filter. Head=%s Position=%s Gap=%d (catchup threshold: %d)", s.logName, blockNumber.ToInt().String(), since.String(), blockGap, s.catchupModeBlockGap)
if s.catchupModeBlockGap > 0 && blockGap > s.catchupModeBlockGap {
s.catchupBlock = since // note if we were already in catchup, this does not change anything
return nil
Expand All @@ -247,6 +247,10 @@ func (s *subscription) restartFilter(ctx context.Context, checkpoint *big.Int) e
return s.createFilter(ctx, since)
}

// inCatchupMode reports whether the subscription is currently in catchup mode,
// which is indicated by catchupBlock being set (non-nil).
func (s *subscription) inCatchupMode() bool {
	catchingUp := s.catchupBlock != nil
	return catchingUp
}

// getEventTimestamp adds the block timestamp to the log entry.
// It uses a lru cache (blocknumber, timestamp) in the eventstream to determine the timestamp
// and falls back to querying the node if we don't have timestamp in the cache (at which point it gets
Expand Down

0 comments on commit f67b840

Please sign in to comment.