diff --git a/cmd/ronin/main.go b/cmd/ronin/main.go index 06bb2f0794..392784f0f1 100644 --- a/cmd/ronin/main.go +++ b/cmd/ronin/main.go @@ -173,6 +173,7 @@ var ( utils.BlsPasswordPath, utils.BlsWalletPath, utils.DisableRoninProtocol, + utils.AdditionalChainEventFlag, } rpcFlags = []cli.Flag{ diff --git a/cmd/ronin/usage.go b/cmd/ronin/usage.go index 682398f4bf..ca08ff9c2f 100644 --- a/cmd/ronin/usage.go +++ b/cmd/ronin/usage.go @@ -58,6 +58,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.MonitorDoubleSign, utils.StoreInternalTransactions, utils.DisableRoninProtocol, + utils.AdditionalChainEventFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b8e13f2afb..d469be4486 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -904,6 +904,11 @@ var ( Name: "ronin.disable", Usage: "Disable ronin p2p protocol", } + + AdditionalChainEventFlag = cli.BoolFlag{ + Name: "additionalchainevent.enable", + Usage: "Enable additional chain event", + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1831,6 +1836,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalBool(MonitorDoubleSign.Name) { cfg.EnableMonitorDoubleSign = true } + + if ctx.GlobalBool(AdditionalChainEventFlag.Name) { + cfg.EnableAdditionalChainEvent = true + } } // SetDNSDiscoveryDefaults configures DNS discovery with the given URL if diff --git a/core/blockchain.go b/core/blockchain.go index 053dd464f7..7ff4d96de1 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -222,8 +222,9 @@ type BlockChain struct { processor Processor // Block transaction processor interface vmConfig vm.Config - shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. - shouldStoreInternalTxs bool + shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. + shouldStoreInternalTxs bool + enableAdditionalChainEvent bool } // NewBlockChain returns a fully initialised block chain using information @@ -509,6 +510,10 @@ func (bc *BlockChain) StartFinalityVoteMonitor() { } } +func (bc *BlockChain) EnableAdditionalChainEvent() { + bc.enableAdditionalChainEvent = true +} + // empty returns an indicator whether the blockchain is empty. // Note, it's a special case that we connect a non-empty ancient // database with an empty node, so that we can plugin the ancient @@ -1111,7 +1116,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ stats.processed++ // Send chain event includes block data and logs - bc.sendNewBlockEvent(block, receiptChain[i]) + if bc.enableAdditionalChainEvent { + bc.sendNewBlockEvent(block, receiptChain[i], false, false) + } } // Flush all tx-lookup index data. @@ -1208,7 +1215,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ stats.processed++ // Send chain event includes block data and logs - bc.sendNewBlockEvent(block, receiptChain[i]) + if bc.enableAdditionalChainEvent { + bc.sendNewBlockEvent(block, receiptChain[i], false, false) + } } // Write everything belongs to the blocks into the database. So that // we can ensure all components of body is completed(body, receipts, @@ -1268,15 +1277,49 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, nil } -func (bc *BlockChain) sendNewBlockEvent(block *types.Block, receipts types.Receipts) { +func (bc *BlockChain) sendNewBlockEvent( + block *types.Block, + receipts types.Receipts, + includeInternalTxsAndDirtyAccounts bool, + includeFinalized bool, +) { + var ( + internalTxs []*types.InternalTransaction + dirtyAccounts []*types.DirtyStateAccount + finalizedBlockNumber uint64 + finalizedBlockHash common.Hash + ) + + if includeInternalTxsAndDirtyAccounts { + // get dirty accounts + dirtyAccounts = bc.ReadDirtyAccounts(block.Hash()) + // get internal transactions + internalTxs = bc.ReadInternalTransactions(block.Hash()) + } + + if includeFinalized { + finalizedBlock := bc.FinalizedBlock() + if finalizedBlock != nil { + finalizedBlockNumber = finalizedBlock.NumberU64() + finalizedBlockHash = finalizedBlock.Hash() + } + } + logs := make([]*types.Log, 0) - internalTxs := make([]*types.InternalTransaction, 0) - dirtyAccounts := make([]*types.DirtyStateAccount, 0) for _, receipt := range receipts { logs = append(logs, receipt.Logs...) } log.Info("send new block event", "height", block.NumberU64(), "txs", len(block.Transactions()), "logs", len(logs)) - bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs, InternalTxs: internalTxs, DirtyAccounts: dirtyAccounts, Receipts: receipts}) + bc.chainFeed.Send(ChainEvent{ + Block: block, + Hash: block.Hash(), + Logs: logs, + InternalTxs: internalTxs, + DirtyAccounts: dirtyAccounts, + Receipts: receipts, + FinalizedBlockNumber: finalizedBlockNumber, + FinalizedBlockHash: finalizedBlockHash, + }) } var lastWrite uint64 @@ -1313,9 +1356,9 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { } // WriteBlockWithState writes the block and all associated state to the database. -func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, dirtyAccounts []*types.DirtyStateAccount, err error) { +func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { if !bc.chainmu.TryLock() { - return NonStatTy, nil, errInsertionInterrupted + return NonStatTy, errInsertionInterrupted } defer bc.chainmu.Unlock() @@ -1359,15 +1402,15 @@ func (bc *BlockChain) reorgNeeded(localBlock *types.Block, localTd *big.Int, ext // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. -func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, internalTxs []*types.InternalTransaction, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, dirtyAccounts []*types.DirtyStateAccount, err error) { +func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, internalTxs []*types.InternalTransaction, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { if bc.insertStopped() { - return NonStatTy, nil, errInsertionInterrupted + return NonStatTy, errInsertionInterrupted } // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) if ptd == nil { - return NonStatTy, nil, consensus.ErrUnknownAncestor + return NonStatTy, consensus.ErrUnknownAncestor } // Make sure no inconsistent state is leaked during insertion currentBlock := bc.CurrentBlock() @@ -1387,17 +1430,17 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. log.Crit("Failed to write block into disk", "err", err) } // Commit all cached state changes into underlying memory database. - dirtyAccounts = state.DirtyAccounts(block.Hash(), block.NumberU64()) + dirtyAccounts := state.DirtyAccounts(block.Hash(), block.NumberU64()) root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) if err != nil { - return NonStatTy, nil, err + return NonStatTy, err } triedb := bc.stateCache.TrieDB() // If we're running an archive node, always flush if bc.cacheConfig.TrieDirtyDisabled { if err := triedb.Commit(root, false, nil); err != nil { - return NonStatTy, nil, err + return NonStatTy, err } } else { // Full but not archive node, do proper garbage collection @@ -1452,7 +1495,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. log.Info("[reorg][writeBlockWithState]", "currentBlock", currentBlock, "currentTD", localTd.Uint64(), "newBlock", block, "newTD", externTd.Uint64()) if err := bc.reorg(currentBlock, block); err != nil { - return NonStatTy, nil, err + return NonStatTy, err } } status = CanonStatTy @@ -1465,8 +1508,14 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } bc.futureBlocks.Remove(block.Hash()) + if bc.enableAdditionalChainEvent && len(dirtyAccounts) != 0 { + bc.dirtyAccountFeed.Send(dirtyAccounts) + bc.dirtyAccountsCache.Add(block.Hash(), dirtyAccounts) + } + if status == CanonStatTy { - bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs, InternalTxs: internalTxs, DirtyAccounts: dirtyAccounts, Receipts: receipts}) + // Always send chain event here + bc.sendNewBlockEvent(block, receipts, true, true) if len(logs) > 0 { bc.logsFeed.Send(logs) } @@ -1481,7 +1530,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } else { bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } - return status, dirtyAccounts, nil + return status, nil } // addFutureBlock checks if the block is within the max allowed window to get @@ -1767,7 +1816,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er } // store internal txs to db and send them to internalTxFeed - if len(internalTxs) > 0 { + if bc.enableAdditionalChainEvent && len(internalTxs) > 0 { bc.WriteInternalTransactions(block.Hash(), internalTxs) bc.internalTxFeed.Send(internalTxs) } @@ -1802,17 +1851,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Write the block to the chain and get the status. substart = time.Now() - status, dirtyAccounts, err := bc.writeBlockWithState(block, receipts, logs, internalTxs, statedb, false) + status, err := bc.writeBlockWithState(block, receipts, logs, internalTxs, statedb, false) atomic.StoreUint32(&followupInterrupt, 1) if err != nil { return it.index, err } - if dirtyAccounts != nil { - bc.dirtyAccountFeed.Send(dirtyAccounts) - bc.dirtyAccountsCache.Add(block.Hash(), dirtyAccounts) - } - // Update the metrics touched during block commit accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them @@ -2125,16 +2169,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { bc.writeHeadBlock(newChain[i]) // Collect reborn logs due to chain reorg - receipts, logs := collectLogs(newChain[i].Hash(), false) - - // get dirty accounts - dirtyAccounts := bc.ReadDirtyAccounts(newChain[i].Hash()) + receipts, _ := collectLogs(newChain[i].Hash(), false) - // get internal transactions - internalTxs := bc.ReadInternalTransactions(newChain[i].Hash()) - - log.Info("send new block event due to reorg", "height", newChain[i].NumberU64(), "txs", len(newChain[i].Transactions()), "logs", len(logs)) - bc.chainFeed.Send(ChainEvent{Block: newChain[i], Hash: newChain[i].Hash(), Logs: logs, InternalTxs: internalTxs, DirtyAccounts: dirtyAccounts, Receipts: receipts}) + if bc.enableAdditionalChainEvent { + bc.sendNewBlockEvent(newChain[i], receipts, true, false) + } // Collect the new added transactions. addedTxs = append(addedTxs, newChain[i].Transactions()...) diff --git a/core/events.go b/core/events.go index 02b3756c3e..e477da7e23 100644 --- a/core/events.go +++ b/core/events.go @@ -34,12 +34,14 @@ type RemovedLogsEvent struct{ Logs []*types.Log } type NewVoteEvent struct{ Vote *types.VoteEnvelope } type ChainEvent struct { - Block *types.Block - Hash common.Hash - Logs []*types.Log - InternalTxs []*types.InternalTransaction - DirtyAccounts []*types.DirtyStateAccount - Receipts types.Receipts + Block *types.Block + Hash common.Hash + Logs []*types.Log + InternalTxs []*types.InternalTransaction + DirtyAccounts []*types.DirtyStateAccount + Receipts types.Receipts + FinalizedBlockNumber uint64 + FinalizedBlockHash common.Hash } type ChainSideEvent struct { diff --git a/eth/backend.go b/eth/backend.go index 53788316ee..35c54feb58 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -205,6 +205,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if config.EnableMonitorDoubleSign { go eth.blockchain.StartDoubleSignMonitor() } + if config.EnableAdditionalChainEvent { + eth.blockchain.EnableAdditionalChainEvent() + } // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index fd07d731fb..dd18f3ba8f 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -215,6 +215,9 @@ type Config struct { // Disable ronin p2p protocol DisableRoninProtocol bool + + // Send additional chain event + EnableAdditionalChainEvent bool } // CreateConsensusEngine creates a consensus engine for the given chain configuration. diff --git a/miner/worker.go b/miner/worker.go index c0602fa4e4..f8fc46ba44 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -735,7 +735,7 @@ func (w *worker) resultLoop() { logs = append(logs, receipt.Logs...) } // Commit block and state to database. - _, _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true) + _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true) if err != nil { log.Error("Failed writing block to chain", "err", err) continue