removes unnecessary allocations in shreds ingestion path
Ingesting shreds unnecessarily allocates a HashSet to remove invalid
repair-meta:
https://github.com/anza-xyz/agave/blob/0f56076a2/core/src/window_service.rs#L250

and a separate vector for repair flags:
https://github.com/anza-xyz/agave/blob/0f56076a2/core/src/window_service.rs#L338-L341

Similarly, Blockstore::insert_shreds (called in broadcast-stage to insert the
node's own shreds during leader slots) allocates a vector for repair flags:
https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L1381

All of these allocations can be avoided if the blockstore insert_shreds APIs
are updated to take an argument of type

    shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
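
As a rough illustration only (a standalone sketch with stand-in Shred and
RepairMeta types, not the actual agave API), a caller can then derive the
repair flag lazily from the paired repair metadata, so neither a Vec<bool> of
flags nor a HashSet of pruned indices needs to be allocated:

    // Stand-in types; the real Shred and RepairMeta live in the ledger and core crates.
    struct Shred(u64);
    struct RepairMeta; // placeholder for the repair nonce etc.

    // Mirrors the proposed parameter shape.
    fn insert_shreds_handle_duplicate(
        shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
    ) {
        println!("inserting {} shreds", shreds.len());
        for (_shred, _is_repaired) in shreds {
            // ... insertion logic ...
        }
    }

    fn main() {
        // Shreds arrive paired with optional repair metadata.
        let shreds = vec![(Shred(0), None), (Shred(1), Some(RepairMeta))];
        // The repair flag is computed on the fly; no separate Vec<bool> is built.
        let shreds = shreds
            .into_iter()
            .map(|(shred, repair_meta)| (shred, repair_meta.is_some()));
        insert_shreds_handle_duplicate(shreds);
    }
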
behzadnouri committed Jan 19, 2025
1 parent 46b1f9a commit fad5cb1
Showing 3 changed files with 44 additions and 69 deletions.
85 changes: 33 additions & 52 deletions core/src/window_service.rs
@@ -38,7 +38,7 @@ use {
solana_turbine::cluster_nodes,
std::{
cmp::Reverse,
collections::{HashMap, HashSet},
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
@@ -240,32 +240,15 @@ fn verify_repair(
}

fn prune_shreds_by_repair_status(
shreds: &mut Vec<Shred>,
repair_infos: &mut Vec<Option<RepairMeta>>,
shreds: &mut Vec<(Shred, Option<RepairMeta>)>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
accept_repairs_only: bool,
) {
assert_eq!(shreds.len(), repair_infos.len());
let mut i = 0;
let mut removed = HashSet::new();
{
let mut outstanding_requests = outstanding_requests.write().unwrap();
shreds.retain(|shred| {
let should_keep = (
(!accept_repairs_only || repair_infos[i].is_some())
&& verify_repair(&mut outstanding_requests, shred, &repair_infos[i]),
i += 1,
)
.0;
if !should_keep {
removed.insert(i - 1);
}
should_keep
});
}
i = 0;
repair_infos.retain(|_repair_info| (!removed.contains(&i), i += 1).0);
assert_eq!(shreds.len(), repair_infos.len());
let mut outstanding_requests = outstanding_requests.write().unwrap();
shreds.retain(|(shred, repair_meta)| {
(!accept_repairs_only || repair_meta.is_some())
&& verify_repair(&mut outstanding_requests, shred, repair_meta)
});
}

#[allow(clippy::too_many_arguments)]
@@ -310,16 +293,19 @@ where
}
};
let now = Instant::now();
let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
let mut shreds: Vec<_> = thread_pool.install(|| {
packets
.par_iter()
.with_min_len(32)
.flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
.unzip()
.collect()
});
ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
ws_metrics.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
ws_metrics.num_repairs += shreds
.iter()
.filter(|(_, repair_meta)| repair_meta.is_some())
.count();
ws_metrics.num_shreds_received += shreds.len();
for packet in packets.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta().socket_addr();
@@ -328,23 +314,16 @@

let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
let num_shreds = shreds.len();
prune_shreds_by_repair_status(
&mut shreds,
&mut repair_infos,
outstanding_requests,
accept_repairs_only,
);
prune_shreds_by_repair_status(&mut shreds, outstanding_requests, accept_repairs_only);
ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len();
let repairs: Vec<_> = repair_infos
.iter()
.map(|repair_info| repair_info.is_some())
.collect();
prune_shreds_elapsed.stop();
ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();

let shreds = shreds
.into_iter()
.map(|(shred, repair_meta)| (shred, repair_meta.is_some()));
let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
shreds,
repairs,
Some(leader_schedule_cache),
false, // is_trusted
Some(retransmit_sender),
@@ -718,10 +697,12 @@ mod test {
};
assert_eq!(duplicate_shred.slot(), slot);
// Simulate storing both duplicate shreds in the same batch
let shreds = [original_shred.clone(), duplicate_shred.clone()]
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ false));
blockstore
.insert_shreds_handle_duplicate(
vec![original_shred.clone(), duplicate_shred.clone()],
vec![false, false],
shreds,
None,
false, // is_trusted
None,
@@ -761,7 +742,7 @@ mod test {
4, // position
0, // version
);
let mut shreds = vec![shred.clone(), shred.clone(), shred.clone()];
let shreds = [shred.clone(), shred.clone(), shred.clone()];
let repair_meta = RepairMeta { nonce: 0 };
let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));
let repair_type = ShredRepairType::Orphan(9);
@@ -770,22 +751,22 @@ mod test {
.unwrap()
.add_request(repair_type, timestamp());
let repair_meta1 = RepairMeta { nonce };
let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)];
prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, false);
let repair_meta = [None, Some(repair_meta), Some(repair_meta1)];
let mut shreds = shreds.into_iter().zip(repair_meta).collect();
prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, false);
assert_eq!(shreds.len(), 2);
assert_eq!(repair_infos.len(), 2);
assert!(repair_infos[0].is_none());
assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce);
assert!(shreds[0].1.is_none());
assert_eq!(shreds[1].1.as_ref().unwrap().nonce, nonce);

shreds = vec![shred.clone(), shred.clone(), shred];
let shreds = [shred.clone(), shred.clone(), shred];
let repair_meta2 = RepairMeta { nonce: 0 };
let repair_meta3 = RepairMeta { nonce };
repair_infos = vec![None, Some(repair_meta2), Some(repair_meta3)];
let repair_meta = [None, Some(repair_meta2), Some(repair_meta3)];
let mut shreds = shreds.into_iter().zip(repair_meta).collect();
// In wen_restart, we discard all Turbine shreds and only keep valid repair shreds.
prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, true);
prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, true);
assert_eq!(shreds.len(), 1);
assert_eq!(repair_infos.len(), 1);
assert!(repair_infos[0].is_some());
assert_eq!(repair_infos[0].as_ref().unwrap().nonce, nonce);
assert!(shreds[0].1.is_some());
assert_eq!(shreds[0].1.as_ref().unwrap().nonce, nonce);
}
}
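
For reference, a minimal standalone sketch (with hypothetical Shred, RepairMeta,
and verify_repair stand-ins, not the code above) of the pruning pattern the
rewritten prune_shreds_by_repair_status relies on: a single Vec::retain over
(shred, repair_meta) tuples keeps shreds and their repair metadata in sync by
construction, with no index counter and no HashSet of removed positions:

    struct Shred(u64);
    struct RepairMeta {
        nonce: u32,
    }

    // Stand-in for verify_repair: accept any turbine shred, and accept a repair
    // shred only if its nonce matches the (single) outstanding request.
    fn verify_repair(
        outstanding_nonce: u32,
        _shred: &Shred,
        repair_meta: &Option<RepairMeta>,
    ) -> bool {
        match repair_meta {
            None => true,
            Some(meta) => meta.nonce == outstanding_nonce,
        }
    }

    fn prune(
        shreds: &mut Vec<(Shred, Option<RepairMeta>)>,
        outstanding_nonce: u32,
        accept_repairs_only: bool,
    ) {
        shreds.retain(|(shred, repair_meta)| {
            (!accept_repairs_only || repair_meta.is_some())
                && verify_repair(outstanding_nonce, shred, repair_meta)
        });
    }

    fn main() {
        let mut shreds = vec![
            (Shred(0), None),
            (Shred(1), Some(RepairMeta { nonce: 7 })),
            (Shred(2), Some(RepairMeta { nonce: 9 })),
        ];
        prune(&mut shreds, 7, /*accept_repairs_only:*/ false);
        // The turbine shred and the repair shred with a matching nonce survive.
        assert_eq!(shreds.len(), 2);
    }
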
26 changes: 10 additions & 16 deletions ledger/src/blockstore.rs
@@ -875,16 +875,15 @@ impl Blockstore {
based on the results, split out by shred source (turbine vs. repair).
fn attempt_shred_insertion(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
is_trusted: bool,
leader_schedule: Option<&LeaderScheduleCache>,
shred_insertion_tracker: &mut ShredInsertionTracker,
metrics: &mut BlockstoreInsertionMetrics,
) {
metrics.num_shreds += shreds.len();
let mut start = Measure::start("Shred insertion");
for (shred, is_repaired) in shreds.into_iter().zip(is_repaired) {
for (shred, is_repaired) in shreds {
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
@@ -1214,15 +1213,13 @@
/// input `shreds` vector.
fn do_insert_shreds(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<InsertResults> {
assert_eq!(shreds.len(), is_repaired.len());
let mut total_start = Measure::start("Total elapsed");

// Acquire the insertion lock
@@ -1236,7 +1233,6 @@

self.attempt_shred_insertion(
shreds,
is_repaired,
is_trusted,
leader_schedule,
&mut shred_insertion_tracker,
@@ -1291,8 +1287,7 @@

pub fn insert_shreds_handle_duplicate<F>(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
@@ -1308,7 +1303,6 @@
duplicate_shreds,
} = self.do_insert_shreds(
shreds,
is_repaired,
leader_schedule,
is_trusted,
retransmit_sender,
@@ -1371,14 +1365,15 @@

pub fn insert_shreds(
&self,
shreds: Vec<Shred>,
shreds: impl IntoIterator<Item = Shred, IntoIter: ExactSizeIterator>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
) -> Result<Vec<CompletedDataSetInfo>> {
let shreds_len = shreds.len();
let shreds = shreds
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ false));
let insert_results = self.do_insert_shreds(
shreds,
vec![false; shreds_len],
leader_schedule,
is_trusted,
None, // retransmit-sender
@@ -1396,8 +1391,7 @@
) -> Vec<PossibleDuplicateShred> {
let insert_results = self
.do_insert_shreds(
vec![shred],
vec![false],
[(shred, /*is_repaired:*/ false)].into_iter(),
Some(leader_schedule),
false,
None, // retransmit-sender
@@ -6654,7 +6648,7 @@ pub mod tests {
let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot);
// Insert all shreds except for the shreds with index > 0 from non_full_slot
let non_full_slot = start_slot + num_slots / 2;
let (shreds, missing_shreds) = shreds
let (shreds, missing_shreds): (Vec<_>, Vec<_>) = shreds
.into_iter()
.partition(|shred| shred.slot() != non_full_slot || shred.index() == 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
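
For reference, a small standalone sketch (stand-in Shred type, simplified body)
of the bound now used by the public insert_shreds above: an argument of type
impl IntoIterator<Item = Shred, IntoIter: ExactSizeIterator> accepts both a
Vec<Shred> and a fixed-size array, which is what lets the broadcast-stage call
in the next file pass [shred.clone()] without allocating a Vec. The
associated-type bound (IntoIter: ExactSizeIterator) needs a reasonably recent
Rust toolchain.

    struct Shred(u64);

    // Same shape as the new public bound: anything that turns into an
    // exact-size iterator of shreds.
    fn insert_shreds(shreds: impl IntoIterator<Item = Shred, IntoIter: ExactSizeIterator>) {
        let shreds = shreds.into_iter();
        // len() is available up front without collecting into a Vec.
        println!("inserting {} shreds", shreds.len());
        // Tag everything as non-repaired before handing it to the inner path.
        for (_shred, _is_repaired) in shreds.map(|shred| (shred, /*is_repaired:*/ false)) {
            // ... write to the blockstore ...
        }
    }

    fn main() {
        insert_shreds(vec![Shred(1), Shred(2)]); // a Vec<Shred> works as before
        insert_shreds([Shred(3)]); // a fixed-size array works too; no Vec allocation
    }
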
2 changes: 1 addition & 1 deletion turbine/src/broadcast_stage/standard_broadcast_run.rs
@@ -294,7 +294,7 @@ impl StandardBroadcastRun {
if shred.index() == 0 {
blockstore
.insert_shreds(
vec![shred.clone()],
[shred.clone()],
None, // leader_schedule
true, // is_trusted
)
