Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: add finalized block subscription #375

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,8 +1514,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}

if status == CanonStatTy {
// Always send chain event here
bc.sendNewBlockEvent(block, receipts, true, true)
if bc.enableAdditionalChainEvent {
bc.sendNewBlockEvent(block, receipts, true, true)
} else {
bc.sendNewBlockEvent(block, receipts, false, false)
}
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
Expand Down
6 changes: 6 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package core

import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -44,6 +45,11 @@ type ChainEvent struct {
FinalizedBlockHash common.Hash
}

type FinalizedBlockInfo struct {
FinalizedBlockNumber hexutil.Uint64 `json:"finalizedBlockNumber"`
FinalizedBlockHash common.Hash `json:"finalizedBlockHash"`
}

type ChainSideEvent struct {
Block *types.Block
}
Expand Down
30 changes: 30 additions & 0 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -210,6 +211,35 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
return headerSub.ID
}

// NewFinalizedBlock send a notification each time a new block is marked as "Finalized"
func (api *PublicFilterAPI) NewFinalizedBlocks(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()
go func() {
finalizer := make(chan *core.FinalizedBlockInfo)
fSub := api.events.SubscribeNewFinalizedBlocks(finalizer)

for {
select {
case f := <-finalizer:
notifier.Notify(rpcSub.ID, f)
case <-rpcSub.Err():
fSub.Unsubscribe()
return
case <-notifier.Closed():
fSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}

// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
Expand Down
152 changes: 96 additions & 56 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -52,6 +53,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// FinalizedBlockSubscription
FinalizedBlockSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
)
Expand All @@ -69,23 +72,25 @@ const (
)

type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
finalizers chan *core.FinalizedBlockInfo
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}

// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
backend Backend
lightMode bool
lastHead *types.Header
backend Backend
lightMode bool
lastHead *types.Header
lastFinalized uint64 // last finalized block number

// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
Expand Down Expand Up @@ -227,15 +232,16 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -244,15 +250,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -261,15 +268,33 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribeNewFinalizedHeads creates a subscription that writes the block number of a block that is
// imported in the chain.
func (es *EventSystem) SubscribeNewFinalizedBlocks(finalizers chan *core.FinalizedBlockInfo) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: FinalizedBlockSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: finalizers,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -278,14 +303,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -294,14 +320,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand Down Expand Up @@ -351,6 +378,19 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent)
}
}

func (es *EventSystem) handleFinalizedEvent(filters filterIndex, ev core.ChainEvent) {
minh-bq marked this conversation as resolved.
Show resolved Hide resolved
if ev.FinalizedBlockNumber == 0 || es.lastFinalized == ev.FinalizedBlockNumber {
return
}
es.lastFinalized = ev.FinalizedBlockNumber
for _, f := range filters[FinalizedBlockSubscription] {
andicrypt marked this conversation as resolved.
Show resolved Hide resolved
f.finalizers <- &core.FinalizedBlockInfo{
FinalizedBlockNumber: hexutil.Uint64(ev.FinalizedBlockNumber),
FinalizedBlockHash: ev.FinalizedBlockHash,
}
}
}

func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
Expand Down Expand Up @@ -467,7 +507,7 @@ func (es *EventSystem) eventLoop() {
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)

es.handleFinalizedEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
Expand Down
56 changes: 56 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
Expand Down Expand Up @@ -157,6 +158,61 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc
}()
}

func TestFinalizedBlockSubscription(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false, deadline)
genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}, true)
chainEvents = []core.ChainEvent{}
)
for _, blk := range chain {
chainEvents = append(chainEvents, core.ChainEvent{
Hash: blk.Hash(),
Block: blk,
FinalizedBlockNumber: rand.Uint64(),
FinalizedBlockHash: common.BigToHash(new(big.Int)),
})
}

chan0 := make(chan *core.FinalizedBlockInfo)
sub0 := api.events.SubscribeNewFinalizedBlocks(chan0)
chan1 := make(chan *core.FinalizedBlockInfo)
sub1 := api.events.SubscribeNewFinalizedBlocks(chan1)
go func() { // simulate client
i1, i2 := 0, 0
for i1 != len(chainEvents) || i2 != len(chainEvents) {
select {
case f := <-chan0:
if hexutil.Uint64(chainEvents[i1].FinalizedBlockNumber) != f.FinalizedBlockNumber {
t.Errorf("sub0 received invalid finalized block number on index %d, want %d, got %d",
i1, chainEvents[i1].FinalizedBlockNumber, f.FinalizedBlockNumber)
}
i1++
case f := <-chan1:
if hexutil.Uint64(chainEvents[i2].FinalizedBlockNumber) != f.FinalizedBlockNumber {
t.Errorf("sub1 received invalid finalized block number on index %d, want %d, got %d",
i2, chainEvents[i2].FinalizedBlockNumber, f.FinalizedBlockNumber)
}
i2++
}
}

sub0.Unsubscribe()
sub1.Unsubscribe()
}()

time.Sleep(1 * time.Second)
for _, e := range chainEvents {
backend.chainFeed.Send(e)
}

<-sub0.Err()
<-sub1.Err()
}

// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
Expand Down