diff --git a/core/blockchain.go b/core/blockchain.go index 13d99203b6..088dd5e447 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1514,8 +1514,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } if status == CanonStatTy { - // Always send chain event here - bc.sendNewBlockEvent(block, receipts, true, true) + if bc.enableAdditionalChainEvent { + bc.sendNewBlockEvent(block, receipts, true, true) + } else { + bc.sendNewBlockEvent(block, receipts, false, false) + } if len(logs) > 0 { bc.logsFeed.Send(logs) } diff --git a/core/events.go b/core/events.go index e477da7e23..b438f53f96 100644 --- a/core/events.go +++ b/core/events.go @@ -44,6 +44,11 @@ type ChainEvent struct { FinalizedBlockHash common.Hash } +type Finalizer struct { + FinalizedBlockNumber uint64 `json:"finalizedBlockNumber"` + FinalizedBlockHash common.Hash `json:"finalizedBlockHash"` +} + type ChainSideEvent struct { Block *types.Block } diff --git a/eth/filters/api.go b/eth/filters/api.go index e0b07e318e..032d63896e 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -210,6 +211,35 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { return headerSub.ID } +// NewFinalizedBlock send a notification each time a new block is marked as "Finalized" +func (api *PublicFilterAPI) NewFinalizedBlock(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + go func() { + finalizer := make(chan *core.Finalizer) + fSub := api.events.SubscribeNewFinalizedBlocks(finalizer) + + for { + select { + case f := <-finalizer: + notifier.Notify(rpcSub.ID, f) + case <-rpcSub.Err(): + fSub.Unsubscribe() + return + case <-notifier.Closed(): + fSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + // NewHeads send a notification each time a new (header) block is appended to the chain. func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 12f037d0f9..669805fd43 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -52,6 +52,8 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + // FinalizedBlockSubscription + FinalizedBlockSubscription // LastSubscription keeps track of the last index LastIndexSubscription ) @@ -69,15 +71,16 @@ const ( ) type subscription struct { - id rpc.ID - typ Type - created time.Time - logsCrit ethereum.FilterQuery - logs chan []*types.Log - hashes chan []common.Hash - headers chan *types.Header - installed chan struct{} // closed when the filter is installed - err chan error // closed when the filter is uninstalled + id rpc.ID + typ Type + created time.Time + logsCrit ethereum.FilterQuery + logs chan []*types.Log + hashes chan []common.Hash + headers chan *types.Header + finalizers chan *core.Finalizer + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled } // EventSystem creates subscriptions, processes events and broadcasts them to the @@ -227,15 +230,16 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // pending logs that match the given criteria. func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: MinedAndPendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: MinedAndPendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.Finalizer), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -244,15 +248,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: LogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.Finalizer), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -261,15 +266,33 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ // transactions that enter the transaction pool. func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - hashes: make(chan []common.Hash), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: make(chan *core.Finalizer), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeNewFinalizedHeads creates a subscription that writes the block number of a block that is +// imported in the chain. +func (es *EventSystem) SubscribeNewFinalizedBlocks(finalizers chan *core.Finalizer) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: FinalizedBlockSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + finalizers: finalizers, + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -278,14 +301,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: BlocksSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), - headers: headers, - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: headers, + finalizers: make(chan *core.Finalizer), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -294,14 +318,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti // transactions that enter the transaction pool. func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { sub := &subscription{ - id: rpc.NewID(), - typ: PendingTransactionsSubscription, - created: time.Now(), - logs: make(chan []*types.Log), - hashes: hashes, - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: hashes, + headers: make(chan *types.Header), + finalizers: make(chan *core.Finalizer), + installed: make(chan struct{}), + err: make(chan error), } return es.subscribe(sub) } @@ -351,6 +376,15 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) } } +func (es *EventSystem) handleFinalizedEvent(filters filterIndex, ev core.ChainEvent) { + for _, f := range filters[FinalizedBlockSubscription] { + f.finalizers <- &core.Finalizer{ + FinalizedBlockNumber: ev.FinalizedBlockNumber, + FinalizedBlockHash: ev.FinalizedBlockHash, + } + } +} + func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) { for _, f := range filters[BlocksSubscription] { f.headers <- ev.Block.Header() @@ -467,7 +501,7 @@ func (es *EventSystem) eventLoop() { es.handlePendingLogs(index, ev) case ev := <-es.chainCh: es.handleChainEvent(index, ev) - + es.handleFinalizedEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 51c45871c3..117d0c462a 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -157,6 +157,56 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func TestFinalizedBlockSubscription(t *testing.T) { + t.Parallel() + var ( + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false, deadline) + genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db) + chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}, true) + chainEvents = []core.ChainEvent{} + ) + for _, blk := range chain { + chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk}) + } + + chan0 := make(chan *core.Finalizer) + sub0 := api.events.SubscribeNewFinalizedBlocks(chan0) + chan1 := make(chan *core.Finalizer) + sub1 := api.events.SubscribeNewFinalizedBlocks(chan1) + go func() { // simulate client + i1, i2 := 0, 0 + for i1 != len(chainEvents) || i2 != len(chainEvents) { + select { + case f := <-chan0: + if chainEvents[i1].FinalizedBlockNumber != f.FinalizedBlockNumber { + t.Errorf("sub0 received invalid hash on index %d, want %d, got %d", i1, chainEvents[i1].FinalizedBlockNumber, f.FinalizedBlockNumber) + } + t.Log("abcd") + i1++ + case f := <-chan1: + if chainEvents[i2].FinalizedBlockNumber != f.FinalizedBlockNumber { + t.Errorf("sub1 received invalid hash on index %d, want %d, got %d", i2, chainEvents[i2].FinalizedBlockNumber, f.FinalizedBlockNumber) + } + t.Log("efgh") + i2++ + } + } + + sub0.Unsubscribe() + sub1.Unsubscribe() + }() + + time.Sleep(1 * time.Second) + for _, e := range chainEvents { + backend.chainFeed.Send(e) + } + + <-sub0.Err() + <-sub1.Err() +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes)