Skip to content

Commit

Permalink
blockchain: add the finalized block number and block hash to chain event
Browse files Browse the repository at this point in the history
This commit adds the finalized block number and block hash to chain event in
writeBlockWithState. It also adds the additionalchainevent.enable flag and moves
all blockchain's functionality related to subscriber under this flag.
  • Loading branch information
minh-bq committed Sep 27, 2023
1 parent d34f792 commit e67ad69
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 42 deletions.
1 change: 1 addition & 0 deletions cmd/ronin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ var (
utils.BlsPasswordPath,
utils.BlsWalletPath,
utils.DisableRoninProtocol,
utils.AdditionalChainEventFlag,
}

rpcFlags = []cli.Flag{
Expand Down
1 change: 1 addition & 0 deletions cmd/ronin/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.MonitorDoubleSign,
utils.StoreInternalTransactions,
utils.DisableRoninProtocol,
utils.AdditionalChainEventFlag,
},
},
{
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,11 @@ var (
Name: "ronin.disable",
Usage: "Disable ronin p2p protocol",
}

AdditionalChainEventFlag = cli.BoolFlag{
Name: "additionalchainevent.enable",
Usage: "Enable additional chain event",
}
)

// MakeDataDir retrieves the currently requested data directory, terminating
Expand Down Expand Up @@ -1831,6 +1836,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalBool(MonitorDoubleSign.Name) {
cfg.EnableMonitorDoubleSign = true
}

if ctx.GlobalBool(AdditionalChainEventFlag.Name) {
cfg.EnableAdditionalChainEvent = true
}
}

// SetDNSDiscoveryDefaults configures DNS discovery with the given URL if
Expand Down
109 changes: 74 additions & 35 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
shouldStoreInternalTxs bool
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
shouldStoreInternalTxs bool
enableAdditionalChainEvent bool
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -509,6 +510,10 @@ func (bc *BlockChain) StartFinalityVoteMonitor() {
}
}

func (bc *BlockChain) EnableAdditionalChainEvent() {
bc.enableAdditionalChainEvent = true
}

// empty returns an indicator whether the blockchain is empty.
// Note, it's a special case that we connect a non-empty ancient
// database with an empty node, so that we can plugin the ancient
Expand Down Expand Up @@ -1111,7 +1116,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
stats.processed++

// Send chain event includes block data and logs
bc.sendNewBlockEvent(block, receiptChain[i])
if bc.enableAdditionalChainEvent {
bc.sendNewBlockEvent(block, receiptChain[i], false, false)
}
}

// Flush all tx-lookup index data.
Expand Down Expand Up @@ -1208,7 +1215,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
stats.processed++

// Send chain event includes block data and logs
bc.sendNewBlockEvent(block, receiptChain[i])
if bc.enableAdditionalChainEvent {
bc.sendNewBlockEvent(block, receiptChain[i], false, false)
}
}
// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts,
Expand Down Expand Up @@ -1268,15 +1277,49 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return 0, nil
}

func (bc *BlockChain) sendNewBlockEvent(block *types.Block, receipts types.Receipts) {
func (bc *BlockChain) sendNewBlockEvent(
block *types.Block,
receipts types.Receipts,
includeInternalTxsAndDirtyAccounts bool,
includeFinalized bool,
) {
var (
internalTxs []*types.InternalTransaction
dirtyAccounts []*types.DirtyStateAccount
finalizedBlockNumber uint64
finalizedBlockHash common.Hash
)

if includeInternalTxsAndDirtyAccounts {
// get dirty accounts
dirtyAccounts = bc.ReadDirtyAccounts(block.Hash())
// get internal transactions
internalTxs = bc.ReadInternalTransactions(block.Hash())
}

if includeFinalized {
finalizedBlock := bc.FinalizedBlock()
if finalizedBlock != nil {
finalizedBlockNumber = finalizedBlock.NumberU64()
finalizedBlockHash = finalizedBlock.Hash()
}
}

logs := make([]*types.Log, 0)
internalTxs := make([]*types.InternalTransaction, 0)
dirtyAccounts := make([]*types.DirtyStateAccount, 0)
for _, receipt := range receipts {
logs = append(logs, receipt.Logs...)
}
log.Info("send new block event", "height", block.NumberU64(), "txs", len(block.Transactions()), "logs", len(logs))
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs, InternalTxs: internalTxs, DirtyAccounts: dirtyAccounts, Receipts: receipts})
bc.chainFeed.Send(ChainEvent{
Block: block,
Hash: block.Hash(),
Logs: logs,
InternalTxs: internalTxs,
DirtyAccounts: dirtyAccounts,
Receipts: receipts,
FinalizedBlockNumber: finalizedBlockNumber,
FinalizedBlockHash: finalizedBlockHash,
})
}

var lastWrite uint64
Expand Down Expand Up @@ -1313,9 +1356,9 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
}

// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, dirtyAccounts []*types.DirtyStateAccount, err error) {
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
if !bc.chainmu.TryLock() {
return NonStatTy, nil, errInsertionInterrupted
return NonStatTy, errInsertionInterrupted
}
defer bc.chainmu.Unlock()

Expand Down Expand Up @@ -1359,15 +1402,15 @@ func (bc *BlockChain) reorgNeeded(localBlock *types.Block, localTd *big.Int, ext

// writeBlockWithState writes the block and all associated state to the database,
// but is expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, internalTxs []*types.InternalTransaction, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, dirtyAccounts []*types.DirtyStateAccount, err error) {
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, internalTxs []*types.InternalTransaction, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
if bc.insertStopped() {
return NonStatTy, nil, errInsertionInterrupted
return NonStatTy, errInsertionInterrupted
}

// Calculate the total difficulty of the block
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
return NonStatTy, nil, consensus.ErrUnknownAncestor
return NonStatTy, consensus.ErrUnknownAncestor
}
// Make sure no inconsistent state is leaked during insertion
currentBlock := bc.CurrentBlock()
Expand All @@ -1387,17 +1430,17 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
log.Crit("Failed to write block into disk", "err", err)
}
// Commit all cached state changes into underlying memory database.
dirtyAccounts = state.DirtyAccounts(block.Hash(), block.NumberU64())
dirtyAccounts := state.DirtyAccounts(block.Hash(), block.NumberU64())
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, nil, err
return NonStatTy, err
}
triedb := bc.stateCache.TrieDB()

// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
if err := triedb.Commit(root, false, nil); err != nil {
return NonStatTy, nil, err
return NonStatTy, err
}
} else {
// Full but not archive node, do proper garbage collection
Expand Down Expand Up @@ -1452,7 +1495,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
log.Info("[reorg][writeBlockWithState]",
"currentBlock", currentBlock, "currentTD", localTd.Uint64(), "newBlock", block, "newTD", externTd.Uint64())
if err := bc.reorg(currentBlock, block); err != nil {
return NonStatTy, nil, err
return NonStatTy, err
}
}
status = CanonStatTy
Expand All @@ -1465,8 +1508,14 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
bc.futureBlocks.Remove(block.Hash())

if bc.enableAdditionalChainEvent && len(dirtyAccounts) != 0 {
bc.dirtyAccountFeed.Send(dirtyAccounts)
bc.dirtyAccountsCache.Add(block.Hash(), dirtyAccounts)
}

if status == CanonStatTy {
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs, InternalTxs: internalTxs, DirtyAccounts: dirtyAccounts, Receipts: receipts})
// Always send chain event here
bc.sendNewBlockEvent(block, receipts, true, true)
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
Expand All @@ -1481,7 +1530,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
return status, dirtyAccounts, nil
return status, nil
}

// addFutureBlock checks if the block is within the max allowed window to get
Expand Down Expand Up @@ -1767,7 +1816,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
}

// store internal txs to db and send them to internalTxFeed
if len(internalTxs) > 0 {
if bc.enableAdditionalChainEvent && len(internalTxs) > 0 {
bc.WriteInternalTransactions(block.Hash(), internalTxs)
bc.internalTxFeed.Send(internalTxs)
}
Expand Down Expand Up @@ -1802,17 +1851,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er

// Write the block to the chain and get the status.
substart = time.Now()
status, dirtyAccounts, err := bc.writeBlockWithState(block, receipts, logs, internalTxs, statedb, false)
status, err := bc.writeBlockWithState(block, receipts, logs, internalTxs, statedb, false)
atomic.StoreUint32(&followupInterrupt, 1)
if err != nil {
return it.index, err
}

if dirtyAccounts != nil {
bc.dirtyAccountFeed.Send(dirtyAccounts)
bc.dirtyAccountsCache.Add(block.Hash(), dirtyAccounts)
}

// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
Expand Down Expand Up @@ -2125,16 +2169,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
bc.writeHeadBlock(newChain[i])

// Collect reborn logs due to chain reorg
receipts, logs := collectLogs(newChain[i].Hash(), false)

// get dirty accounts
dirtyAccounts := bc.ReadDirtyAccounts(newChain[i].Hash())
receipts, _ := collectLogs(newChain[i].Hash(), false)

// get internal transactions
internalTxs := bc.ReadInternalTransactions(newChain[i].Hash())

log.Info("send new block event due to reorg", "height", newChain[i].NumberU64(), "txs", len(newChain[i].Transactions()), "logs", len(logs))
bc.chainFeed.Send(ChainEvent{Block: newChain[i], Hash: newChain[i].Hash(), Logs: logs, InternalTxs: internalTxs, DirtyAccounts: dirtyAccounts, Receipts: receipts})
if bc.enableAdditionalChainEvent {
bc.sendNewBlockEvent(newChain[i], receipts, true, false)
}

// Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
Expand Down
14 changes: 8 additions & 6 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ type RemovedLogsEvent struct{ Logs []*types.Log }
type NewVoteEvent struct{ Vote *types.VoteEnvelope }

type ChainEvent struct {
Block *types.Block
Hash common.Hash
Logs []*types.Log
InternalTxs []*types.InternalTransaction
DirtyAccounts []*types.DirtyStateAccount
Receipts types.Receipts
Block *types.Block
Hash common.Hash
Logs []*types.Log
InternalTxs []*types.InternalTransaction
DirtyAccounts []*types.DirtyStateAccount
Receipts types.Receipts
FinalizedBlockNumber uint64
FinalizedBlockHash common.Hash
}

type ChainSideEvent struct {
Expand Down
3 changes: 3 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if config.EnableMonitorDoubleSign {
go eth.blockchain.StartDoubleSignMonitor()
}
if config.EnableAdditionalChainEvent {
eth.blockchain.EnableAdditionalChainEvent()
}

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ type Config struct {

// Disable ronin p2p protocol
DisableRoninProtocol bool

// Send additional chain event
EnableAdditionalChainEvent bool
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (w *worker) resultLoop() {
logs = append(logs, receipt.Logs...)
}
// Commit block and state to database.
_, _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
_, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
Expand Down

0 comments on commit e67ad69

Please sign in to comment.