Skip to content

Commit

Permalink
eth/filters: add finalized block subscription (#375)
Browse files Browse the repository at this point in the history
Allow users to subscribe to constantly receive notifications about latest finalized blocks

Message:
{"jsonrpc": "2.0","method": "eth_subscribe","params": ["newFinalizedBlocks"],"id": 1}

Response:
{
    "jsonrpc": "2.0",
    "method": "eth_subscription",
    "params": {
        "subscription": "0x4c5cd521106240a12aea37bb57ef666f",
        "result": {
            "finalizedBlockNumber": "0x14b4228",
            "finalizedBlockHash": "0x2dea4504e505f0743d443e6db2f0d270831d33f2917d484cd2f878e44839f416"
        }
    }
}
Note: Remember turning flag additionalchainevent.enable on
  • Loading branch information
andicrypt authored Nov 2, 2023
1 parent b27867e commit 061ab32
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 58 deletions.
7 changes: 5 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,8 +1514,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}

if status == CanonStatTy {
// Always send chain event here
bc.sendNewBlockEvent(block, receipts, true, true)
if bc.enableAdditionalChainEvent {
bc.sendNewBlockEvent(block, receipts, true, true)
} else {
bc.sendNewBlockEvent(block, receipts, false, false)
}
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
Expand Down
6 changes: 6 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package core

import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -44,6 +45,11 @@ type ChainEvent struct {
FinalizedBlockHash common.Hash
}

type FinalizedBlockInfo struct {
FinalizedBlockNumber hexutil.Uint64 `json:"finalizedBlockNumber"`
FinalizedBlockHash common.Hash `json:"finalizedBlockHash"`
}

type ChainSideEvent struct {
Block *types.Block
}
Expand Down
30 changes: 30 additions & 0 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -210,6 +211,35 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
return headerSub.ID
}

// NewFinalizedBlock send a notification each time a new block is marked as "Finalized"
func (api *PublicFilterAPI) NewFinalizedBlocks(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()
go func() {
finalizer := make(chan *core.FinalizedBlockInfo)
fSub := api.events.SubscribeNewFinalizedBlocks(finalizer)

for {
select {
case f := <-finalizer:
notifier.Notify(rpcSub.ID, f)
case <-rpcSub.Err():
fSub.Unsubscribe()
return
case <-notifier.Closed():
fSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}

// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
Expand Down
152 changes: 96 additions & 56 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -52,6 +53,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// FinalizedBlockSubscription
FinalizedBlockSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
)
Expand All @@ -69,23 +72,25 @@ const (
)

type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
finalizers chan *core.FinalizedBlockInfo
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}

// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
backend Backend
lightMode bool
lastHead *types.Header
backend Backend
lightMode bool
lastHead *types.Header
lastFinalized uint64 // last finalized block number

// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
Expand Down Expand Up @@ -227,15 +232,16 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -244,15 +250,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -261,15 +268,33 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribeNewFinalizedHeads creates a subscription that writes the block number of a block that is
// imported in the chain.
func (es *EventSystem) SubscribeNewFinalizedBlocks(finalizers chan *core.FinalizedBlockInfo) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: FinalizedBlockSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: finalizers,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -278,14 +303,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -294,14 +320,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
finalizers: make(chan *core.FinalizedBlockInfo),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand Down Expand Up @@ -351,6 +378,19 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent)
}
}

func (es *EventSystem) handleFinalizedEvent(filters filterIndex, ev core.ChainEvent) {
if ev.FinalizedBlockNumber == 0 || es.lastFinalized == ev.FinalizedBlockNumber {
return
}
es.lastFinalized = ev.FinalizedBlockNumber
for _, f := range filters[FinalizedBlockSubscription] {
f.finalizers <- &core.FinalizedBlockInfo{
FinalizedBlockNumber: hexutil.Uint64(ev.FinalizedBlockNumber),
FinalizedBlockHash: ev.FinalizedBlockHash,
}
}
}

func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
Expand Down Expand Up @@ -467,7 +507,7 @@ func (es *EventSystem) eventLoop() {
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)

es.handleFinalizedEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
Expand Down
56 changes: 56 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
Expand Down Expand Up @@ -157,6 +158,61 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc
}()
}

func TestFinalizedBlockSubscription(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false, deadline)
genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}, true)
chainEvents = []core.ChainEvent{}
)
for _, blk := range chain {
chainEvents = append(chainEvents, core.ChainEvent{
Hash: blk.Hash(),
Block: blk,
FinalizedBlockNumber: rand.Uint64(),
FinalizedBlockHash: common.BigToHash(new(big.Int)),
})
}

chan0 := make(chan *core.FinalizedBlockInfo)
sub0 := api.events.SubscribeNewFinalizedBlocks(chan0)
chan1 := make(chan *core.FinalizedBlockInfo)
sub1 := api.events.SubscribeNewFinalizedBlocks(chan1)
go func() { // simulate client
i1, i2 := 0, 0
for i1 != len(chainEvents) || i2 != len(chainEvents) {
select {
case f := <-chan0:
if hexutil.Uint64(chainEvents[i1].FinalizedBlockNumber) != f.FinalizedBlockNumber {
t.Errorf("sub0 received invalid finalized block number on index %d, want %d, got %d",
i1, chainEvents[i1].FinalizedBlockNumber, f.FinalizedBlockNumber)
}
i1++
case f := <-chan1:
if hexutil.Uint64(chainEvents[i2].FinalizedBlockNumber) != f.FinalizedBlockNumber {
t.Errorf("sub1 received invalid finalized block number on index %d, want %d, got %d",
i2, chainEvents[i2].FinalizedBlockNumber, f.FinalizedBlockNumber)
}
i2++
}
}

sub0.Unsubscribe()
sub1.Unsubscribe()
}()

time.Sleep(1 * time.Second)
for _, e := range chainEvents {
backend.chainFeed.Send(e)
}

<-sub0.Err()
<-sub1.Err()
}

// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
Expand Down

0 comments on commit 061ab32

Please sign in to comment.