Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create block filters before poll interval #152

Merged
merged 13 commits into from
Sep 6, 2024
26 changes: 19 additions & 7 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
backend rpcbackend.RPC
wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
listenLoopDone chan struct{}
blockFilterEstablished chan struct{}
initialBlockHeightObtained chan struct{}
newHeadsTap chan struct{}
newHeadsSub rpcbackend.Subscription
Expand All @@ -73,6 +74,7 @@
ctx: log.WithLogField(ctx, "role", "blocklistener"),
c: c,
backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later
blockFilterEstablished: make(chan struct{}),
initialBlockHeightObtained: make(chan struct{}),
newHeadsTap: make(chan struct{}),
highestBlock: -1,
Expand Down Expand Up @@ -136,16 +138,18 @@
bl.backend = bl.wsBackend
}

// Now get the block heiht
// Now get the block height
var hexBlockHeight ethtypes.HexInteger
rpcErr := bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber")
if rpcErr != nil {
log.L(bl.ctx).Warnf("Block height could not be obtained: %s", rpcErr.Message)
return true, rpcErr.Error()
}

bl.mux.Lock()
bl.highestBlock = hexBlockHeight.BigInt().Int64()
bl.mux.Unlock()

return false, nil
})
}
Expand All @@ -162,6 +166,7 @@
var filter string
failCount := 0
gapPotential := true
firstIteration := true
for {
if failCount > 0 {
if bl.c.doFailureDelay(bl.ctx, failCount) {
Expand All @@ -170,12 +175,16 @@
}
} else {
// Sleep for the polling interval, or until we're shoulder tapped by the newHeads listener
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.newHeadsTap:
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return
if !firstIteration {
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.newHeadsTap:
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return
}
} else {
firstIteration = false
}
}

Expand All @@ -185,6 +194,8 @@
log.L(bl.ctx).Errorf("Failed to establish new block filter: %s", err.Message)
failCount++
continue
} else {

Check failure on line 197 in internal/ethereum/blocklistener.go

View workflow job for this annotation

GitHub Actions / build

superfluous-else: if block ends with a continue statement, so drop this else and outdent its block (revive)
rodion-lim-partior marked this conversation as resolved.
Show resolved Hide resolved
close(bl.blockFilterEstablished)
rodion-lim-partior marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -475,6 +486,7 @@
bl.listenLoopDone = make(chan struct{})
go bl.listenLoop()
}
<-bl.blockFilterEstablished
}

func (bl *blockListener) addConsumer(c *blockUpdateConsumer) {
Expand Down
Loading