From 061ab32b45ec3428242bf8108a248fde5767d774 Mon Sep 17 00:00:00 2001 From: "Tran A. Dien" <57678065+andicrypt@users.noreply.github.com> Date: Thu, 2 Nov 2023 11:16:56 +0700 Subject: [PATCH] eth/filters: add finalized block subscription (#375) Allow users to subscribe to constantly receive notifications about latest finalized blocks Message: {"jsonrpc": "2.0","method": "eth_subscribe","params": ["newFinalizedBlocks"],"id": 1} Response: { "jsonrpc": "2.0", "method": "eth_subscription", "params": { "subscription": "0x4c5cd521106240a12aea37bb57ef666f", "result": { "finalizedBlockNumber": "0x14b4228", "finalizedBlockHash": "0x2dea4504e505f0743d443e6db2f0d270831d33f2917d484cd2f878e44839f416" } } } Note: Remember turning flag additionalchainevent.enable on --- core/blockchain.go | 7 +- core/events.go | 6 ++ eth/filters/api.go | 30 ++++++ eth/filters/filter_system.go | 152 +++++++++++++++++++----------- eth/filters/filter_system_test.go | 56 +++++++++++ 5 files changed, 193 insertions(+), 58 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 13d99203b6..088dd5e447 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1514,8 +1514,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } if status == CanonStatTy { - // Always send chain event here - bc.sendNewBlockEvent(block, receipts, true, true) + if bc.enableAdditionalChainEvent { + bc.sendNewBlockEvent(block, receipts, true, true) + } else { + bc.sendNewBlockEvent(block, receipts, false, false) + } if len(logs) > 0 { bc.logsFeed.Send(logs) } diff --git a/core/events.go b/core/events.go index e477da7e23..e8937c328f 100644 --- a/core/events.go +++ b/core/events.go @@ -18,6 +18,7 @@ package core import ( "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" ) @@ -44,6 +45,11 @@ type ChainEvent struct { FinalizedBlockHash common.Hash } +type FinalizedBlockInfo struct { + FinalizedBlockNumber hexutil.Uint64 `json:"finalizedBlockNumber"` + FinalizedBlockHash common.Hash `json:"finalizedBlockHash"` +} + type ChainSideEvent struct { Block *types.Block } diff --git a/eth/filters/api.go b/eth/filters/api.go index e0b07e318e..18c68e5b98 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -210,6 +211,35 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { return headerSub.ID } +// NewFinalizedBlock send a notification each time a new block is marked as "Finalized" +func (api *PublicFilterAPI) NewFinalizedBlocks(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + go func() { + finalizer := make(chan *core.FinalizedBlockInfo) + fSub := api.events.SubscribeNewFinalizedBlocks(finalizer) + + for { + select { + case f := <-finalizer: + notifier.Notify(rpcSub.ID, f) + case <-rpcSub.Err(): + fSub.Unsubscribe() + return + case <-notifier.Closed(): + fSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + // NewHeads send a notification each time a new (header) block is appended to the chain. func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 12f037d0f9..9017cbd856 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" @@ -52,6 +53,8 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + // FinalizedBlockSubscription + FinalizedBlockSubscription // LastSubscription keeps track of the last index LastIndexSubscription ) @@ -69,23 +72,25 @@ const ( ) type subscription struct { - id rpc.ID - typ Type - created time.Time - logsCrit ethereum.FilterQuery - logs chan []*types.Log - hashes chan []common.Hash - headers chan *types.Header - installed chan struct{} // closed when the filter is installed - err chan error // closed when the filter is uninstalled + id rpc.ID + typ Type + created time.Time + logsCrit ethereum.FilterQuery + logs chan []*types.Log + hashes chan []common.Hash + headers chan *types.Header + finalizers chan *core.FinalizedBlockInfo + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled } // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria. type EventSystem struct { - backend Backend - lightMode bool - lastHead *types.Header + backend Backend + lightMode bool + lastHead *types.Header + lastFinalized uint64 // last finalized block number // Subscriptions txsSub event.Subscription // Subscription for new transaction event @@ -227,15 +232,16 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // pending logs that match the given criteria. func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: MinedAndPendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: MinedAndPendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -244,15 +250,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: LogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -261,15 +268,33 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // transactions that enter the transaction pool. func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeNewFinalizedHeads creates a subscription that writes the block number of a block that is +// imported in the chain. +func (es *EventSystem) SubscribeNewFinalizedBlocks(finalizers chan *core.FinalizedBlockInfo) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: FinalizedBlockSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: finalizers, + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -278,14 +303,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: BlocksSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), - headers: headers, - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: headers, + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -294,14 +320,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // transactions that enter the transaction pool. func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingTransactionsSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: hashes, - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: hashes, + headers: make(chan *types.Header), + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -351,6 +378,19 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) } } +func (es *EventSystem) handleFinalizedEvent(filters filterIndex, ev core.ChainEvent) { + if ev.FinalizedBlockNumber == 0 || es.lastFinalized == ev.FinalizedBlockNumber { + return + } + es.lastFinalized = ev.FinalizedBlockNumber + for _, f := range filters[FinalizedBlockSubscription] { + f.finalizers <- &core.FinalizedBlockInfo{ + FinalizedBlockNumber: hexutil.Uint64(ev.FinalizedBlockNumber), + FinalizedBlockHash: ev.FinalizedBlockHash, + } + } +} + func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) { for _, f := range filters[BlocksSubscription] { f.headers <- ev.Block.Header() @@ -467,7 +507,7 @@ func (es *EventSystem) eventLoop() { es.handlePendingLogs(index, ev) case ev := <-es.chainCh: es.handleChainEvent(index, ev) - + es.handleFinalizedEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 51c45871c3..d30c9f9592 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" @@ -157,6 +158,61 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func TestFinalizedBlockSubscription(t *testing.T) { + t.Parallel() + var ( + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false, deadline) + genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db) + chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}, true) + chainEvents = []core.ChainEvent{} + ) + for _, blk := range chain { + chainEvents = append(chainEvents, core.ChainEvent{ + Hash: blk.Hash(), + Block: blk, + FinalizedBlockNumber: rand.Uint64(), + FinalizedBlockHash: common.BigToHash(new(big.Int)), + }) + } + + chan0 := make(chan *core.FinalizedBlockInfo) + sub0 := api.events.SubscribeNewFinalizedBlocks(chan0) + chan1 := make(chan *core.FinalizedBlockInfo) + sub1 := api.events.SubscribeNewFinalizedBlocks(chan1) + go func() { // simulate client + i1, i2 := 0, 0 + for i1 != len(chainEvents) || i2 != len(chainEvents) { + select { + case f := <-chan0: + if hexutil.Uint64(chainEvents[i1].FinalizedBlockNumber) != f.FinalizedBlockNumber { + t.Errorf("sub0 received invalid finalized block number on index %d, want %d, got %d", + i1, chainEvents[i1].FinalizedBlockNumber, f.FinalizedBlockNumber) + } + i1++ + case f := <-chan1: + if hexutil.Uint64(chainEvents[i2].FinalizedBlockNumber) != f.FinalizedBlockNumber { + t.Errorf("sub1 received invalid finalized block number on index %d, want %d, got %d", + i2, chainEvents[i2].FinalizedBlockNumber, f.FinalizedBlockNumber) + } + i2++ + } + } + + sub0.Unsubscribe() + sub1.Unsubscribe() + }() + + time.Sleep(1 * time.Second) + for _, e := range chainEvents { + backend.chainFeed.Send(e) + } + + <-sub0.Err() + <-sub1.Err() +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes)