Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create block filters before poll interval #152

Merged
merged 13 commits into from
Sep 6, 2024
2 changes: 1 addition & 1 deletion cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func configCommand() *cobra.Command {
Use: "docs",
Short: "Prints the config info as markdown",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
InitConfig()
b, err := config.GenerateConfigMarkdown(context.Background(), "", config.GetKnownKeys())
fmt.Println(string(b))
Expand Down
2 changes: 1 addition & 1 deletion cmd/evmconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var rootCmd = &cobra.Command{
Use: "evmconnect",
Short: "Hyperledger FireFly Connector for EVM based blockchains",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
return run()
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func versionCommand() *cobra.Command {
Use: "version",
Short: "Prints the version info",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {

info := &Info{
Version: BuildVersionOverride,
Expand Down
78 changes: 58 additions & 20 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,15 @@ type blockUpdateConsumer struct {
// 1) To establish and keep track of what the head block height of the blockchain is, so event streams know how far from the head they are
// 2) To feed new block information to any registered consumers
type blockListener struct {
ctx context.Context
c *ethConnector
backend rpcbackend.RPC
wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
listenLoopDone chan struct{}
ctx context.Context
c *ethConnector
backend rpcbackend.RPC
wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
listenLoopDone chan struct{}

isStarted bool
startDone chan struct{}

initialBlockHeightObtained chan struct{}
newHeadsTap chan struct{}
newHeadsSub rpcbackend.Subscription
Expand All @@ -73,6 +77,8 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
ctx: log.WithLogField(ctx, "role", "blocklistener"),
c: c,
backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later
isStarted: false,
startDone: make(chan struct{}),
initialBlockHeightObtained: make(chan struct{}),
newHeadsTap: make(chan struct{}),
highestBlock: -1,
Expand All @@ -92,6 +98,22 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
return bl, nil
}

// setting block filter status updates that new block filter has been created
func (bl *blockListener) markStarted() {
if !bl.isStarted {
bl.isStarted = true
close(bl.startDone)
}
}

func (bl *blockListener) waitUntilStarted(ctx context.Context) {
select {
case <-bl.startDone:
case <-bl.ctx.Done():
case <-ctx.Done():
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (bl *blockListener) newHeadsSubListener() {
for range bl.newHeadsSub.Notifications() {
select {
Expand All @@ -106,7 +128,7 @@ func (bl *blockListener) newHeadsSubListener() {
// getBlockHeightWithRetry keeps retrying attempting to get the initial block height until successful
func (bl *blockListener) establishBlockHeightWithRetry() error {
wsConnected := false
return bl.c.retry.Do(bl.ctx, "get initial block height", func(attempt int) (retry bool, err error) {
return bl.c.retry.Do(bl.ctx, "get initial block height", func(_ int) (retry bool, err error) {

// If we have a WebSocket backend, then we connect it and switch over to using it
// (we accept an un-locked update here to backend, as the most important routine that's
Expand Down Expand Up @@ -136,16 +158,18 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {
bl.backend = bl.wsBackend
}

// Now get the block heiht
// Now get the block height
var hexBlockHeight ethtypes.HexInteger
rpcErr := bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber")
if rpcErr != nil {
log.L(bl.ctx).Warnf("Block height could not be obtained: %s", rpcErr.Message)
return true, rpcErr.Error()
}

bl.mux.Lock()
bl.highestBlock = hexBlockHeight.BigInt().Int64()
bl.mux.Unlock()

return false, nil
})
}
Expand All @@ -162,6 +186,7 @@ func (bl *blockListener) listenLoop() {
var filter string
failCount := 0
gapPotential := true
firstIteration := true
for {
if failCount > 0 {
if bl.c.doFailureDelay(bl.ctx, failCount) {
Expand All @@ -170,12 +195,16 @@ func (bl *blockListener) listenLoop() {
}
} else {
// Sleep for the polling interval, or until we're shoulder tapped by the newHeads listener
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.newHeadsTap:
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return
if !firstIteration {
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.newHeadsTap:
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return
}
} else {
firstIteration = false
}
}

Expand All @@ -186,6 +215,7 @@ func (bl *blockListener) listenLoop() {
failCount++
continue
}
bl.markStarted()
}

var blockHashes []ethtypes.HexBytes0xPrefix
Expand Down Expand Up @@ -374,7 +404,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
for {
var bi *blockInfoJSONRPC
var reason ffcapi.ErrorReason
err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(attempt int) (retry bool, err error) {
err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(_ int) (retry bool, err error) {
bi, reason, err = bl.getBlockInfoByNumber(bl.ctx, nextBlockNumber, false, "")
return reason != ffcapi.ErrorReasonNotFound, err
})
Expand Down Expand Up @@ -428,7 +458,7 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf
currentViewBlock := lastElem.Value.(*minimalBlockInfo)
var freshBlockInfo *blockInfoJSONRPC
var reason ffcapi.ErrorReason
err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(attempt int) (retry bool, err error) {
err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(_ int) (retry bool, err error) {
freshBlockInfo, reason, err = bl.getBlockInfoByNumber(bl.ctx, currentViewBlock.number, false, "")
return reason != ffcapi.ErrorReasonNotFound, err
})
Expand Down Expand Up @@ -471,23 +501,28 @@ func (bl *blockListener) dispatchToConsumers(consumers []*blockUpdateConsumer, u
}
}

func (bl *blockListener) checkStartedLocked() {
func (bl *blockListener) checkAndStartListenerLoop() {
bl.mux.Lock()
defer bl.mux.Unlock()
if bl.listenLoopDone == nil {
bl.listenLoopDone = make(chan struct{})
go bl.listenLoop()
}
}

func (bl *blockListener) addConsumer(c *blockUpdateConsumer) {
func (bl *blockListener) addConsumer(ctx context.Context, c *blockUpdateConsumer) {
bl.checkAndStartListenerLoop()
bl.waitUntilStarted(ctx) // need to make sure the listener is started before adding any consumers
bl.mux.Lock()
defer bl.mux.Unlock()
bl.checkStartedLocked()
bl.consumers[*c.id] = c
}

func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
bl.checkAndStartListenerLoop()
// block height will be established as the first step of listener startup process
// so we don't need to wait for the entire startup process to finish to return the result
bl.mux.Lock()
bl.checkStartedLocked()
highestBlock := bl.highestBlock
bl.mux.Unlock()
// if not yet initialized, wait to be initialized
Expand Down Expand Up @@ -515,6 +550,9 @@ func (bl *blockListener) waitClosed() {
bl.wsBackend.Close()
}
if listenLoopDone != nil {
<-listenLoopDone
select {
case <-listenLoopDone:
case <-bl.ctx.Done():
}
}
}
Loading
Loading