diff --git a/crates/e2e-test-utils/src/testsuite/actions/produce_blocks.rs b/crates/e2e-test-utils/src/testsuite/actions/produce_blocks.rs index 9d2088c11a4..74a5e2ba1d5 100644 --- a/crates/e2e-test-utils/src/testsuite/actions/produce_blocks.rs +++ b/crates/e2e-test-utils/src/testsuite/actions/produce_blocks.rs @@ -510,7 +510,7 @@ where Box::pin(async move { let mut accepted_check: bool = false; - let mut latest_block = env + let latest_block = env .current_block_info() .ok_or_else(|| eyre::eyre!("No latest block information available"))?; @@ -603,10 +603,6 @@ where rpc_latest_header.inner.timestamp; env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = rpc_latest_header.hash; - - // update local copy for any further usage in this scope - latest_block.hash = rpc_latest_header.hash; - latest_block.number = rpc_latest_header.inner.number; } } diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index b2f8da4d424..70763b6701f 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -21,6 +21,14 @@ fn default_storage_worker_count() -> usize { } } +/// Returns the default number of account worker threads. +/// +/// Account workers coordinate storage proof collection and account trie traversal. +/// They are set to the same count as storage workers for simplicity. +fn default_account_worker_count() -> usize { + default_storage_worker_count() +} + /// The size of proof targets chunk to spawn in one multiproof calculation. pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 10; @@ -123,6 +131,8 @@ pub struct TreeConfig { allow_unwind_canonical_header: bool, /// Number of storage proof worker threads. storage_worker_count: usize, + /// Number of account proof worker threads. + account_worker_count: usize, } impl Default for TreeConfig { @@ -150,6 +160,7 @@ impl Default for TreeConfig { prewarm_max_concurrency: DEFAULT_PREWARM_MAX_CONCURRENCY, allow_unwind_canonical_header: false, storage_worker_count: default_storage_worker_count(), + account_worker_count: default_account_worker_count(), } } } @@ -180,6 +191,7 @@ impl TreeConfig { prewarm_max_concurrency: usize, allow_unwind_canonical_header: bool, storage_worker_count: usize, + account_worker_count: usize, ) -> Self { assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1"); Self { @@ -205,6 +217,7 @@ impl TreeConfig { prewarm_max_concurrency, allow_unwind_canonical_header, storage_worker_count, + account_worker_count, } } @@ -482,4 +495,15 @@ impl TreeConfig { self.storage_worker_count = storage_worker_count; self } + + /// Return the number of account proof worker threads. + pub const fn account_worker_count(&self) -> usize { + self.account_worker_count + } + + /// Setter for the number of account proof worker threads. 
+ pub const fn with_account_worker_count(mut self, account_worker_count: usize) -> Self { + self.account_worker_count = account_worker_count; + self + } } diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index d449031606e..c24b0d1fe16 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -192,8 +192,7 @@ where { let (to_sparse_trie, sparse_trie_rx) = channel(); // spawn multiproof task, save the trie input - let (trie_input, state_root_config) = - MultiProofConfig::new_from_input(consistent_view, trie_input); + let (trie_input, state_root_config) = MultiProofConfig::from_input(trie_input); self.trie_input = Some(trie_input); // Create and spawn the storage proof task @@ -202,14 +201,15 @@ where state_root_config.state_sorted.clone(), state_root_config.prefix_sets.clone(), ); - let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize; let storage_worker_count = config.storage_worker_count(); + let account_worker_count = config.account_worker_count(); + let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize; let proof_task = match ProofTaskManager::new( self.executor.handle().clone(), - state_root_config.consistent_view.clone(), + consistent_view, task_ctx, - max_proof_task_concurrency, storage_worker_count, + account_worker_count, ) { Ok(task) => task, Err(error) => { diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 18d394477fb..f865312b83d 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -12,14 +12,17 @@ use derive_more::derive::Deref; use metrics::Histogram; use reth_errors::ProviderError; use reth_metrics::Metrics; -use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, FactoryTx}; use reth_revm::state::EvmState; use reth_trie::{ added_removed_keys::MultiAddedRemovedKeys, prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, DecodedMultiProof, HashedPostState, HashedPostStateSorted, HashedStorage, MultiProofTargets, TrieInput, }; -use reth_trie_parallel::{proof::ParallelProof, proof_task::ProofTaskManagerHandle}; +use reth_trie_parallel::{ + proof::ParallelProof, + proof_task::{AccountMultiproofInput, ProofTaskKind, ProofTaskManagerHandle}, + root::ParallelStateRootError, +}; use std::{ collections::{BTreeMap, VecDeque}, ops::DerefMut, @@ -62,9 +65,7 @@ impl SparseTrieUpdate { /// Common configuration for multi proof tasks #[derive(Debug, Clone)] -pub(super) struct MultiProofConfig { - /// View over the state in the database. - pub consistent_view: ConsistentDbView, +pub(super) struct MultiProofConfig { /// The sorted collection of cached in-memory intermediate trie nodes that /// can be reused for computation. pub nodes_sorted: Arc, @@ -76,17 +77,13 @@ pub(super) struct MultiProofConfig { pub prefix_sets: Arc, } -impl MultiProofConfig { - /// Creates a new state root config from the consistent view and the trie input. +impl MultiProofConfig { + /// Creates a new state root config from the trie input. /// /// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the /// [`TrieInput`]. 
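The `TreeConfig` additions above compose with the existing builder-style setters; a minimal sketch, using arbitrary example values rather than recommended defaults:

    // Illustrative only: 8 is an arbitrary value, not a tuned default.
    let config = TreeConfig::default()
        .with_storage_worker_count(8)
        .with_account_worker_count(8);
    assert_eq!(config.account_worker_count(), 8);

When the account count is left untouched it falls back to `default_account_worker_count()`, i.e. the same value as the storage worker default.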
- pub(super) fn new_from_input( - consistent_view: ConsistentDbView, - mut input: TrieInput, - ) -> (TrieInput, Self) { + pub(super) fn from_input(mut input: TrieInput) -> (TrieInput, Self) { let config = Self { - consistent_view, nodes_sorted: Arc::new(input.nodes.drain_into_sorted()), state_sorted: Arc::new(input.state.drain_into_sorted()), prefix_sets: Arc::new(input.prefix_sets.clone()), @@ -245,14 +242,14 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat /// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`]. #[derive(Debug)] -enum PendingMultiproofTask { +enum PendingMultiproofTask { /// A storage multiproof task input. - Storage(StorageMultiproofInput), + Storage(StorageMultiproofInput), /// A regular multiproof task input. - Regular(MultiproofInput), + Regular(MultiproofInput), } -impl PendingMultiproofTask { +impl PendingMultiproofTask { /// Returns the proof sequence number of the task. const fn proof_sequence_number(&self) -> u64 { match self { @@ -278,22 +275,22 @@ impl PendingMultiproofTask { } } -impl From> for PendingMultiproofTask { - fn from(input: StorageMultiproofInput) -> Self { +impl From for PendingMultiproofTask { + fn from(input: StorageMultiproofInput) -> Self { Self::Storage(input) } } -impl From> for PendingMultiproofTask { - fn from(input: MultiproofInput) -> Self { +impl From for PendingMultiproofTask { + fn from(input: MultiproofInput) -> Self { Self::Regular(input) } } /// Input parameters for spawning a dedicated storage multiproof calculation. #[derive(Debug)] -struct StorageMultiproofInput { - config: MultiProofConfig, +struct StorageMultiproofInput { + config: MultiProofConfig, source: Option, hashed_state_update: HashedPostState, hashed_address: B256, @@ -303,7 +300,7 @@ struct StorageMultiproofInput { multi_added_removed_keys: Arc, } -impl StorageMultiproofInput { +impl StorageMultiproofInput { /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender. fn send_empty_proof(self) { let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof { @@ -315,8 +312,8 @@ impl StorageMultiproofInput { /// Input parameters for spawning a multiproof calculation. #[derive(Debug)] -struct MultiproofInput { - config: MultiProofConfig, +struct MultiproofInput { + config: MultiProofConfig, source: Option, hashed_state_update: HashedPostState, proof_targets: MultiProofTargets, @@ -325,7 +322,7 @@ struct MultiproofInput { multi_added_removed_keys: Option>, } -impl MultiproofInput { +impl MultiproofInput { /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender. fn send_empty_proof(self) { let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof { @@ -340,17 +337,20 @@ impl MultiproofInput { /// concurrency, further calculation requests are queued and spawn later, after /// availability has been signaled. #[derive(Debug)] -pub struct MultiproofManager { +pub struct MultiproofManager { /// Maximum number of concurrent calculations. max_concurrent: usize, /// Currently running calculations. inflight: usize, /// Queued calculations. - pending: VecDeque>, + pending: VecDeque, /// Executor for tasks executor: WorkloadExecutor, - /// Sender to the storage proof task. - storage_proof_task_handle: ProofTaskManagerHandle>, + /// Handle to the proof task manager used for creating `ParallelProof` instances for storage + /// proofs. 
+ storage_proof_task_handle: ProofTaskManagerHandle, + /// Handle to the proof task manager used for account multiproofs. + account_proof_task_handle: ProofTaskManagerHandle, /// Cached storage proof roots for missed leaves; this maps /// hashed (missed) addresses to their storage proof roots. /// @@ -367,15 +367,13 @@ pub struct MultiproofManager { metrics: MultiProofTaskMetrics, } -impl MultiproofManager -where - Factory: DatabaseProviderFactory + Clone + 'static, -{ +impl MultiproofManager { /// Creates a new [`MultiproofManager`]. fn new( executor: WorkloadExecutor, metrics: MultiProofTaskMetrics, - storage_proof_task_handle: ProofTaskManagerHandle>, + storage_proof_task_handle: ProofTaskManagerHandle, + account_proof_task_handle: ProofTaskManagerHandle, max_concurrent: usize, ) -> Self { Self { @@ -385,6 +383,7 @@ where inflight: 0, metrics, storage_proof_task_handle, + account_proof_task_handle, missed_leaves_storage_roots: Default::default(), } } @@ -395,7 +394,7 @@ where /// Spawns a new multiproof calculation or enqueues it for later if /// `max_concurrent` are already inflight. - fn spawn_or_queue(&mut self, input: PendingMultiproofTask) { + fn spawn_or_queue(&mut self, input: PendingMultiproofTask) { // If there are no proof targets, we can just send an empty multiproof back immediately if input.proof_targets_is_empty() { debug!( @@ -429,7 +428,7 @@ where /// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage /// multiproof, and dispatching to `spawn_multiproof` otherwise. - fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask) { + fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask) { match input { PendingMultiproofTask::Storage(storage_input) => { self.spawn_storage_proof(storage_input); @@ -441,7 +440,7 @@ where } /// Spawns a single storage proof calculation task. - fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput) { + fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput) { let StorageMultiproofInput { config, source, @@ -468,12 +467,11 @@ where ); let start = Instant::now(); let proof_result = ParallelProof::new( - config.consistent_view, config.nodes_sorted, config.state_sorted, config.prefix_sets, missed_leaves_storage_roots, - storage_proof_task_handle.clone(), + storage_proof_task_handle, ) .with_branch_node_masks(true) .with_multi_added_removed_keys(Some(multi_added_removed_keys)) @@ -516,7 +514,7 @@ where } /// Spawns a single multiproof calculation task. 
- fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput) { + fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput) { let MultiproofInput { config, source, @@ -526,7 +524,7 @@ where state_root_message_sender, multi_added_removed_keys, } = multiproof_input; - let storage_proof_task_handle = self.storage_proof_task_handle.clone(); + let account_proof_task_handle = self.account_proof_task_handle.clone(); let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone(); self.executor.spawn_blocking(move || { @@ -544,17 +542,37 @@ where ); let start = Instant::now(); - let proof_result = ParallelProof::new( - config.consistent_view, - config.nodes_sorted, - config.state_sorted, - config.prefix_sets, + + // Extend prefix sets with targets + let frozen_prefix_sets = + ParallelProof::extend_prefix_sets_with_targets(&config.prefix_sets, &proof_targets); + + // Queue account multiproof to worker pool + let input = AccountMultiproofInput { + targets: proof_targets, + prefix_sets: frozen_prefix_sets, + collect_branch_node_masks: true, + multi_added_removed_keys, missed_leaves_storage_roots, - storage_proof_task_handle.clone(), - ) - .with_branch_node_masks(true) - .with_multi_added_removed_keys(multi_added_removed_keys) - .decoded_multiproof(proof_targets); + }; + + let (sender, receiver) = channel(); + let proof_result: Result = (|| { + account_proof_task_handle + .queue_task(ProofTaskKind::AccountMultiproof(input, sender)) + .map_err(|_| { + ParallelStateRootError::Other( + "Failed to queue account multiproof to worker pool".into(), + ) + })?; + + receiver + .recv() + .map_err(|_| { + ParallelStateRootError::Other("Account multiproof channel closed".into()) + })? + .map(|(proof, _stats)| proof) + })(); let elapsed = start.elapsed(); trace!( target: "engine::root", @@ -645,13 +663,13 @@ pub(crate) struct MultiProofTaskMetrics { /// Then it updates relevant leaves according to the result of the transaction. /// This feeds updates to the sparse trie task. #[derive(Debug)] -pub(super) struct MultiProofTask { +pub(super) struct MultiProofTask { /// The size of proof targets chunk to spawn in one calculation. /// /// If [`None`], then chunking is disabled. chunk_size: Option, /// Task configuration. - config: MultiProofConfig, + config: MultiProofConfig, /// Receiver for state root related messages. rx: Receiver, /// Sender for state root related messages. @@ -665,20 +683,17 @@ pub(super) struct MultiProofTask { /// Proof sequencing handler. proof_sequencer: ProofSequencer, /// Manages calculation of multiproofs. 
- multiproof_manager: MultiproofManager, + multiproof_manager: MultiproofManager, /// multi proof task metrics metrics: MultiProofTaskMetrics, } -impl MultiProofTask -where - Factory: DatabaseProviderFactory + Clone + 'static, -{ +impl MultiProofTask { /// Creates a new multi proof task with the unified message channel pub(super) fn new( - config: MultiProofConfig, + config: MultiProofConfig, executor: WorkloadExecutor, - proof_task_handle: ProofTaskManagerHandle>, + proof_task_handle: ProofTaskManagerHandle, to_sparse_trie: Sender, max_concurrency: usize, chunk_size: Option, @@ -698,7 +713,8 @@ where multiproof_manager: MultiproofManager::new( executor, metrics.clone(), - proof_task_handle, + proof_task_handle.clone(), // handle for storage proof workers + proof_task_handle, // handle for account proof workers max_concurrency, ), metrics, @@ -1202,43 +1218,29 @@ fn get_proof_targets( mod tests { use super::*; use alloy_primitives::map::B256Set; - use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory}; + use reth_provider::{ + providers::ConsistentDbView, test_utils::create_test_provider_factory, BlockReader, + DatabaseProviderFactory, + }; use reth_trie::{MultiProof, TrieInput}; use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager}; use revm_primitives::{B256, U256}; - use std::sync::Arc; - - fn create_state_root_config(factory: F, input: TrieInput) -> MultiProofConfig - where - F: DatabaseProviderFactory + Clone + 'static, - { - let consistent_view = ConsistentDbView::new(factory, None); - let nodes_sorted = Arc::new(input.nodes.clone().into_sorted()); - let state_sorted = Arc::new(input.state.clone().into_sorted()); - let prefix_sets = Arc::new(input.prefix_sets); - MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets } - } - - fn create_test_state_root_task(factory: F) -> MultiProofTask + fn create_test_state_root_task(factory: F) -> MultiProofTask where F: DatabaseProviderFactory + Clone + 'static, { let executor = WorkloadExecutor::default(); - let config = create_state_root_config(factory, TrieInput::default()); + let (_trie_input, config) = MultiProofConfig::from_input(TrieInput::default()); let task_ctx = ProofTaskCtx::new( config.nodes_sorted.clone(), config.state_sorted.clone(), config.prefix_sets.clone(), ); - let proof_task = ProofTaskManager::new( - executor.handle().clone(), - config.consistent_view.clone(), - task_ctx, - 1, - 1, - ) - .expect("Failed to create ProofTaskManager"); + let consistent_view = ConsistentDbView::new(factory, None); + let proof_task = + ProofTaskManager::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1) + .expect("Failed to create ProofTaskManager"); let channel = channel(); MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None) diff --git a/crates/node/core/src/args/engine.rs b/crates/node/core/src/args/engine.rs index 2298b28f9ce..6b678b5789b 100644 --- a/crates/node/core/src/args/engine.rs +++ b/crates/node/core/src/args/engine.rs @@ -113,6 +113,11 @@ pub struct EngineArgs { /// If not specified, defaults to 2x available parallelism, clamped between 2 and 64. #[arg(long = "engine.storage-worker-count")] pub storage_worker_count: Option, + + /// Configure the number of account proof workers in the Tokio blocking pool. + /// If not specified, defaults to the same count as storage workers. 
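For reference, the worker-count flags map straight onto the `TreeConfig` setters via the `if let Some(count)` blocks further down in this file; a hypothetical programmatic equivalent of passing `--engine.storage-worker-count 16 --engine.account-worker-count 16` (arbitrary values, relying on the `Default` impl of `EngineArgs`):

    // Hypothetical: same effect as the two CLI flags; 16 is an arbitrary choice.
    let args = EngineArgs {
        storage_worker_count: Some(16),
        account_worker_count: Some(16),
        ..Default::default()
    };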
+ #[arg(long = "engine.account-worker-count")] + pub account_worker_count: Option, } #[allow(deprecated)] @@ -140,6 +145,7 @@ impl Default for EngineArgs { always_process_payload_attributes_on_canonical_head: false, allow_unwind_canonical_header: false, storage_worker_count: None, + account_worker_count: None, } } } @@ -171,6 +177,10 @@ impl EngineArgs { config = config.with_storage_worker_count(count); } + if let Some(count) = self.account_worker_count { + config = config.with_account_worker_count(count); + } + config } } diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index 4a2738fd38e..7fc1f022a7e 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -1,40 +1,25 @@ use crate::{ metrics::ParallelTrieMetrics, - proof_task::{ProofTaskKind, ProofTaskManagerHandle, StorageProofInput}, + proof_task::{ + AccountMultiproofInput, ProofTaskKind, ProofTaskManagerHandle, StorageProofInput, + }, root::ParallelStateRootError, - stats::ParallelTrieTracker, StorageRootTargets, }; -use alloy_primitives::{ - map::{B256Map, B256Set, HashMap}, - B256, -}; -use alloy_rlp::{BufMut, Encodable}; +use alloy_primitives::{map::B256Set, B256}; use dashmap::DashMap; -use itertools::Itertools; use reth_execution_errors::StorageRootError; -use reth_provider::{ - providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, FactoryTx, - ProviderError, -}; use reth_storage_errors::db::DatabaseError; use reth_trie::{ - hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, - node_iter::{TrieElement, TrieNodeIter}, - prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSetsMut}, - proof::StorageProof, - trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory}, + prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets, TriePrefixSetsMut}, updates::TrieUpdatesSorted, - walker::TrieWalker, - DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostStateSorted, - MultiProofTargets, Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE, + DecodedMultiProof, DecodedStorageMultiProof, HashedPostStateSorted, MultiProofTargets, Nibbles, }; -use reth_trie_common::{ - added_removed_keys::MultiAddedRemovedKeys, - proof::{DecodedProofNodes, ProofRetainer}, +use reth_trie_common::added_removed_keys::MultiAddedRemovedKeys; +use std::sync::{ + mpsc::{channel, Receiver}, + Arc, }; -use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; -use std::sync::{mpsc::Receiver, Arc}; use tracing::trace; /// Parallel proof calculator. @@ -42,9 +27,7 @@ use tracing::trace; /// This can collect proof for many targets in parallel, spawning a task for each hashed address /// that has proof targets. #[derive(Debug)] -pub struct ParallelProof { - /// Consistent view of the database. - view: ConsistentDbView, +pub struct ParallelProof { /// The sorted collection of cached in-memory intermediate trie nodes that /// can be reused for computation. pub nodes_sorted: Arc, @@ -58,8 +41,8 @@ pub struct ParallelProof { collect_branch_node_masks: bool, /// Provided by the user to give the necessary context to retain extra proofs. multi_added_removed_keys: Option>, - /// Handle to the storage proof task. - storage_proof_task_handle: ProofTaskManagerHandle>, + /// Handle to the proof task manager. + proof_task_handle: ProofTaskManagerHandle, /// Cached storage proof roots for missed leaves; this maps /// hashed (missed) addresses to their storage proof roots. 
missed_leaves_storage_roots: Arc>, @@ -67,25 +50,23 @@ pub struct ParallelProof { metrics: ParallelTrieMetrics, } -impl ParallelProof { +impl ParallelProof { /// Create new state proof generator. pub fn new( - view: ConsistentDbView, nodes_sorted: Arc, state_sorted: Arc, prefix_sets: Arc, missed_leaves_storage_roots: Arc>, - storage_proof_task_handle: ProofTaskManagerHandle>, + proof_task_handle: ProofTaskManagerHandle, ) -> Self { Self { - view, nodes_sorted, state_sorted, prefix_sets, missed_leaves_storage_roots, collect_branch_node_masks: false, multi_added_removed_keys: None, - storage_proof_task_handle, + proof_task_handle, #[cfg(feature = "metrics")] metrics: ParallelTrieMetrics::new_with_labels(&[("type", "proof")]), } @@ -106,12 +87,6 @@ impl ParallelProof { self.multi_added_removed_keys = multi_added_removed_keys; self } -} - -impl ParallelProof -where - Factory: DatabaseProviderFactory + Clone + 'static, -{ /// Queues a storage proof task and returns a receiver for the result. fn queue_storage_proof( &self, @@ -128,8 +103,7 @@ where ); let (sender, receiver) = std::sync::mpsc::channel(); - let _ = - self.storage_proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender)); + let _ = self.proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender)); receiver } @@ -167,16 +141,16 @@ where proof_result } - /// Generate a state multiproof according to specified targets. - pub fn decoded_multiproof( - self, - targets: MultiProofTargets, - ) -> Result { - let mut tracker = ParallelTrieTracker::default(); - - // Extend prefix sets with targets - let mut prefix_sets = (*self.prefix_sets).clone(); - prefix_sets.extend(TriePrefixSetsMut { + /// Extends prefix sets with the given multiproof targets and returns the frozen result. + /// + /// This is a helper function used to prepare prefix sets before computing multiproofs. + /// Returns frozen (immutable) prefix sets ready for use in proof computation. + pub fn extend_prefix_sets_with_targets( + base_prefix_sets: &TriePrefixSetsMut, + targets: &MultiProofTargets, + ) -> TriePrefixSets { + let mut extended = base_prefix_sets.clone(); + extended.extend(TriePrefixSetsMut { account_prefix_set: PrefixSetMut::from(targets.keys().copied().map(Nibbles::unpack)), storage_prefix_sets: targets .iter() @@ -187,13 +161,21 @@ where .collect(), destroyed_accounts: Default::default(), }); - let prefix_sets = prefix_sets.freeze(); + extended.freeze() + } + + /// Generate a state multiproof according to specified targets. + pub fn decoded_multiproof( + self, + targets: MultiProofTargets, + ) -> Result { + // Extend prefix sets with targets + let prefix_sets = Self::extend_prefix_sets_with_targets(&self.prefix_sets, &targets); - let storage_root_targets = StorageRootTargets::new( - prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())), - prefix_sets.storage_prefix_sets.clone(), + let storage_root_targets_len = StorageRootTargets::count( + &prefix_sets.account_prefix_set, + &prefix_sets.storage_prefix_sets, ); - let storage_root_targets_len = storage_root_targets.len(); trace!( target: "trie::parallel_proof", @@ -201,150 +183,36 @@ where "Starting parallel proof generation" ); - // Pre-calculate storage roots for accounts which were changed. 
- tracker.set_precomputed_storage_roots(storage_root_targets_len as u64); - - // stores the receiver for the storage proof outcome for the hashed addresses - // this way we can lazily await the outcome when we iterate over the map - let mut storage_proof_receivers = - B256Map::with_capacity_and_hasher(storage_root_targets.len(), Default::default()); - - for (hashed_address, prefix_set) in - storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address) - { - let target_slots = targets.get(&hashed_address).cloned().unwrap_or_default(); - let receiver = self.queue_storage_proof(hashed_address, prefix_set, target_slots); - - // store the receiver for that result with the hashed address so we can await this in - // place when we iterate over the trie - storage_proof_receivers.insert(hashed_address, receiver); - } + // Queue account multiproof request to account worker pool - let provider_ro = self.view.provider_ro()?; - let trie_cursor_factory = InMemoryTrieCursorFactory::new( - DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), - &self.nodes_sorted, - ); - let hashed_cursor_factory = HashedPostStateCursorFactory::new( - DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), - &self.state_sorted, - ); + let input = AccountMultiproofInput { + targets, + prefix_sets, + collect_branch_node_masks: self.collect_branch_node_masks, + multi_added_removed_keys: self.multi_added_removed_keys.clone(), + missed_leaves_storage_roots: self.missed_leaves_storage_roots.clone(), + }; - let accounts_added_removed_keys = - self.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts()); + let (sender, receiver) = channel(); + self.proof_task_handle + .queue_task(ProofTaskKind::AccountMultiproof(input, sender)) + .map_err(|_| { + ParallelStateRootError::Other( + "Failed to queue account multiproof: account worker pool unavailable" + .to_string(), + ) + })?; - // Create the walker. - let walker = TrieWalker::<_>::state_trie( - trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, - prefix_sets.account_prefix_set, - ) - .with_added_removed_keys(accounts_added_removed_keys) - .with_deletions_retained(true); - - // Create a hash builder to rebuild the root node since it is not available in the database. - let retainer = targets - .keys() - .map(Nibbles::unpack) - .collect::() - .with_added_removed_keys(accounts_added_removed_keys); - let mut hash_builder = HashBuilder::default() - .with_proof_retainer(retainer) - .with_updates(self.collect_branch_node_masks); - - // Initialize all storage multiproofs as empty. - // Storage multiproofs for non empty tries will be overwritten if necessary. - let mut collected_decoded_storages: B256Map = - targets.keys().map(|key| (*key, DecodedStorageMultiProof::empty())).collect(); - let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE); - let mut account_node_iter = TrieNodeIter::state_trie( - walker, - hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?, - ); - while let Some(account_node) = - account_node_iter.try_next().map_err(ProviderError::Database)? 
- { - match account_node { - TrieElement::Branch(node) => { - hash_builder.add_branch(node.key, node.value, node.children_are_in_trie); - } - TrieElement::Leaf(hashed_address, account) => { - let root = match storage_proof_receivers.remove(&hashed_address) { - Some(rx) => { - let decoded_storage_multiproof = rx.recv().map_err(|e| { - ParallelStateRootError::StorageRoot(StorageRootError::Database( - DatabaseError::Other(format!( - "channel closed for {hashed_address}: {e}" - )), - )) - })??; - let root = decoded_storage_multiproof.root; - collected_decoded_storages - .insert(hashed_address, decoded_storage_multiproof); - root - } - // Since we do not store all intermediate nodes in the database, there might - // be a possibility of re-adding a non-modified leaf to the hash builder. - None => { - tracker.inc_missed_leaves(); - - match self.missed_leaves_storage_roots.entry(hashed_address) { - dashmap::Entry::Occupied(occ) => *occ.get(), - dashmap::Entry::Vacant(vac) => { - let root = StorageProof::new_hashed( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - hashed_address, - ) - .with_prefix_set_mut(Default::default()) - .storage_multiproof( - targets.get(&hashed_address).cloned().unwrap_or_default(), - ) - .map_err(|e| { - ParallelStateRootError::StorageRoot( - StorageRootError::Database(DatabaseError::Other( - e.to_string(), - )), - ) - })? - .root; - vac.insert(root); - root - } - } - } - }; - - // Encode account - account_rlp.clear(); - let account = account.into_trie_account(root); - account.encode(&mut account_rlp as &mut dyn BufMut); - - hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp); - } - } - } - let _ = hash_builder.root(); + // Wait for account multiproof result from worker + let (multiproof, stats) = receiver.recv().map_err(|_| { + ParallelStateRootError::Other( + "Account multiproof channel dropped: worker died or pool shutdown".to_string(), + ) + })??; - let stats = tracker.finish(); #[cfg(feature = "metrics")] self.metrics.record(stats); - let account_subtree_raw_nodes = hash_builder.take_proof_nodes(); - let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?; - - let (branch_node_hash_masks, branch_node_tree_masks) = if self.collect_branch_node_masks { - let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default(); - ( - updated_branch_nodes.iter().map(|(path, node)| (*path, node.hash_mask)).collect(), - updated_branch_nodes - .into_iter() - .map(|(path, node)| (path, node.tree_mask)) - .collect(), - ) - } else { - (HashMap::default(), HashMap::default()) - }; - trace!( target: "trie::parallel_proof", total_targets = storage_root_targets_len, @@ -356,12 +224,7 @@ where "Calculated decoded proof" ); - Ok(DecodedMultiProof { - account_subtree: decoded_account_subtree, - branch_node_hash_masks, - branch_node_tree_masks, - storages: collected_decoded_storages, - }) + Ok(multiproof) } } @@ -371,13 +234,16 @@ mod tests { use crate::proof_task::{ProofTaskCtx, ProofTaskManager}; use alloy_primitives::{ keccak256, - map::{B256Set, DefaultHashBuilder}, + map::{B256Set, DefaultHashBuilder, HashMap}, Address, U256, }; use rand::Rng; use reth_primitives_traits::{Account, StorageEntry}; - use reth_provider::{test_utils::create_test_provider_factory, HashingWriter}; + use reth_provider::{ + providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter, + }; use reth_trie::proof::Proof; + use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use 
tokio::runtime::Runtime; #[test] @@ -448,8 +314,7 @@ mod tests { let task_ctx = ProofTaskCtx::new(Default::default(), Default::default(), Default::default()); let proof_task = - ProofTaskManager::new(rt.handle().clone(), consistent_view.clone(), task_ctx, 1, 1) - .unwrap(); + ProofTaskManager::new(rt.handle().clone(), consistent_view, task_ctx, 1, 1).unwrap(); let proof_task_handle = proof_task.handle(); // keep the join handle around to make sure it does not return any errors @@ -457,7 +322,6 @@ mod tests { let join_handle = rt.spawn_blocking(move || proof_task.run()); let parallel_result = ParallelProof::new( - consistent_view, Default::default(), Default::default(), Default::default(), diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 0c513c55763..780839c238a 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -8,34 +8,47 @@ //! Individual [`ProofTaskTx`] instances manage a dedicated [`InMemoryTrieCursorFactory`] and //! [`HashedPostStateCursorFactory`], which are each backed by a database transaction. -use crate::root::ParallelStateRootError; -use alloy_primitives::{map::B256Set, B256}; +use crate::{ + root::ParallelStateRootError, + stats::{ParallelTrieStats, ParallelTrieTracker}, + StorageRootTargets, +}; +use alloy_primitives::{ + map::{B256Map, B256Set}, + B256, +}; +use alloy_rlp::{BufMut, Encodable}; use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; +use dashmap::DashMap; use reth_db_api::transaction::DbTx; -use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind}; +use reth_execution_errors::SparseTrieError; use reth_provider::{ - providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, FactoryTx, + providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError, ProviderResult, }; +use reth_storage_errors::db::DatabaseError; use reth_trie::{ hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, - prefix_set::TriePrefixSetsMut, + node_iter::{TrieElement, TrieNodeIter}, + prefix_set::{TriePrefixSets, TriePrefixSetsMut}, proof::{ProofTrieNodeProviderFactory, StorageProof}, trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory}, updates::TrieUpdatesSorted, - DecodedStorageMultiProof, HashedPostStateSorted, Nibbles, + walker::TrieWalker, + DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostStateSorted, + MultiProofTargets, Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE, }; use reth_trie_common::{ added_removed_keys::MultiAddedRemovedKeys, prefix_set::{PrefixSet, PrefixSetMut}, + proof::{DecodedProofNodes, ProofRetainer}, }; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory}; use std::{ - collections::VecDeque, sync::{ atomic::{AtomicUsize, Ordering}, - mpsc::{channel, Receiver, SendError, Sender}, + mpsc::{channel, Receiver, Sender}, Arc, }, time::Instant, @@ -48,6 +61,8 @@ use crate::proof_task_metrics::ProofTaskMetrics; type StorageProofResult = Result; type TrieNodeProviderResult = Result, SparseTrieError>; +type AccountMultiproofResult = + Result<(DecodedMultiProof, ParallelTrieStats), ParallelStateRootError>; /// Internal message for storage workers. /// @@ -73,42 +88,20 @@ enum StorageWorkerJob { }, } -impl StorageWorkerJob { - /// Sends an error back to the caller when worker pool is unavailable. 
- /// - /// Returns `Ok(())` if the error was sent successfully, or `Err(())` if the receiver was - /// dropped. - fn send_worker_unavailable_error(&self) -> Result<(), ()> { - let error = - ParallelStateRootError::Other("Storage proof worker pool unavailable".to_string()); - - match self { - Self::StorageProof { result_sender, .. } => { - result_sender.send(Err(error)).map_err(|_| ()) - } - Self::BlindedStorageNode { result_sender, .. } => result_sender - .send(Err(SparseTrieError::from(SparseTrieErrorKind::Other(Box::new(error))))) - .map_err(|_| ()), - } - } -} - /// Manager for coordinating proof request execution across different task types. /// /// # Architecture /// -/// This manager handles two distinct execution paths: +/// This manager operates two distinct worker pools for parallel trie operations: /// -/// 1. **Storage Worker Pool** (for storage trie operations): +/// **Worker Pools**: /// - Pre-spawned workers with dedicated long-lived transactions -/// - Handles `StorageProof` and `BlindedStorageNode` requests -/// - Tasks queued via crossbeam unbounded channel +/// - **Storage pool**: Handles `StorageProof` and `BlindedStorageNode` requests +/// - **Account pool**: Handles `AccountMultiproof` and `BlindedAccountNode` requests, delegates +/// storage proof computation to storage pool +/// - Tasks queued via crossbeam unbounded channels /// - Workers continuously process without transaction overhead -/// - Unbounded queue ensures all storage proofs benefit from transaction reuse -/// -/// 2. **On-Demand Execution** (for account trie operations): -/// - Lazy transaction creation for `BlindedAccountNode` requests -/// - Transactions returned to pool after use for reuse +/// - Returns error if worker pool is unavailable (all workers panicked) /// /// # Public Interface /// @@ -117,7 +110,7 @@ impl StorageWorkerJob { /// - Use standard `std::mpsc` message passing /// - Receive consistent return types and error handling #[derive(Debug)] -pub struct ProofTaskManager { +pub struct ProofTaskManager { /// Sender for storage worker jobs to worker pool. storage_work_tx: CrossbeamSender, @@ -126,33 +119,17 @@ pub struct ProofTaskManager { /// May be less than requested if concurrency limits reduce the worker budget. storage_worker_count: usize, - /// Max number of database transactions to create for on-demand account trie operations. - max_concurrency: usize, - - /// Number of database transactions created for on-demand operations. - total_transactions: usize, - - /// Proof tasks pending execution (account trie operations only). - pending_tasks: VecDeque, - - /// The proof task transactions, containing owned cursor factories that are reused for proof - /// calculation (account trie operations only). - proof_task_txs: Vec>>, + /// Sender for account worker jobs to worker pool. + account_work_tx: CrossbeamSender, - /// Consistent view provider used for creating transactions on-demand. - view: ConsistentDbView, - - /// Proof task context shared across all proof tasks. - task_ctx: ProofTaskCtx, - - /// The underlying handle from which to spawn proof tasks. - executor: Handle, + /// Number of account workers successfully spawned. + account_worker_count: usize, /// Receives proof task requests from [`ProofTaskManagerHandle`]. - proof_task_rx: Receiver>>, + proof_task_rx: CrossbeamReceiver, - /// Internal channel for on-demand tasks to return transactions after use. - tx_sender: Sender>>, + /// Sender for creating handles that can queue tasks. 
+ proof_task_tx: CrossbeamSender, /// The number of active handles. /// @@ -307,47 +284,490 @@ fn storage_worker_loop( ); } -impl ProofTaskManager +// TODO: Refactor this with storage_worker_loop. ProofTaskManager should be removed in the following +// pr and `MultiproofManager` should be used instead to dispatch jobs directly. +/// Worker loop for account trie operations. +/// +/// # Lifecycle +/// +/// Each worker: +/// 1. Receives `AccountWorkerJob` from crossbeam unbounded channel +/// 2. Computes result using its dedicated long-lived transaction +/// 3. Sends result directly to original caller via `std::mpsc` +/// 4. Repeats until channel closes (graceful shutdown) +/// +/// # Transaction Reuse +/// +/// Reuses the same transaction and cursor factories across multiple operations +/// to avoid transaction creation and cursor factory setup overhead. +/// +/// # Panic Safety +/// +/// If this function panics, the worker thread terminates but other workers +/// continue operating and the system degrades gracefully. +/// +/// # Shutdown +/// +/// Worker shuts down when the crossbeam channel closes (all senders dropped). +fn account_worker_loop( + proof_tx: ProofTaskTx, + work_rx: CrossbeamReceiver, + storage_work_tx: CrossbeamSender, + worker_id: usize, +) where + Tx: DbTx, +{ + tracing::debug!( + target: "trie::proof_task", + worker_id, + "Account worker started" + ); + + // Create factories once at worker startup to avoid recreation overhead. + let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories(); + + // Create blinded provider factory once for all blinded node requests + let blinded_provider_factory = ProofTrieNodeProviderFactory::new( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + proof_tx.task_ctx.prefix_sets.clone(), + ); + + let mut account_proofs_processed = 0u64; + let mut account_nodes_processed = 0u64; + + while let Ok(job) = work_rx.recv() { + match job { + AccountWorkerJob::AccountMultiproof { mut input, result_sender } => { + trace!( + target: "trie::proof_task", + worker_id, + targets = input.targets.len(), + "Processing account multiproof" + ); + + let proof_start = Instant::now(); + let mut tracker = ParallelTrieTracker::default(); + + let mut storage_prefix_sets = + std::mem::take(&mut input.prefix_sets.storage_prefix_sets); + + let storage_root_targets_len = StorageRootTargets::count( + &input.prefix_sets.account_prefix_set, + &storage_prefix_sets, + ); + tracker.set_precomputed_storage_roots(storage_root_targets_len as u64); + + let storage_proof_receivers = match queue_storage_proofs( + &storage_work_tx, + &input.targets, + &mut storage_prefix_sets, + input.collect_branch_node_masks, + input.multi_added_removed_keys.as_ref(), + ) { + Ok(receivers) => receivers, + Err(error) => { + let _ = result_sender.send(Err(error)); + continue; + } + }; + + // Use the missed leaves cache passed from the multiproof manager + let missed_leaves_storage_roots = &input.missed_leaves_storage_roots; + + let account_prefix_set = std::mem::take(&mut input.prefix_sets.account_prefix_set); + + let ctx = AccountMultiproofParams { + targets: &input.targets, + prefix_set: account_prefix_set, + collect_branch_node_masks: input.collect_branch_node_masks, + multi_added_removed_keys: input.multi_added_removed_keys.as_ref(), + storage_proof_receivers, + missed_leaves_storage_roots, + }; + + let result = build_account_multiproof_with_storage_roots( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + ctx, + &mut tracker, + ); + + let 
proof_elapsed = proof_start.elapsed(); + let stats = tracker.finish(); + let result = result.map(|proof| (proof, stats)); + account_proofs_processed += 1; + + if result_sender.send(result).is_err() { + tracing::debug!( + target: "trie::proof_task", + worker_id, + account_proofs_processed, + "Account multiproof receiver dropped, discarding result" + ); + } + + trace!( + target: "trie::proof_task", + worker_id, + proof_time_us = proof_elapsed.as_micros(), + total_processed = account_proofs_processed, + "Account multiproof completed" + ); + } + + AccountWorkerJob::BlindedAccountNode { path, result_sender } => { + trace!( + target: "trie::proof_task", + worker_id, + ?path, + "Processing blinded account node" + ); + + let start = Instant::now(); + let result = blinded_provider_factory.account_node_provider().trie_node(&path); + let elapsed = start.elapsed(); + + account_nodes_processed += 1; + + if result_sender.send(result).is_err() { + tracing::debug!( + target: "trie::proof_task", + worker_id, + ?path, + account_nodes_processed, + "Blinded account node receiver dropped, discarding result" + ); + } + + trace!( + target: "trie::proof_task", + worker_id, + ?path, + node_time_us = elapsed.as_micros(), + total_processed = account_nodes_processed, + "Blinded account node completed" + ); + } + } + } + + tracing::debug!( + target: "trie::proof_task", + worker_id, + account_proofs_processed, + account_nodes_processed, + "Account worker shutting down" + ); +} + +/// Builds an account multiproof by consuming storage proof receivers lazily during trie walk. +/// +/// This is a helper function used by account workers to build the account subtree proof +/// while storage proofs are still being computed. Receivers are consumed only when needed, +/// enabling interleaved parallelism between account trie traversal and storage proof computation. +/// +/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs. +fn build_account_multiproof_with_storage_roots( + trie_cursor_factory: C, + hashed_cursor_factory: H, + ctx: AccountMultiproofParams<'_>, + tracker: &mut ParallelTrieTracker, +) -> Result where - Factory: DatabaseProviderFactory, + C: TrieCursorFactory + Clone, + H: HashedCursorFactory + Clone, { - /// Creates a new [`ProofTaskManager`] with pre-spawned storage proof workers. + let accounts_added_removed_keys = + ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts()); + + // Create the walker. + let walker = TrieWalker::<_>::state_trie( + trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, + ctx.prefix_set, + ) + .with_added_removed_keys(accounts_added_removed_keys) + .with_deletions_retained(true); + + // Create a hash builder to rebuild the root node since it is not available in the database. + let retainer = ctx + .targets + .keys() + .map(Nibbles::unpack) + .collect::() + .with_added_removed_keys(accounts_added_removed_keys); + let mut hash_builder = HashBuilder::default() + .with_proof_retainer(retainer) + .with_updates(ctx.collect_branch_node_masks); + + // Initialize storage multiproofs map with pre-allocated capacity. + // Proofs will be inserted as they're consumed from receivers during trie walk. 
+ let mut collected_decoded_storages: B256Map = + B256Map::with_capacity_and_hasher(ctx.targets.len(), Default::default()); + let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE); + let mut account_node_iter = TrieNodeIter::state_trie( + walker, + hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?, + ); + + let mut storage_proof_receivers = ctx.storage_proof_receivers; + + while let Some(account_node) = account_node_iter.try_next().map_err(ProviderError::Database)? { + match account_node { + TrieElement::Branch(node) => { + hash_builder.add_branch(node.key, node.value, node.children_are_in_trie); + } + TrieElement::Leaf(hashed_address, account) => { + let root = match storage_proof_receivers.remove(&hashed_address) { + Some(receiver) => { + // Block on this specific storage proof receiver - enables interleaved + // parallelism + let proof = receiver.recv().map_err(|_| { + ParallelStateRootError::StorageRoot( + reth_execution_errors::StorageRootError::Database( + DatabaseError::Other(format!( + "Storage proof channel closed for {hashed_address}" + )), + ), + ) + })??; + let root = proof.root; + collected_decoded_storages.insert(hashed_address, proof); + root + } + // Since we do not store all intermediate nodes in the database, there might + // be a possibility of re-adding a non-modified leaf to the hash builder. + None => { + tracker.inc_missed_leaves(); + + match ctx.missed_leaves_storage_roots.entry(hashed_address) { + dashmap::Entry::Occupied(occ) => *occ.get(), + dashmap::Entry::Vacant(vac) => { + let root = StorageProof::new_hashed( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + hashed_address, + ) + .with_prefix_set_mut(Default::default()) + .storage_multiproof( + ctx.targets.get(&hashed_address).cloned().unwrap_or_default(), + ) + .map_err(|e| { + ParallelStateRootError::StorageRoot( + reth_execution_errors::StorageRootError::Database( + DatabaseError::Other(e.to_string()), + ), + ) + })? + .root; + + vac.insert(root); + root + } + } + } + }; + + // Encode account + account_rlp.clear(); + let account = account.into_trie_account(root); + account.encode(&mut account_rlp as &mut dyn BufMut); + + hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp); + } + } + } + + // Consume remaining storage proof receivers for accounts not encountered during trie walk. + for (hashed_address, receiver) in storage_proof_receivers { + if let Ok(Ok(proof)) = receiver.recv() { + collected_decoded_storages.insert(hashed_address, proof); + } + } + + let _ = hash_builder.root(); + + let account_subtree_raw_nodes = hash_builder.take_proof_nodes(); + let decoded_account_subtree = DecodedProofNodes::try_from(account_subtree_raw_nodes)?; + + let (branch_node_hash_masks, branch_node_tree_masks) = if ctx.collect_branch_node_masks { + let updated_branch_nodes = hash_builder.updated_branch_nodes.unwrap_or_default(); + ( + updated_branch_nodes.iter().map(|(path, node)| (*path, node.hash_mask)).collect(), + updated_branch_nodes.into_iter().map(|(path, node)| (path, node.tree_mask)).collect(), + ) + } else { + (Default::default(), Default::default()) + }; + + Ok(DecodedMultiProof { + account_subtree: decoded_account_subtree, + branch_node_hash_masks, + branch_node_tree_masks, + storages: collected_decoded_storages, + }) +} + +/// Queues storage proofs for all accounts in the targets and returns receivers. 
+/// +/// This function queues all storage proof tasks to the worker pool but returns immediately +/// with receivers, allowing the account trie walk to proceed in parallel with storage proof +/// computation. This enables interleaved parallelism for better performance. +/// +/// Propagates errors up if queuing fails. Receivers must be consumed by the caller. +fn queue_storage_proofs( + storage_work_tx: &CrossbeamSender, + targets: &MultiProofTargets, + storage_prefix_sets: &mut B256Map, + with_branch_node_masks: bool, + multi_added_removed_keys: Option<&Arc>, +) -> Result>, ParallelStateRootError> { + let mut storage_proof_receivers = + B256Map::with_capacity_and_hasher(targets.len(), Default::default()); + + // Queue all storage proofs to worker pool + for (hashed_address, target_slots) in targets.iter() { + let prefix_set = storage_prefix_sets.remove(hashed_address).unwrap_or_default(); + + // Always queue a storage proof so we obtain the storage root even when no slots are + // requested. + let input = StorageProofInput::new( + *hashed_address, + prefix_set, + target_slots.clone(), + with_branch_node_masks, + multi_added_removed_keys.cloned(), + ); + + let (sender, receiver) = channel(); + + // If queuing fails, propagate error up (no fallback) + storage_work_tx + .send(StorageWorkerJob::StorageProof { input, result_sender: sender }) + .map_err(|_| { + ParallelStateRootError::Other(format!( + "Failed to queue storage proof for {}: storage worker pool unavailable", + hashed_address + )) + })?; + + storage_proof_receivers.insert(*hashed_address, receiver); + } + + Ok(storage_proof_receivers) +} + +impl ProofTaskManager { + /// Creates a new [`ProofTaskManager`] with pre-spawned storage and account proof workers. + /// + /// This manager coordinates both storage and account worker pools: + /// - Storage workers handle `StorageProof` and `BlindedStorageNode` requests + /// - Account workers handle `AccountMultiproof` and `BlindedAccountNode` requests /// /// The `storage_worker_count` determines how many storage workers to spawn, and - /// `max_concurrency` determines the limit for on-demand operations (blinded account nodes). - /// These are now independent - storage workers are spawned as requested, and on-demand - /// operations use a separate concurrency pool for blinded account nodes. + /// `account_worker_count` determines how many account workers to spawn. /// Returns an error if the underlying provider fails to create the transactions required for /// spawning workers. - pub fn new( + pub fn new( executor: Handle, view: ConsistentDbView, task_ctx: ProofTaskCtx, - max_concurrency: usize, storage_worker_count: usize, - ) -> ProviderResult { - let (tx_sender, proof_task_rx) = channel(); + account_worker_count: usize, + ) -> ProviderResult + where + Factory: DatabaseProviderFactory, + { + // Use unbounded channel for the router to prevent account workers from blocking + // when queuing storage proofs. Account workers queue many storage proofs through + // this channel, and blocking on a bounded channel wastes parallel worker capacity. + let (proof_task_tx, proof_task_rx) = unbounded(); // Use unbounded channel to ensure all storage operations are queued to workers. // This maintains transaction reuse benefits and avoids fallback to on-demand execution. 
let (storage_work_tx, storage_work_rx) = unbounded::(); + let (account_work_tx, account_work_rx) = unbounded::(); tracing::info!( target: "trie::proof_task", storage_worker_count, - max_concurrency, - "Initializing storage worker pool with unbounded queue" + account_worker_count, + "Initializing storage and account worker pools with unbounded queues" ); + // Spawn storage workers + let spawned_storage_workers = Self::spawn_storage_workers( + &executor, + &view, + &task_ctx, + storage_worker_count, + storage_work_rx, + )?; + + // Spawn account workers with direct access to the storage worker queue + let spawned_account_workers = Self::spawn_account_workers( + &executor, + &view, + &task_ctx, + account_worker_count, + account_work_rx, + storage_work_tx.clone(), + )?; + + Ok(Self { + storage_work_tx, + storage_worker_count: spawned_storage_workers, + account_work_tx, + account_worker_count: spawned_account_workers, + proof_task_rx, + proof_task_tx, + active_handles: Arc::new(AtomicUsize::new(0)), + + #[cfg(feature = "metrics")] + metrics: ProofTaskMetrics::default(), + }) + } + + /// Returns a handle for sending new proof tasks to the [`ProofTaskManager`]. + pub fn handle(&self) -> ProofTaskManagerHandle { + ProofTaskManagerHandle::new(self.proof_task_tx.clone(), self.active_handles.clone()) + } + + /// Spawns a pool of storage workers with dedicated database transactions. + /// + /// Each worker receives `StorageWorkerJob` from the channel and processes storage proofs + /// and blinded storage node requests using a dedicated long-lived transaction. + /// + /// # Parameters + /// - `executor`: Tokio runtime handle for spawning blocking tasks + /// - `view`: Consistent database view for creating transactions + /// - `task_ctx`: Shared context with trie updates and prefix sets + /// - `worker_count`: Number of storage workers to spawn + /// - `work_rx`: Receiver for storage worker jobs + /// + /// # Returns + /// The number of storage workers successfully spawned + fn spawn_storage_workers( + executor: &Handle, + view: &ConsistentDbView, + task_ctx: &ProofTaskCtx, + worker_count: usize, + work_rx: CrossbeamReceiver, + ) -> ProviderResult + where + Factory: DatabaseProviderFactory, + { let mut spawned_workers = 0; - for worker_id in 0..storage_worker_count { - let provider_ro = view.provider_ro()?; + for worker_id in 0..worker_count { + let provider_ro = view.provider_ro()?; let tx = provider_ro.into_tx(); let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id); - let work_rx = storage_work_rx.clone(); + let work_rx_clone = work_rx.clone(); - executor.spawn_blocking(move || storage_worker_loop(proof_task_tx, work_rx, worker_id)); + executor.spawn_blocking(move || { + storage_worker_loop(proof_task_tx, work_rx_clone, worker_id) + }); spawned_workers += 1; @@ -359,99 +779,77 @@ where ); } - Ok(Self { - storage_work_tx, - storage_worker_count: spawned_workers, - max_concurrency, - total_transactions: 0, - pending_tasks: VecDeque::new(), - proof_task_txs: Vec::with_capacity(max_concurrency), - view, - task_ctx, - executor, - proof_task_rx, - tx_sender, - active_handles: Arc::new(AtomicUsize::new(0)), - - #[cfg(feature = "metrics")] - metrics: ProofTaskMetrics::default(), - }) + Ok(spawned_workers) } - /// Returns a handle for sending new proof tasks to the [`ProofTaskManager`]. 
- pub fn handle(&self) -> ProofTaskManagerHandle> { - ProofTaskManagerHandle::new(self.tx_sender.clone(), self.active_handles.clone()) - } -} - -impl ProofTaskManager -where - Factory: DatabaseProviderFactory + 'static, -{ - /// Inserts the task into the pending tasks queue. - pub fn queue_proof_task(&mut self, task: ProofTaskKind) { - self.pending_tasks.push_back(task); - } - - /// Gets either the next available transaction, or creates a new one if all are in use and the - /// total number of transactions created is less than the max concurrency. - pub fn get_or_create_tx(&mut self) -> ProviderResult>>> { - if let Some(proof_task_tx) = self.proof_task_txs.pop() { - return Ok(Some(proof_task_tx)); - } + /// Spawns a pool of account workers with dedicated database transactions. + /// + /// Each worker receives `AccountWorkerJob` from the channel and processes account multiproofs + /// and blinded account node requests using a dedicated long-lived transaction. Account workers + /// can delegate storage proof computation to the storage worker pool. + /// + /// # Parameters + /// - `executor`: Tokio runtime handle for spawning blocking tasks + /// - `view`: Consistent database view for creating transactions + /// - `task_ctx`: Shared context with trie updates and prefix sets + /// - `worker_count`: Number of account workers to spawn + /// - `work_rx`: Receiver for account worker jobs + /// - `storage_work_tx`: Sender to delegate storage proofs to storage worker pool + /// + /// # Returns + /// The number of account workers successfully spawned + fn spawn_account_workers( + executor: &Handle, + view: &ConsistentDbView, + task_ctx: &ProofTaskCtx, + worker_count: usize, + work_rx: CrossbeamReceiver, + storage_work_tx: CrossbeamSender, + ) -> ProviderResult + where + Factory: DatabaseProviderFactory, + { + let mut spawned_workers = 0; - // if we can create a new tx within our concurrency limits, create one on-demand - if self.total_transactions < self.max_concurrency { - let provider_ro = self.view.provider_ro()?; + for worker_id in 0..worker_count { + let provider_ro = view.provider_ro()?; let tx = provider_ro.into_tx(); - self.total_transactions += 1; - return Ok(Some(ProofTaskTx::new(tx, self.task_ctx.clone(), self.total_transactions))); - } + let proof_task_tx = ProofTaskTx::new(tx, task_ctx.clone(), worker_id); + let work_rx_clone = work_rx.clone(); + let storage_work_tx_clone = storage_work_tx.clone(); - Ok(None) - } + executor.spawn_blocking(move || { + account_worker_loop(proof_task_tx, work_rx_clone, storage_work_tx_clone, worker_id) + }); - /// Spawns the next queued proof task on the executor with the given input, if there are any - /// transactions available. - /// - /// This will return an error if a transaction must be created on-demand and the consistent view - /// provider fails. - pub fn try_spawn_next(&mut self) -> ProviderResult<()> { - let Some(task) = self.pending_tasks.pop_front() else { return Ok(()) }; - - let Some(proof_task_tx) = self.get_or_create_tx()? 
-            // if there are no txs available, requeue the proof task
-            self.pending_tasks.push_front(task);
-            return Ok(())
-        };
-
-        let tx_sender = self.tx_sender.clone();
-        self.executor.spawn_blocking(move || match task {
-            ProofTaskKind::BlindedAccountNode(path, sender) => {
-                proof_task_tx.blinded_account_node(path, sender, tx_sender);
-            }
-            // Storage trie operations should never reach here as they're routed to worker pool
-            ProofTaskKind::BlindedStorageNode(_, _, _) | ProofTaskKind::StorageProof(_, _) => {
-                unreachable!("Storage trie operations should be routed to worker pool")
-            }
-        });
+            spawned_workers += 1;
 
-        Ok(())
+            tracing::debug!(
+                target: "trie::proof_task",
+                worker_id,
+                spawned_workers,
+                "Account worker spawned successfully"
+            );
+        }
+
+        Ok(spawned_workers)
     }
 
-    /// Loops, managing the proof tasks, and sending new tasks to the executor.
+    /// Loops, managing the proof tasks, routing them to the appropriate worker pools.
     ///
     /// # Task Routing
     ///
     /// - **Storage Trie Operations** (`StorageProof` and `BlindedStorageNode`): Routed to
-    ///   pre-spawned worker pool via unbounded channel.
-    /// - **Account Trie Operations** (`BlindedAccountNode`): Queued for on-demand execution via
-    ///   `pending_tasks`.
+    ///   pre-spawned storage worker pool via unbounded channel. Panics if the workers are
+    ///   disconnected (e.g., all workers panicked or the pool shut down).
+    /// - **Account Trie Operations** (`AccountMultiproof` and `BlindedAccountNode`): Routed to
+    ///   pre-spawned account worker pool via unbounded channel. Panics if the workers are
+    ///   disconnected.
    ///
    /// # Shutdown
    ///
-    /// On termination, `storage_work_tx` is dropped, closing the channel and
-    /// signaling all workers to shut down gracefully.
+    /// On termination, `storage_work_tx` and `account_work_tx` are dropped, closing the channels
+    /// and signaling all workers to shut down gracefully.
pub fn run(mut self) -> ProviderResult<()> { loop { match self.proof_task_rx.recv() { @@ -459,27 +857,17 @@ where match message { ProofTaskMessage::QueueTask(task) => match task { ProofTaskKind::StorageProof(input, sender) => { - match self.storage_work_tx.send(StorageWorkerJob::StorageProof { - input, - result_sender: sender, - }) { - Ok(_) => { - tracing::trace!( - target: "trie::proof_task", - "Storage proof dispatched to worker pool" - ); - } - Err(crossbeam_channel::SendError(job)) => { - tracing::error!( - target: "trie::proof_task", - storage_worker_count = self.storage_worker_count, - "Worker pool disconnected, cannot process storage proof" - ); - - // Send error back to caller - let _ = job.send_worker_unavailable_error(); - } - } + self.storage_work_tx + .send(StorageWorkerJob::StorageProof { + input, + result_sender: sender, + }) + .expect("failed to dispatch storage proof: storage worker pool unavailable (all workers panicked or pool shut down)"); + + tracing::trace!( + target: "trie::proof_task", + "Storage proof dispatched to worker pool" + ); } ProofTaskKind::BlindedStorageNode(account, path, sender) => { @@ -488,56 +876,65 @@ where self.metrics.storage_nodes += 1; } - match self.storage_work_tx.send( - StorageWorkerJob::BlindedStorageNode { + self.storage_work_tx + .send(StorageWorkerJob::BlindedStorageNode { account, path, result_sender: sender, - }, - ) { - Ok(_) => { - tracing::trace!( - target: "trie::proof_task", - ?account, - ?path, - "Blinded storage node dispatched to worker pool" - ); - } - Err(crossbeam_channel::SendError(job)) => { - tracing::warn!( - target: "trie::proof_task", - storage_worker_count = self.storage_worker_count, - ?account, - ?path, - "Worker pool disconnected, cannot process blinded storage node" - ); - - // Send error back to caller - let _ = job.send_worker_unavailable_error(); - } - } + }) + .expect("failed to dispatch blinded storage node: storage worker pool unavailable (all workers panicked or pool shut down)"); + + tracing::trace!( + target: "trie::proof_task", + ?account, + ?path, + "Blinded storage node dispatched to worker pool" + ); } - ProofTaskKind::BlindedAccountNode(_, _) => { - // Route account trie operations to pending_tasks + ProofTaskKind::BlindedAccountNode(path, sender) => { #[cfg(feature = "metrics")] { self.metrics.account_nodes += 1; } - self.queue_proof_task(task); + + self.account_work_tx + .send(AccountWorkerJob::BlindedAccountNode { + path, + result_sender: sender, + }) + .expect("failed to dispatch blinded account node: account worker pool unavailable (all workers panicked or pool shut down)"); + + tracing::trace!( + target: "trie::proof_task", + ?path, + "Blinded account node dispatched to worker pool" + ); + } + + ProofTaskKind::AccountMultiproof(input, sender) => { + self.account_work_tx + .send(AccountWorkerJob::AccountMultiproof { + input, + result_sender: sender, + }) + .expect("failed to dispatch account multiproof: account worker pool unavailable (all workers panicked or pool shut down)"); + + tracing::trace!( + target: "trie::proof_task", + "Account multiproof dispatched to worker pool" + ); } }, - ProofTaskMessage::Transaction(tx) => { - // Return transaction to pending_tasks pool - self.proof_task_txs.push(tx); - } ProofTaskMessage::Terminate => { - // Drop storage_work_tx to signal workers to shut down + // Drop worker channels to signal workers to shut down drop(self.storage_work_tx); + drop(self.account_work_tx); tracing::debug!( target: "trie::proof_task", storage_worker_count = 
self.storage_worker_count, + account_worker_count = self.account_worker_count, "Shutting down proof task manager, signaling workers to terminate" ); @@ -553,9 +950,6 @@ where // However this should never happen, as this struct stores a sender Err(_) => return Ok(()), }; - - // Try spawning pending account trie tasks - self.try_spawn_next()?; } } } @@ -672,49 +1066,6 @@ where decoded_result } - - /// Retrieves blinded account node by path. - fn blinded_account_node( - self, - path: Nibbles, - result_sender: Sender, - tx_sender: Sender>, - ) { - trace!( - target: "trie::proof_task", - ?path, - "Starting blinded account node retrieval" - ); - - let (trie_cursor_factory, hashed_cursor_factory) = self.create_factories(); - - let blinded_provider_factory = ProofTrieNodeProviderFactory::new( - trie_cursor_factory, - hashed_cursor_factory, - self.task_ctx.prefix_sets.clone(), - ); - - let start = Instant::now(); - let result = blinded_provider_factory.account_node_provider().trie_node(&path); - trace!( - target: "trie::proof_task", - ?path, - elapsed = ?start.elapsed(), - "Completed blinded account node retrieval" - ); - - if let Err(error) = result_sender.send(result) { - tracing::error!( - target: "trie::proof_task", - ?path, - ?error, - "Failed to send blinded account node result" - ); - } - - // send the tx back - let _ = tx_sender.send(ProofTaskMessage::Transaction(self)); - } } /// This represents an input for a storage proof. @@ -752,6 +1103,59 @@ impl StorageProofInput { } } +/// Input parameters for account multiproof computation. +#[derive(Debug, Clone)] +pub struct AccountMultiproofInput { + /// The targets for which to compute the multiproof. + pub targets: MultiProofTargets, + /// The prefix sets for the proof calculation. + pub prefix_sets: TriePrefixSets, + /// Whether or not to collect branch node masks. + pub collect_branch_node_masks: bool, + /// Provided by the user to give the necessary context to retain extra proofs. + pub multi_added_removed_keys: Option>, + /// Cached storage proof roots for missed leaves encountered during account trie walk. + pub missed_leaves_storage_roots: Arc>, +} + +/// Parameters for building an account multiproof with pre-computed storage roots. +struct AccountMultiproofParams<'a> { + /// The targets for which to compute the multiproof. + targets: &'a MultiProofTargets, + /// The prefix set for the account trie walk. + prefix_set: PrefixSet, + /// Whether or not to collect branch node masks. + collect_branch_node_masks: bool, + /// Provided by the user to give the necessary context to retain extra proofs. + multi_added_removed_keys: Option<&'a Arc>, + /// Receivers for storage proofs being computed in parallel. + storage_proof_receivers: B256Map>, + /// Cached storage proof roots for missed leaves encountered during account trie walk. + missed_leaves_storage_roots: &'a DashMap, +} + +/// Internal message for account workers. +/// +/// This is NOT exposed publicly. External callers use `ProofTaskKind::AccountMultiproof` or +/// `ProofTaskKind::BlindedAccountNode` which are routed through the manager's `std::mpsc` channel. 
+#[derive(Debug)] +enum AccountWorkerJob { + /// Account multiproof computation request + AccountMultiproof { + /// Account multiproof input parameters + input: AccountMultiproofInput, + /// Channel to send result back to original caller + result_sender: Sender, + }, + /// Blinded account node retrieval request + BlindedAccountNode { + /// Path to the account node + path: Nibbles, + /// Channel to send result back to original caller + result_sender: Sender, + }, +} + /// Data used for initializing cursor factories that is shared across all storage proof instances. #[derive(Debug, Clone)] pub struct ProofTaskCtx { @@ -779,11 +1183,9 @@ impl ProofTaskCtx { /// Message used to communicate with [`ProofTaskManager`]. #[derive(Debug)] -pub enum ProofTaskMessage { +pub enum ProofTaskMessage { /// A request to queue a proof task. QueueTask(ProofTaskKind), - /// A returned database transaction. - Transaction(ProofTaskTx), /// A request to terminate the proof task manager. Terminate, } @@ -800,27 +1202,35 @@ pub enum ProofTaskKind { BlindedAccountNode(Nibbles, Sender), /// A blinded storage node request. BlindedStorageNode(B256, Nibbles, Sender), + /// An account multiproof request. + AccountMultiproof(AccountMultiproofInput, Sender), } /// A handle that wraps a single proof task sender that sends a terminate message on `Drop` if the /// number of active handles went to zero. #[derive(Debug)] -pub struct ProofTaskManagerHandle { +pub struct ProofTaskManagerHandle { /// The sender for the proof task manager. - sender: Sender>, + sender: CrossbeamSender, /// The number of active handles. active_handles: Arc, } -impl ProofTaskManagerHandle { +impl ProofTaskManagerHandle { /// Creates a new [`ProofTaskManagerHandle`] with the given sender. - pub fn new(sender: Sender>, active_handles: Arc) -> Self { + pub fn new( + sender: CrossbeamSender, + active_handles: Arc, + ) -> Self { active_handles.fetch_add(1, Ordering::SeqCst); Self { sender, active_handles } } /// Queues a task to the proof task manager. - pub fn queue_task(&self, task: ProofTaskKind) -> Result<(), SendError>> { + pub fn queue_task( + &self, + task: ProofTaskKind, + ) -> Result<(), crossbeam_channel::SendError> { self.sender.send(ProofTaskMessage::QueueTask(task)) } @@ -830,13 +1240,13 @@ impl ProofTaskManagerHandle { } } -impl Clone for ProofTaskManagerHandle { +impl Clone for ProofTaskManagerHandle { fn clone(&self) -> Self { Self::new(self.sender.clone(), self.active_handles.clone()) } } -impl Drop for ProofTaskManagerHandle { +impl Drop for ProofTaskManagerHandle { fn drop(&mut self) { // Decrement the number of active handles and terminate the manager if it was the last // handle. @@ -846,9 +1256,9 @@ impl Drop for ProofTaskManagerHandle { } } -impl TrieNodeProviderFactory for ProofTaskManagerHandle { - type AccountNodeProvider = ProofTaskTrieNodeProvider; - type StorageNodeProvider = ProofTaskTrieNodeProvider; +impl TrieNodeProviderFactory for ProofTaskManagerHandle { + type AccountNodeProvider = ProofTaskTrieNodeProvider; + type StorageNodeProvider = ProofTaskTrieNodeProvider; fn account_node_provider(&self) -> Self::AccountNodeProvider { ProofTaskTrieNodeProvider::AccountNode { sender: self.sender.clone() } @@ -861,22 +1271,22 @@ impl TrieNodeProviderFactory for ProofTaskManagerHandle { /// Trie node provider for retrieving trie nodes by path. #[derive(Debug)] -pub enum ProofTaskTrieNodeProvider { +pub enum ProofTaskTrieNodeProvider { /// Blinded account trie node provider. AccountNode { /// Sender to the proof task. 
- sender: Sender>, + sender: CrossbeamSender, }, /// Blinded storage trie node provider. StorageNode { /// Target account. account: B256, /// Sender to the proof task. - sender: Sender>, + sender: CrossbeamSender, }, } -impl TrieNodeProvider for ProofTaskTrieNodeProvider { +impl TrieNodeProvider for ProofTaskTrieNodeProvider { fn trie_node(&self, path: &Nibbles) -> Result, SparseTrieError> { let (tx, rx) = channel(); match self { @@ -919,7 +1329,7 @@ mod tests { ) } - /// Ensures `max_concurrency` is independent of storage workers. + /// Ensures `max_concurrency` is independent of storage and account workers. #[test] fn proof_task_manager_independent_pools() { let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap(); @@ -929,11 +1339,11 @@ mod tests { let view = ConsistentDbView::new(factory, None); let ctx = test_ctx(); - let manager = ProofTaskManager::new(handle.clone(), view, ctx, 1, 5).unwrap(); - // With storage_worker_count=5, we get exactly 5 workers + let manager = ProofTaskManager::new(handle.clone(), view, ctx, 5, 3).unwrap(); + // With storage_worker_count=5, we get exactly 5 storage workers assert_eq!(manager.storage_worker_count, 5); - // max_concurrency=1 is for on-demand operations only - assert_eq!(manager.max_concurrency, 1); + // With account_worker_count=3, we get exactly 3 account workers + assert_eq!(manager.account_worker_count, 3); drop(manager); task::yield_now().await; diff --git a/crates/trie/parallel/src/storage_root_targets.rs b/crates/trie/parallel/src/storage_root_targets.rs index f844b70fca5..0c6d9f43498 100644 --- a/crates/trie/parallel/src/storage_root_targets.rs +++ b/crates/trie/parallel/src/storage_root_targets.rs @@ -24,6 +24,23 @@ impl StorageRootTargets { .collect(), ) } + + /// Returns the total number of unique storage root targets without allocating new maps. + pub fn count( + account_prefix_set: &PrefixSet, + storage_prefix_sets: &B256Map, + ) -> usize { + let mut count = storage_prefix_sets.len(); + + for nibbles in account_prefix_set { + let hashed_address = B256::from_slice(&nibbles.pack()); + if !storage_prefix_sets.contains_key(&hashed_address) { + count += 1; + } + } + + count + } } impl IntoIterator for StorageRootTargets { diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 394854f7246..1acbcf248fc 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -867,6 +867,9 @@ Engine: --engine.storage-worker-count Configure the number of storage proof workers in the Tokio blocking pool. If not specified, defaults to 2x available parallelism, clamped between 2 and 64 + --engine.account-worker-count + Configure the number of account proof workers in the Tokio blocking pool. If not specified, defaults to the same count as storage workers + ERA: --era.enable Enable import from ERA1 files
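
To see how the pieces introduced by this diff fit together end to end, the sketch below wires a `ProofTaskManager` with independent storage and account pool sizes (mirroring `--engine.storage-worker-count` and `--engine.account-worker-count`), runs its routing loop on a dedicated thread, and queues a request through the cloneable handle. This is a minimal illustration, not code from the PR: the test-support factory (`create_test_provider_factory`), the `ProofTaskCtx::new` argument shape, the exact import paths, and the empty `Nibbles` path are assumptions; the constructor arguments follow the `proof_task_manager_independent_pools` test above.

```rust
// Sketch only: assumes reth test utilities and the constructor/handle APIs shown in the diff.
use std::sync::{mpsc::channel, Arc};

use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
use reth_trie::{
    prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, HashedPostStateSorted, Nibbles,
};
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskKind, ProofTaskManager};
use tokio::runtime::Builder;

fn main() -> eyre::Result<()> {
    let runtime = Builder::new_multi_thread().worker_threads(2).enable_all().build()?;

    // Consistent view over an empty test database.
    let factory = create_test_provider_factory();
    let view = ConsistentDbView::new(factory, None);

    // Shared context with no in-memory trie overlay (assumed constructor shape).
    let ctx = ProofTaskCtx::new(
        Arc::new(TrieUpdatesSorted::default()),
        Arc::new(HashedPostStateSorted::default()),
        Arc::new(TriePrefixSetsMut::default()),
    );

    // Independent pool sizes: 5 storage workers, 3 account workers.
    let manager = ProofTaskManager::new(runtime.handle().clone(), view, ctx, 5, 3)?;
    let handle = manager.handle();

    // The routing loop blocks on its receiver, so it gets its own thread.
    std::thread::spawn(move || manager.run());

    // Queue a blinded account node request; the manager forwards it to the account pool.
    let (tx, rx) = channel();
    handle
        .queue_task(ProofTaskKind::BlindedAccountNode(Nibbles::default(), tx))
        .expect("proof task manager unavailable");
    let _node = rx.recv()?;

    // Dropping the last handle sends `Terminate`, which closes both worker queues.
    drop(handle);
    Ok(())
}
```

The same two-count constructor is what the in-tree test asserts on: storage and account pool sizes are tracked separately, so tuning one no longer affects the other.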