Skip to content

Commit

Permalink
Correct failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rodion-lim-partior committed Sep 3, 2024
1 parent 182eebe commit 88a0c3b
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 95 deletions.
63 changes: 55 additions & 8 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ethereum
import (
"container/list"
"context"
"fmt"
"sync"
"time"

Expand All @@ -40,6 +41,12 @@ type blockUpdateConsumer struct {
updates chan<- *ffcapi.BlockHashEvent
}

type blockFilterStatus struct {
isEstablished bool
signal chan struct{}
mux sync.Mutex
}

// blockListener has two functions:
// 1) To establish and keep track of what the head block height of the blockchain is, so event streams know how far from the head they are
// 2) To feed new block information to any registered consumers
Expand All @@ -49,7 +56,7 @@ type blockListener struct {
backend rpcbackend.RPC
wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
listenLoopDone chan struct{}
blockFilterEstablished chan struct{}
blockFilterEstablished blockFilterStatus
initialBlockHeightObtained chan struct{}
newHeadsTap chan struct{}
newHeadsSub rpcbackend.Subscription
Expand All @@ -74,7 +81,7 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
ctx: log.WithLogField(ctx, "role", "blocklistener"),
c: c,
backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later
blockFilterEstablished: make(chan struct{}),
blockFilterEstablished: blockFilterStatus{isEstablished: false, signal: make(chan struct{})},
initialBlockHeightObtained: make(chan struct{}),
newHeadsTap: make(chan struct{}),
highestBlock: -1,
Expand All @@ -94,6 +101,34 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
return bl, nil
}

// setting block filter status updates that new block filter has been created
func (bl *blockListener) setBlockFilterStatus() {
bl.blockFilterEstablished.mux.Lock()
defer bl.blockFilterEstablished.mux.Unlock()
if !bl.blockFilterEstablished.isEstablished {
bl.blockFilterEstablished.isEstablished = true
close(bl.blockFilterEstablished.signal)
}
}

// un-setting block filter status updates that no block filter is present on block listener
func (bl *blockListener) unsetBlockFilterStatus() {
bl.blockFilterEstablished.mux.Lock()
defer bl.blockFilterEstablished.mux.Unlock()
if bl.blockFilterEstablished.isEstablished {
bl.blockFilterEstablished.isEstablished = false
bl.blockFilterEstablished.signal = make(chan struct{})
}
}

func (bl *blockListener) blockTillBlockFilterEstablished(ctx context.Context) {
select {
case <-bl.blockFilterEstablished.signal:
case <-bl.ctx.Done():
case <-ctx.Done():
}
}

func (bl *blockListener) newHeadsSubListener() {
for range bl.newHeadsSub.Notifications() {
select {
Expand Down Expand Up @@ -156,6 +191,9 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {

func (bl *blockListener) listenLoop() {
defer close(bl.listenLoopDone)
defer func() {
bl.unsetBlockFilterStatus()
}()

err := bl.establishBlockHeightWithRetry()
close(bl.initialBlockHeightObtained)
Expand Down Expand Up @@ -195,7 +233,7 @@ func (bl *blockListener) listenLoop() {
failCount++
continue
} else {
close(bl.blockFilterEstablished)
bl.setBlockFilterStatus()
}
}

Expand Down Expand Up @@ -481,27 +519,32 @@ func (bl *blockListener) dispatchToConsumers(consumers []*blockUpdateConsumer, u
}
}

func (bl *blockListener) checkStartedLocked() {
func (bl *blockListener) checkStartedLocked(ctx context.Context) {
if bl.listenLoopDone == nil {
bl.listenLoopDone = make(chan struct{})
go bl.listenLoop()
}
<-bl.blockFilterEstablished

bl.blockTillBlockFilterEstablished(ctx)
}

func (bl *blockListener) addConsumer(c *blockUpdateConsumer) {
bl.mux.Lock()
defer bl.mux.Unlock()
bl.checkStartedLocked()
bl.consumers[*c.id] = c
bl.mux.Unlock()
bl.checkStartedLocked(context.Background())
}

func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
bl.checkStartedLocked(ctx)
if err := ctx.Err(); err != nil {
return -1, false
}
bl.mux.Lock()
bl.checkStartedLocked()
highestBlock := bl.highestBlock
bl.mux.Unlock()
// if not yet initialized, wait to be initialized
fmt.Println("highest block", highestBlock)
if highestBlock < 0 {
select {
case <-bl.initialBlockHeightObtained:
Expand All @@ -512,6 +555,10 @@ func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
}
bl.mux.Lock()
highestBlock = bl.highestBlock
if bl.highestBlock == -1 {
// handle edge case when bl.initialBlockHeightObtained channel is closed
return -1, false
}
bl.mux.Unlock()
log.L(ctx).Debugf("ChainHead=%d", highestBlock)
return highestBlock, true
Expand Down
Loading

0 comments on commit 88a0c3b

Please sign in to comment.