From 437867b7739f515e7f5fd3f85a47260ef3567c19 Mon Sep 17 00:00:00 2001 From: behzad nouri <behzadnouri@gmail.com> Date: Sat, 18 Jan 2025 13:21:44 -0600 Subject: [PATCH] reworks update_completed_data_indexes when inserting shreds blockstore::update_completed_data_indexes https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L4722-L4758 can be implemented more idiomatically and avoid several allocations in the process by just holding onto iterators: https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L4739 https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L4757 https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L2254 Additionally we can utilize Range<u32> instead of (u32, u32) tuples which are ambiguous if they are inclusive or not: https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L4731 https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L4768 https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L216-L222 --- core/src/completed_data_sets_service.rs | 31 +++--- ledger/src/blockstore.rs | 134 +++++++++++------------- 2 files changed, 78 insertions(+), 87 deletions(-) diff --git a/core/src/completed_data_sets_service.rs b/core/src/completed_data_sets_service.rs index 90f85ca6ff412a..0069c14850fd01 100644 --- a/core/src/completed_data_sets_service.rs +++ b/core/src/completed_data_sets_service.rs @@ -64,18 +64,11 @@ impl CompletedDataSetsService { rpc_subscriptions: &RpcSubscriptions, max_slots: &Arc<MaxSlots>, ) -> Result<(), RecvTimeoutError> { - let completed_data_sets = completed_sets_receiver.recv_timeout(Duration::from_secs(1))?; - let mut max_slot = 0; - for completed_set_info in std::iter::once(completed_data_sets) - .chain(completed_sets_receiver.try_iter()) - .flatten() - { - let CompletedDataSetInfo { - slot, - start_index, - end_index, - } = completed_set_info; - max_slot = max_slot.max(slot); + const RECV_TIMEOUT: Duration = 
Duration::from_secs(1); + let handle_completed_data_set_info = |completed_data_set_info| { + let CompletedDataSetInfo { slot, indices } = completed_data_set_info; + let start_index = indices.start; + let end_index = indices.end - 1; match blockstore.get_entries_in_data_block(slot, start_index, end_index, None) { Ok(entries) => { let transactions = Self::get_transaction_signatures(entries); @@ -85,11 +78,17 @@ impl CompletedDataSetsService { } Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e), } + slot + }; + let slots = completed_sets_receiver + .recv_timeout(RECV_TIMEOUT) + .map(std::iter::once)? + .chain(completed_sets_receiver.try_iter()) + .flatten() + .map(handle_completed_data_set_info); + if let Some(slot) = slots.max() { + max_slots.shred_insert.fetch_max(slot, Ordering::Relaxed); } - max_slots - .shred_insert - .fetch_max(max_slot, Ordering::Relaxed); - Ok(()) } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 8371e18112c4da..a379f05efad777 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -72,7 +72,7 @@ use { fmt::Write, fs::{self, File}, io::{Error as IoError, ErrorKind}, - ops::Bound, + ops::{Bound, Range}, path::{Path, PathBuf}, rc::Rc, sync::{ @@ -208,18 +208,12 @@ pub struct InsertResults { /// /// `solana_core::completed_data_sets_service::CompletedDataSetsService` is the main receiver of /// `CompletedDataSetInfo`. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct CompletedDataSetInfo { /// [`Slot`] to which the [`Shred`]s in this set belong. pub slot: Slot, - - /// Index of the first [`Shred`] in the range of shreds that belong to this set. - /// Range is inclusive, `start_index..=end_index`. - pub start_index: u32, - - /// Index of the last [`Shred`] in the range of shreds that belong to this set. - /// Range is inclusive, `start_index..=end_index`. - pub end_index: u32, + /// Data [`Shred`]s' indices in this set. 
+ pub indices: Range<u32>, } pub struct BlockstoreSignals { @@ -1061,7 +1055,7 @@ impl Blockstore { shred, erasure_meta, &shred_insertion_tracker.just_inserted_shreds, - &mut shred_insertion_tracker.merkle_root_metas, + &shred_insertion_tracker.merkle_root_metas, &mut shred_insertion_tracker.duplicate_shreds, ); } @@ -1882,7 +1876,7 @@ impl Blockstore { shred: &Shred, erasure_meta: &ErasureMeta, just_inserted_shreds: &HashMap<ShredId, Shred>, - merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>, + merkle_root_metas: &HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>, duplicate_shreds: &mut Vec<PossibleDuplicateShred>, ) -> bool { debug_assert!(erasure_meta.check_coding_shred(shred)); @@ -2186,14 +2180,14 @@ impl Blockstore { } } - fn insert_data_shred( + fn insert_data_shred<'a>( &self, slot_meta: &mut SlotMeta, - data_index: &mut ShredIndex, + data_index: &'a mut ShredIndex, shred: &Shred, write_batch: &mut WriteBatch, shred_source: ShredSource, - ) -> Result<Vec<CompletedDataSetInfo>> { + ) -> Result<impl Iterator<Item = CompletedDataSetInfo> + 'a> { let slot = shred.slot(); let index = u64::from(shred.index()); @@ -2242,13 +2236,7 @@ impl Blockstore { shred.reference_tick(), data_index, ) - .into_iter() - .map(|(start_index, end_index)| CompletedDataSetInfo { - slot, - start_index, - end_index, - }) - .collect(); + .map(move |indices| CompletedDataSetInfo { slot, indices }); self.slots_stats.record_shred( shred.slot(), @@ -4699,53 +4687,55 @@ impl Blockstore { } } -// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a -// data set is complete, return the range of shred indexes [start_index, end_index] +// Updates the `completed_data_indexes` with a new shred `new_shred_index`. +// If a data set is complete, returns the range of shred indexes +// start_index..end_index // for that completed data set. -fn update_completed_data_indexes( +fn update_completed_data_indexes<'a>( is_last_in_data: bool, new_shred_index: u32, - received_data_shreds: &ShredIndex, + received_data_shreds: &'a ShredIndex, // Shreds indices which are marked data complete. 
completed_data_indexes: &mut BTreeSet<u32>, -) -> Vec<(u32, u32)> { - let start_shred_index = completed_data_indexes - .range(..new_shred_index) - .next_back() - .map(|index| index + 1) - .unwrap_or_default(); - // Consecutive entries i, k, j in this vector represent potential ranges [i, k), - // [k, j) that could be completed data ranges - let mut shred_indices = vec![start_shred_index]; - // `new_shred_index` is data complete, so need to insert here into the - // `completed_data_indexes` - if is_last_in_data { - completed_data_indexes.insert(new_shred_index); - shred_indices.push(new_shred_index + 1); - } - if let Some(index) = completed_data_indexes.range(new_shred_index + 1..).next() { - shred_indices.push(index + 1); - } - shred_indices - .windows(2) - .filter(|ix| { - let (begin, end) = (ix[0] as u64, ix[1] as u64); - let num_shreds = (end - begin) as usize; - received_data_shreds.range(begin..end).count() == num_shreds - }) - .map(|ix| (ix[0], ix[1] - 1)) - .collect() +) -> impl Iterator<Item = Range<u32>> + 'a { + // Consecutive entries i, j, k in this array represent potential ranges + // [i, j), [j, k) that could be completed data ranges + [ + completed_data_indexes + .range(..new_shred_index) + .next_back() + .map(|index| index + 1) + .or(Some(0u32)), + is_last_in_data.then(|| { + // new_shred_index is data complete, so need to insert here into + // the completed_data_indexes. + completed_data_indexes.insert(new_shred_index); + new_shred_index + 1 + }), + completed_data_indexes + .range(new_shred_index + 1..) 
+ .next() + .map(|index| index + 1), + ] + .into_iter() + .flatten() + .tuple_windows() + .filter(|&(start, end)| { + let bounds = u64::from(start)..u64::from(end); + received_data_shreds.range(bounds.clone()).eq(bounds) + }) + .map(|(start, end)| start..end) } -fn update_slot_meta( +fn update_slot_meta<'a>( is_last_in_slot: bool, is_last_in_data: bool, slot_meta: &mut SlotMeta, index: u32, new_consumed: u64, reference_tick: u8, - received_data_shreds: &ShredIndex, -) -> Vec<(u32, u32)> { + received_data_shreds: &'a ShredIndex, +) -> impl Iterator<Item = Range<u32>> + 'a { let first_insert = slot_meta.received == 0; // Index is zero-indexed, while the "received" height starts from 1, // so received = index + 1 for the same shred. @@ -6090,8 +6080,7 @@ pub mod tests { .unwrap(), vec![CompletedDataSetInfo { slot, - start_index: 0, - end_index: num_shreds as u32 - 1 + indices: 0..num_shreds as u32, }] ); // Inserting shreds again doesn't trigger notification @@ -10676,10 +10665,13 @@ pub mod tests { for i in 0..10 { shred_index.insert(i as u64); - assert_eq!( - update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes), - vec![(i, i)] - ); + assert!(update_completed_data_indexes( + true, + i, + &shred_index, + &mut completed_data_indexes + ) + .eq(std::iter::once(i..i + 1))); assert!(completed_data_indexes.iter().copied().eq(0..=i)); } } @@ -10692,39 +10684,39 @@ pub mod tests { shred_index.insert(4); assert!( update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes) - .is_empty() + .eq([]) ); assert!(completed_data_indexes.is_empty()); shred_index.insert(2); assert!( update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes) - .is_empty() + .eq([]) ); assert!(completed_data_indexes.is_empty()); shred_index.insert(3); assert!( update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes) - .is_empty() + .eq([]) ); assert!(completed_data_indexes.iter().eq([3].iter())); // Inserting data complete 
shred 1 now confirms the range of shreds [2, 3] // is part of the same data set shred_index.insert(1); - assert_eq!( - update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes), - vec![(2, 3)] + assert!( + update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes) + .eq(std::iter::once(2..4)) ); assert!(completed_data_indexes.iter().eq([1, 3].iter())); // Inserting data complete shred 0 now confirms the range of shreds [0] // is part of the same data set shred_index.insert(0); - assert_eq!( - update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes), - vec![(0, 0), (1, 1)] + assert!( + update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes) + .eq([0..1, 1..2]) ); assert!(completed_data_indexes.iter().eq([0, 1, 3].iter())); }