Skip to content

Commit

Permalink
[feat]: add finalized block subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
andicrypt committed Oct 31, 2023
1 parent b27867e commit c0f9b0f
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 55 deletions.
7 changes: 5 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,8 +1514,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}

if status == CanonStatTy {
// Always send chain event here
bc.sendNewBlockEvent(block, receipts, true, true)
if bc.enableAdditionalChainEvent {
bc.sendNewBlockEvent(block, receipts, true, true)
} else {
bc.sendNewBlockEvent(block, receipts, false, false)
}
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
Expand Down
5 changes: 5 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type ChainEvent struct {
FinalizedBlockHash common.Hash
}

type Finalizer struct {
FinalizedBlockNumber uint64 `json:"finalizedBlockNumber"`
FinalizedBlockHash common.Hash `json:"finalizedBlockHash"`
}

type ChainSideEvent struct {
Block *types.Block
}
Expand Down
30 changes: 30 additions & 0 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -210,6 +211,35 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
return headerSub.ID
}

// NewFinalizedBlock send a notification each time a new block is marked as "Finalized"
func (api *PublicFilterAPI) NewFinalizedBlock(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()
go func() {
finalizer := make(chan *core.Finalizer)
fSub := api.events.SubscribeNewFinalizedBlocks(finalizer)

for {
select {
case f := <-finalizer:
notifier.Notify(rpcSub.ID, f)
case <-rpcSub.Err():
fSub.Unsubscribe()
return
case <-notifier.Closed():
fSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}

// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
Expand Down
140 changes: 87 additions & 53 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// FinalizedBlockSubscription
FinalizedBlockSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
)
Expand All @@ -69,15 +71,16 @@ const (
)

type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
finalizers chan *core.Finalizer
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}

// EventSystem creates subscriptions, processes events and broadcasts them to the
Expand Down Expand Up @@ -227,15 +230,16 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.Finalizer),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -244,15 +248,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.Finalizer),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -261,15 +266,33 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: make(chan *core.Finalizer),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribeNewFinalizedHeads creates a subscription that writes the block number of a block that is
// imported in the chain.
func (es *EventSystem) SubscribeNewFinalizedBlocks(finalizers chan *core.Finalizer) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: FinalizedBlockSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
finalizers: finalizers,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -278,14 +301,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
finalizers: make(chan *core.Finalizer),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand All @@ -294,14 +318,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
finalizers: make(chan *core.Finalizer),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
Expand Down Expand Up @@ -351,6 +376,15 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent)
}
}

func (es *EventSystem) handleFinalizedEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[FinalizedBlockSubscription] {
f.finalizers <- &core.Finalizer{
FinalizedBlockNumber: ev.FinalizedBlockNumber,
FinalizedBlockHash: ev.FinalizedBlockHash,
}
}
}

func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
Expand Down Expand Up @@ -467,7 +501,7 @@ func (es *EventSystem) eventLoop() {
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)

es.handleFinalizedEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
Expand Down
48 changes: 48 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,54 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc
}()
}

func TestFinalizedBlockSubscription(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false, deadline)
genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}, true)
chainEvents = []core.ChainEvent{}
)
for _, blk := range chain {
chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
}

chan0 := make(chan *core.Finalizer)
sub0 := api.events.SubscribeNewFinalizedBlocks(chan0)
chan1 := make(chan *core.Finalizer)
sub1 := api.events.SubscribeNewFinalizedBlocks(chan1)
go func() { // simulate client
i1, i2 := 0, 0
for i1 != len(chainEvents) || i2 != len(chainEvents) {
select {
case f := <-chan0:
if chainEvents[i1].FinalizedBlockNumber != f.FinalizedBlockNumber {
t.Errorf("sub0 received invalid hash on index %d, want %d, got %d", i1, chainEvents[i1].FinalizedBlockNumber, f.FinalizedBlockNumber)
}
i1++
case f := <-chan1:
if chainEvents[i2].FinalizedBlockNumber != f.FinalizedBlockNumber {
t.Errorf("sub1 received invalid hash on index %d, want %d, got %d", i2, chainEvents[i2].FinalizedBlockNumber, f.FinalizedBlockNumber)
}
i2++
}
}

sub0.Unsubscribe()
sub1.Unsubscribe()
}()

time.Sleep(1 * time.Second)
for _, e := range chainEvents {
backend.chainFeed.Send(e)
}

<-sub0.Err()
<-sub1.Err()
}

// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
Expand Down

0 comments on commit c0f9b0f

Please sign in to comment.