diff --git a/core/blockchain.go b/core/blockchain.go index 13d99203b6..088dd5e447 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1514,8 +1514,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } if status == CanonStatTy { - // Always send chain event here - bc.sendNewBlockEvent(block, receipts, true, true) + if bc.enableAdditionalChainEvent { + bc.sendNewBlockEvent(block, receipts, true, true) + } else { + bc.sendNewBlockEvent(block, receipts, false, false) + } if len(logs) > 0 { bc.logsFeed.Send(logs) } diff --git a/core/events.go b/core/events.go index e477da7e23..d08a1b33fa 100644 --- a/core/events.go +++ b/core/events.go @@ -44,6 +44,11 @@ type ChainEvent struct { FinalizedBlockHash common.Hash } +type FinalizedBlockInfo struct { + FinalizedBlockNumber uint64 `json:"finalizedBlockNumber"` + FinalizedBlockHash common.Hash `json:"finalizedBlockHash"` +} + type ChainSideEvent struct { Block *types.Block } diff --git a/eth/filters/api.go b/eth/filters/api.go index e0b07e318e..f5c9985016 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -28,9 +28,11 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -210,8 +212,40 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { return headerSub.ID } +// NewFinalizedBlock send a notification each time a new block is marked as "Finalized" +func (api *PublicFilterAPI) NewFinalizedBlock(ctx context.Context) (*rpc.Subscription, error) { + log.Info("[Dien][NewFinalizedBlock]") + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + go func() { + finalizer := make(chan *core.FinalizedBlockInfo) + fSub := api.events.SubscribeNewFinalizedBlocks(finalizer) + + for { + select { + case f := <-finalizer: + log.Info("[Dien][NewFinalizedBlock][Notify]") + notifier.Notify(rpcSub.ID, f) + case <-rpcSub.Err(): + fSub.Unsubscribe() + return + case <-notifier.Closed(): + fSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + // NewHeads send a notification each time a new (header) block is appended to the chain. func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { + log.Info("[Dien][NewHeads]") notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported @@ -226,6 +260,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er for { select { case h := <-headers: + log.Info("[Dien][NewHeads][Notify]") notifier.Notify(rpcSub.ID, h) case <-rpcSub.Err(): headersSub.Unsubscribe() diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 12f037d0f9..8515fce91d 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -52,6 +52,8 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + // FinalizedBlockSubscription + FinalizedBlockSubscription // LastSubscription keeps track of the last index LastIndexSubscription ) @@ -69,23 +71,25 @@ const ( ) type subscription struct { - id rpc.ID - typ Type - created time.Time - logsCrit ethereum.FilterQuery - logs chan []*types.Log - hashes chan []common.Hash - headers chan *types.Header - installed chan struct{} // closed when the filter is installed - err chan error // closed when the filter is uninstalled + id rpc.ID + typ Type + created time.Time + logsCrit ethereum.FilterQuery + logs chan []*types.Log + hashes chan []common.Hash + headers chan *types.Header + finalizers chan *core.FinalizedBlockInfo + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled } // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria. type EventSystem struct { - backend Backend - lightMode bool - lastHead *types.Header + backend Backend + lightMode bool + lastHead *types.Header + lastFinalized uint64 // last finalized block number // Subscriptions txsSub event.Subscription // Subscription for new transaction event @@ -227,15 +231,16 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // pending logs that match the given criteria. func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: MinedAndPendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: MinedAndPendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -244,15 +249,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: LogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -261,15 +267,33 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // transactions that enter the transaction pool. func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeNewFinalizedHeads creates a subscription that writes the block number of a block that is +// imported in the chain. +func (es *EventSystem) SubscribeNewFinalizedBlocks(finalizers chan *core.FinalizedBlockInfo) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: FinalizedBlockSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: finalizers, + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -278,14 +302,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: BlocksSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), - headers: headers, - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: headers, + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -294,14 +319,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // transactions that enter the transaction pool. func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingTransactionsSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: hashes, - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: hashes, + headers: make(chan *types.Header), + finalizers: make(chan *core.FinalizedBlockInfo), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -342,6 +368,7 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog } func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { + log.Info("[Dien][handleTxsEvent]") hashes := make([]common.Hash, 0, len(ev.Txs)) for _, tx := range ev.Txs { hashes = append(hashes, tx.Hash()) @@ -351,10 +378,28 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) } } +func (es *EventSystem) handleFinalizedEvent(filters filterIndex, ev core.ChainEvent) { + log.Info("[Dien][handleFinalizedEvent][0]") + if ev.FinalizedBlockNumber == 0 || es.lastFinalized == ev.FinalizedBlockNumber { + return + } + es.lastFinalized = ev.FinalizedBlockNumber + log.Info("[Dien][handleFinalizedEvent][1]") + for _, f := range filters[FinalizedBlockSubscription] { + f.finalizers <- &core.FinalizedBlockInfo{ + FinalizedBlockNumber: ev.FinalizedBlockNumber, + FinalizedBlockHash: ev.FinalizedBlockHash, + } + } + log.Info("[Dien][handleFinalizedEvent][2]") +} + func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) { + log.Info("[Dien][handleChainEvent][0]") for _, f := range filters[BlocksSubscription] { f.headers <- ev.Block.Header() } + log.Info("[Dien][handleChainEvent][1]") if es.lightMode && len(filters[LogsSubscription]) > 0 { es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) { for _, f := range filters[LogsSubscription] { @@ -466,8 +511,9 @@ func (es *EventSystem) eventLoop() { case ev := <-es.pendingLogsCh: es.handlePendingLogs(index, ev) case ev := <-es.chainCh: + log.Info("[Dien][enventLoop][chainCh]") es.handleChainEvent(index, ev) - + es.handleFinalizedEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 51c45871c3..6051cc9f38 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -157,6 +157,61 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func TestFinalizedBlockSubscription(t *testing.T) { + t.Parallel() + var ( + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false, deadline) + genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db) + chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}, true) + chainEvents = []core.ChainEvent{} + ) + for _, blk := range chain { + chainEvents = append(chainEvents, core.ChainEvent{ + Hash: blk.Hash(), + Block: blk, + FinalizedBlockNumber: rand.Uint64(), + FinalizedBlockHash: common.BigToHash(new(big.Int)), + }) + } + + chan0 := make(chan *core.FinalizedBlockInfo) + sub0 := api.events.SubscribeNewFinalizedBlocks(chan0) + chan1 := make(chan *core.FinalizedBlockInfo) + sub1 := api.events.SubscribeNewFinalizedBlocks(chan1) + go func() { // simulate client + i1, i2 := 0, 0 + for i1 != len(chainEvents) || i2 != len(chainEvents) { + select { + case f := <-chan0: + if chainEvents[i1].FinalizedBlockNumber != f.FinalizedBlockNumber { + t.Errorf("sub0 received invalid finalized block number on index %d, want %d, got %d", + i1, chainEvents[i1].FinalizedBlockNumber, f.FinalizedBlockNumber) + } + i1++ + case f := <-chan1: + if chainEvents[i2].FinalizedBlockNumber != f.FinalizedBlockNumber { + t.Errorf("sub1 received invalid finalized block number on index %d, want %d, got %d", + i2, chainEvents[i2].FinalizedBlockNumber, f.FinalizedBlockNumber) + } + i2++ + } + } + + sub0.Unsubscribe() + sub1.Unsubscribe() + }() + + time.Sleep(1 * time.Second) + for _, e := range chainEvents { + backend.chainFeed.Send(e) + } + + <-sub0.Err() + <-sub1.Err() +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes)