Skip to content

Commit c4cf477

Browse files
committed
refactor(engine): streamline proof task management with new handle
- Replaced `ProofTaskManager` with a new `new_proof_task_handle` function for improved task management. - Updated related code to utilize the new proof task handle, enhancing clarity and reducing complexity. - Adjusted tests to reflect changes in proof task initialization and handling.
1 parent 0c114c0 commit c4cf477

File tree

3 files changed

+23
-47
lines changed

3 files changed

+23
-47
lines changed

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use reth_provider::{
3232
use reth_revm::{db::BundleState, state::EvmState};
3333
use reth_trie::TrieInput;
3434
use reth_trie_parallel::{
35-
proof_task::{ProofTaskCtx, ProofTaskManager},
35+
proof_task::{new_proof_task_handle, ProofTaskCtx},
3636
root::ParallelStateRootError,
3737
};
3838
use reth_trie_sparse::{
@@ -196,23 +196,21 @@ where
196196
state_root_config.prefix_sets.clone(),
197197
);
198198
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
199-
let storage_worker_count = config.storage_proof_workers();
200-
let proof_task = ProofTaskManager::new(
199+
let proof_task_handle = new_proof_task_handle(
201200
self.executor.handle().clone(),
202201
state_root_config.consistent_view.clone(),
203202
task_ctx,
204203
max_proof_task_concurrency,
205-
storage_worker_count,
206204
)
207-
.expect("Failed to create ProofTaskManager with storage workers");
205+
.expect("Failed to create proof task handle");
208206

209207
// We set it to half of the proof task concurrency, because often for each multiproof we
210208
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
211209
let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
212210
let multi_proof_task = MultiProofTask::new(
213211
state_root_config,
214212
self.executor.clone(),
215-
proof_task.handle(),
213+
proof_task_handle.clone(),
216214
to_sparse_trie,
217215
max_multi_proof_task_concurrency,
218216
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
@@ -241,19 +239,7 @@ where
241239
let (state_root_tx, state_root_rx) = channel();
242240

243241
// Spawn the sparse trie task using any stored trie and parallel trie configuration.
244-
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
245-
246-
// spawn the proof task
247-
self.executor.spawn_blocking(move || {
248-
if let Err(err) = proof_task.run() {
249-
// At least log if there is an error at any point
250-
tracing::error!(
251-
target: "engine::root",
252-
?err,
253-
"Storage proof task returned an error"
254-
);
255-
}
256-
});
242+
self.spawn_sparse_trie_task(sparse_trie_rx, proof_task_handle, state_root_tx);
257243

258244
PayloadHandle {
259245
to_multi_proof,

crates/engine/tree/src/tree/payload_processor/multiproof.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,7 @@ mod tests {
11861186
use alloy_primitives::map::B256Set;
11871187
use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
11881188
use reth_trie::{MultiProof, TrieInput};
1189-
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
1189+
use reth_trie_parallel::proof_task::{new_proof_task_handle, ProofTaskCtx};
11901190
use revm_primitives::{B256, U256};
11911191
use std::sync::Arc;
11921192

@@ -1213,17 +1213,16 @@ mod tests {
12131213
config.state_sorted.clone(),
12141214
config.prefix_sets.clone(),
12151215
);
1216-
let proof_task = ProofTaskManager::new(
1216+
let proof_task_handle = new_proof_task_handle(
12171217
executor.handle().clone(),
12181218
config.consistent_view.clone(),
12191219
task_ctx,
1220-
1,
1221-
1, // storage_worker_count for test
1220+
1, // max_concurrency for test
12221221
)
1223-
.expect("Failed to create ProofTaskManager for multiproof test");
1222+
.expect("Failed to create proof task handle for multiproof test");
12241223
let channel = channel();
12251224

1226-
MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
1225+
MultiProofTask::new(config, executor, proof_task_handle, channel.0, 1, None)
12271226
}
12281227

12291228
#[test]

crates/trie/parallel/src/proof.rs

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ use alloy_rlp::{BufMut, Encodable};
1313
use itertools::Itertools;
1414
use reth_execution_errors::StorageRootError;
1515
use reth_provider::{
16-
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, FactoryTx,
17-
ProviderError,
16+
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
1817
};
1918
use reth_storage_errors::db::DatabaseError;
2019
use reth_trie::{
@@ -33,7 +32,7 @@ use reth_trie_common::{
3332
proof::{DecodedProofNodes, ProofRetainer},
3433
};
3534
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
36-
use std::sync::{mpsc::Receiver, Arc};
35+
use std::sync::Arc;
3736
use tracing::trace;
3837

3938
/// Parallel proof calculator.
@@ -58,7 +57,7 @@ pub struct ParallelProof<Factory: DatabaseProviderFactory> {
5857
/// Provided by the user to give the necessary context to retain extra proofs.
5958
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
6059
/// Handle to the storage proof task.
61-
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
60+
storage_proof_task_handle: ProofTaskManagerHandle<<Factory::Provider as DBProvider>::Tx>,
6261
#[cfg(feature = "metrics")]
6362
metrics: ParallelTrieMetrics,
6463
}
@@ -70,7 +69,7 @@ impl<Factory: DatabaseProviderFactory> ParallelProof<Factory> {
7069
nodes_sorted: Arc<TrieUpdatesSorted>,
7170
state_sorted: Arc<HashedPostStateSorted>,
7271
prefix_sets: Arc<TriePrefixSetsMut>,
73-
storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
72+
storage_proof_task_handle: ProofTaskManagerHandle<<Factory::Provider as DBProvider>::Tx>,
7473
) -> Self {
7574
Self {
7675
view,
@@ -112,18 +111,17 @@ where
112111
hashed_address: B256,
113112
prefix_set: PrefixSet,
114113
target_slots: B256Set,
115-
) -> Receiver<Result<DecodedStorageMultiProof, ParallelStateRootError>> {
114+
) -> crossbeam_channel::Receiver<Result<DecodedStorageMultiProof, ParallelStateRootError>> {
116115
let input = StorageProofInput::new(
117116
hashed_address,
118117
prefix_set,
119-
Arc::new(target_slots),
118+
target_slots,
120119
self.collect_branch_node_masks,
121120
self.multi_added_removed_keys.clone(),
122121
);
123122

124-
let (sender, receiver) = std::sync::mpsc::channel();
125-
let _ =
126-
self.storage_proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender));
123+
let (sender, receiver) = crossbeam_channel::unbounded();
124+
self.storage_proof_task_handle.queue_task(ProofTaskKind::StorageProof(input, sender));
127125
receiver
128126
}
129127

@@ -356,7 +354,7 @@ where
356354
#[cfg(test)]
357355
mod tests {
358356
use super::*;
359-
use crate::proof_task::{ProofTaskCtx, ProofTaskManager};
357+
use crate::proof_task::{new_proof_task_handle, ProofTaskCtx};
360358
use alloy_primitives::{
361359
keccak256,
362360
map::{B256Set, DefaultHashBuilder},
@@ -435,19 +433,13 @@ mod tests {
435433

436434
let task_ctx =
437435
ProofTaskCtx::new(Default::default(), Default::default(), Default::default());
438-
let proof_task = ProofTaskManager::new(
436+
let proof_task_handle = new_proof_task_handle(
439437
rt.handle().clone(),
440438
consistent_view.clone(),
441439
task_ctx,
442-
1,
443-
1, // storage_worker_count for test
440+
1, // max_concurrency for test
444441
)
445442
.expect("Failed to create proof task");
446-
let proof_task_handle = proof_task.handle();
447-
448-
// keep the join handle around to make sure it does not return any errors
449-
// after we compute the state root
450-
let join_handle = rt.spawn_blocking(move || proof_task.run());
451443

452444
let parallel_result = ParallelProof::new(
453445
consistent_view,
@@ -482,9 +474,8 @@ mod tests {
482474
// then compare the entire thing for any mask differences
483475
assert_eq!(parallel_result, sequential_result_decoded);
484476

485-
// drop the handle to terminate the task and then block on the proof task handle to make
486-
// sure it does not return any errors
477+
// Drop the handle to release transaction pool resources
478+
// Note: No manager loop to join in the new design - handle manages lifecycle via Drop
487479
drop(proof_task_handle);
488-
rt.block_on(join_handle).unwrap().expect("The proof task should not return an error");
489480
}
490481
}

0 commit comments

Comments
 (0)