Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create block filters before poll interval #152

Merged
merged 13 commits into from
Sep 6, 2024
79 changes: 68 additions & 11 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type blockUpdateConsumer struct {
updates chan<- *ffcapi.BlockHashEvent
}

type blockFilterStatus struct {
isEstablished bool
signal chan struct{}
mux sync.Mutex
rodion-lim-partior marked this conversation as resolved.
Show resolved Hide resolved
}

// blockListener has two functions:
// 1) To establish and keep track of what the head block height of the blockchain is, so event streams know how far from the head they are
// 2) To feed new block information to any registered consumers
Expand All @@ -49,6 +55,7 @@ type blockListener struct {
backend rpcbackend.RPC
wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
listenLoopDone chan struct{}
blockFilterEstablished blockFilterStatus
initialBlockHeightObtained chan struct{}
newHeadsTap chan struct{}
newHeadsSub rpcbackend.Subscription
Expand All @@ -73,6 +80,7 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
ctx: log.WithLogField(ctx, "role", "blocklistener"),
c: c,
backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later
blockFilterEstablished: blockFilterStatus{isEstablished: false, signal: make(chan struct{})},
initialBlockHeightObtained: make(chan struct{}),
newHeadsTap: make(chan struct{}),
highestBlock: -1,
Expand All @@ -92,6 +100,34 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
return bl, nil
}

// setting block filter status updates that new block filter has been created
func (bl *blockListener) setBlockFilterStatus() {
bl.blockFilterEstablished.mux.Lock()
defer bl.blockFilterEstablished.mux.Unlock()
if !bl.blockFilterEstablished.isEstablished {
bl.blockFilterEstablished.isEstablished = true
close(bl.blockFilterEstablished.signal)
}
}

// un-setting block filter status updates that no block filter is present on block listener
func (bl *blockListener) unsetBlockFilterStatus() {
bl.blockFilterEstablished.mux.Lock()
defer bl.blockFilterEstablished.mux.Unlock()
if bl.blockFilterEstablished.isEstablished {
bl.blockFilterEstablished.isEstablished = false
bl.blockFilterEstablished.signal = make(chan struct{})
}
}

func (bl *blockListener) blockTillBlockFilterEstablished(ctx context.Context) {
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-bl.blockFilterEstablished.signal:
case <-bl.ctx.Done():
case <-ctx.Done():
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (bl *blockListener) newHeadsSubListener() {
for range bl.newHeadsSub.Notifications() {
select {
Expand Down Expand Up @@ -136,22 +172,27 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {
bl.backend = bl.wsBackend
}

// Now get the block heiht
// Now get the block height
var hexBlockHeight ethtypes.HexInteger
rpcErr := bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber")
if rpcErr != nil {
log.L(bl.ctx).Warnf("Block height could not be obtained: %s", rpcErr.Message)
return true, rpcErr.Error()
}

bl.mux.Lock()
bl.highestBlock = hexBlockHeight.BigInt().Int64()
bl.mux.Unlock()

return false, nil
})
}

func (bl *blockListener) listenLoop() {
defer close(bl.listenLoopDone)
defer func() {
bl.unsetBlockFilterStatus()
}()
rodion-lim-partior marked this conversation as resolved.
Show resolved Hide resolved

err := bl.establishBlockHeightWithRetry()
close(bl.initialBlockHeightObtained)
Expand All @@ -162,6 +203,7 @@ func (bl *blockListener) listenLoop() {
var filter string
failCount := 0
gapPotential := true
firstIteration := true
for {
if failCount > 0 {
if bl.c.doFailureDelay(bl.ctx, failCount) {
Expand All @@ -170,12 +212,16 @@ func (bl *blockListener) listenLoop() {
}
} else {
// Sleep for the polling interval, or until we're shoulder tapped by the newHeads listener
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.newHeadsTap:
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return
if !firstIteration {
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.newHeadsTap:
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return
}
} else {
firstIteration = false
}
}

Expand All @@ -185,6 +231,8 @@ func (bl *blockListener) listenLoop() {
log.L(bl.ctx).Errorf("Failed to establish new block filter: %s", err.Message)
failCount++
continue
} else {
rodion-lim-partior marked this conversation as resolved.
Show resolved Hide resolved
bl.setBlockFilterStatus()
}
}

Expand Down Expand Up @@ -470,23 +518,28 @@ func (bl *blockListener) dispatchToConsumers(consumers []*blockUpdateConsumer, u
}
}

func (bl *blockListener) checkStartedLocked() {
func (bl *blockListener) checkStartedLocked(ctx context.Context) {
if bl.listenLoopDone == nil {
bl.listenLoopDone = make(chan struct{})
go bl.listenLoop()
}

bl.blockTillBlockFilterEstablished(ctx)
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
}

func (bl *blockListener) addConsumer(c *blockUpdateConsumer) {
bl.mux.Lock()
defer bl.mux.Unlock()
bl.checkStartedLocked()
bl.consumers[*c.id] = c
bl.mux.Unlock()
bl.checkStartedLocked(context.Background())
rodion-lim-partior marked this conversation as resolved.
Show resolved Hide resolved
}

func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
bl.checkStartedLocked(ctx)
if err := ctx.Err(); err != nil {
return -1, false
}
bl.mux.Lock()
bl.checkStartedLocked()
highestBlock := bl.highestBlock
bl.mux.Unlock()
// if not yet initialized, wait to be initialized
Expand All @@ -500,6 +553,10 @@ func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
}
bl.mux.Lock()
highestBlock = bl.highestBlock
if bl.highestBlock == -1 {
// handle edge case when bl.initialBlockHeightObtained channel is closed
return -1, false
}
bl.mux.Unlock()
log.L(ctx).Debugf("ChainHead=%d", highestBlock)
return highestBlock, true
Expand Down
Loading