removes unnecessary allocations in shreds ingestion path
Ingesting shreds unnecessarily allocates a HashSet to remove invalid
repair-meta:
https://github.com/anza-xyz/agave/blob/0f56076a2/core/src/window_service.rs#L250

and a separate vector for repair flags:
https://github.com/anza-xyz/agave/blob/0f56076a2/core/src/window_service.rs#L338-L341

Similarly, Blockstore::insert_shreds (called in the broadcast stage to insert the
node's own shreds during leader slots) allocates a vector for repair flags:
https://github.com/anza-xyz/agave/blob/0f56076a2/ledger/src/blockstore.rs#L1381

All of these allocations can be avoided if the blockstore insert_shreds APIs are
updated to take an argument of type

    shreds: impl ExactSizeIterator<Item = (Shred, /*repaired:*/ bool)>,
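
As a rough, caller-side sketch of that signature (with simplified stand-ins for
Shred and RepairMeta, and a hypothetical trimmed-down insert_shreds in place of
the Blockstore insertion methods), the separate repair-flag vector disappears
because the flag is derived while iterating, and the ExactSizeIterator bound
still lets the callee read the shred count up front for metrics:

    // Minimal sketch: `Shred`, `RepairMeta` and `insert_shreds` are simplified
    // stand-ins, not the repository types or API.
    struct Shred;
    struct RepairMeta;

    fn insert_shreds(shreds: impl ExactSizeIterator<Item = (Shred, /*repaired:*/ bool)>) {
        // ExactSizeIterator exposes the count before the iterator is consumed,
        // which is what the insertion metrics need.
        let num_shreds = shreds.len();
        let num_repaired = shreds.filter(|(_, repaired)| *repaired).count();
        println!("{num_repaired}/{num_shreds} shreds came from repair");
    }

    fn main() {
        // Window-service style caller: shreds arrive already paired with their
        // repair metadata, so no separate Vec<bool> of repair flags is built.
        let shreds: Vec<(Shred, Option<RepairMeta>)> =
            vec![(Shred, None), (Shred, Some(RepairMeta))];
        insert_shreds(
            shreds
                .into_iter()
                .map(|(shred, repair_meta)| (shred, repair_meta.is_some())),
        );

        // Broadcast-stage style caller: the leader's own shreds are never repaired.
        let own_shreds = vec![Shred, Shred];
        insert_shreds(own_shreds.into_iter().map(|shred| (shred, /*repaired:*/ false)));
    }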
behzadnouri committed Jan 18, 2025
1 parent 0f56076 commit c1b1ef1
Showing 2 changed files with 37 additions and 67 deletions.
83 changes: 31 additions & 52 deletions core/src/window_service.rs
@@ -38,7 +38,7 @@ use {
solana_turbine::cluster_nodes,
std::{
cmp::Reverse,
collections::{HashMap, HashSet},
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
@@ -240,32 +240,15 @@ fn verify_repair(
}

fn prune_shreds_by_repair_status(
shreds: &mut Vec<Shred>,
repair_infos: &mut Vec<Option<RepairMeta>>,
shreds: &mut Vec<(Shred, Option<RepairMeta>)>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
accept_repairs_only: bool,
) {
assert_eq!(shreds.len(), repair_infos.len());
let mut i = 0;
let mut removed = HashSet::new();
{
let mut outstanding_requests = outstanding_requests.write().unwrap();
shreds.retain(|shred| {
let should_keep = (
(!accept_repairs_only || repair_infos[i].is_some())
&& verify_repair(&mut outstanding_requests, shred, &repair_infos[i]),
i += 1,
)
.0;
if !should_keep {
removed.insert(i - 1);
}
should_keep
});
}
i = 0;
repair_infos.retain(|_repair_info| (!removed.contains(&i), i += 1).0);
assert_eq!(shreds.len(), repair_infos.len());
let mut outstanding_requests = outstanding_requests.write().unwrap();
shreds.retain(|(shred, repair_meta)| {
(!accept_repairs_only || repair_meta.is_some())
&& verify_repair(&mut outstanding_requests, shred, repair_meta)
});
}

#[allow(clippy::too_many_arguments)]
@@ -310,16 +293,19 @@ where
}
};
let now = Instant::now();
let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
let mut shreds: Vec<_> = thread_pool.install(|| {
packets
.par_iter()
.with_min_len(32)
.flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
.unzip()
.collect()
});
ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
ws_metrics.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
ws_metrics.num_repairs += shreds
.iter()
.filter(|(_, repair_meta)| repair_meta.is_some())
.count();
ws_metrics.num_shreds_received += shreds.len();
for packet in packets.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta().socket_addr();
@@ -328,23 +314,16 @@

let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
let num_shreds = shreds.len();
prune_shreds_by_repair_status(
&mut shreds,
&mut repair_infos,
outstanding_requests,
accept_repairs_only,
);
prune_shreds_by_repair_status(&mut shreds, outstanding_requests, accept_repairs_only);
ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len();
let repairs: Vec<_> = repair_infos
.iter()
.map(|repair_info| repair_info.is_some())
.collect();
prune_shreds_elapsed.stop();
ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();

let shreds = shreds
.into_iter()
.map(|(shred, repair_meta)| (shred, repair_meta.is_some()));
let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
shreds,
repairs,
Some(leader_schedule_cache),
false, // is_trusted
Some(retransmit_sender),
@@ -718,10 +697,10 @@ mod test {
};
assert_eq!(duplicate_shred.slot(), slot);
// Simulate storing both duplicate shreds in the same batch
let shreds = [original_shred.clone(), duplicate_shred.clone()];
blockstore
.insert_shreds_handle_duplicate(
vec![original_shred.clone(), duplicate_shred.clone()],
vec![false, false],
shreds.into_iter().map(|shred| (shred, /*repaired:*/ false)),
None,
false, // is_trusted
None,
@@ -761,7 +740,7 @@
4, // position
0, // version
);
let mut shreds = vec![shred.clone(), shred.clone(), shred.clone()];
let shreds = [shred.clone(), shred.clone(), shred.clone()];
let repair_meta = RepairMeta { nonce: 0 };
let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));
let repair_type = ShredRepairType::Orphan(9);
@@ -770,22 +749,22 @@
.unwrap()
.add_request(repair_type, timestamp());
let repair_meta1 = RepairMeta { nonce };
let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)];
prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, false);
let repair_meta = [None, Some(repair_meta), Some(repair_meta1)];
let mut shreds = shreds.into_iter().zip(repair_meta).collect();
prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, false);
assert_eq!(shreds.len(), 2);
assert_eq!(repair_infos.len(), 2);
assert!(repair_infos[0].is_none());
assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce);
assert!(shreds[0].1.is_none());
assert_eq!(shreds[1].1.as_ref().unwrap().nonce, nonce);

shreds = vec![shred.clone(), shred.clone(), shred];
let shreds = [shred.clone(), shred.clone(), shred];
let repair_meta2 = RepairMeta { nonce: 0 };
let repair_meta3 = RepairMeta { nonce };
repair_infos = vec![None, Some(repair_meta2), Some(repair_meta3)];
let repair_meta = [None, Some(repair_meta2), Some(repair_meta3)];
let mut shreds = shreds.into_iter().zip(repair_meta).collect();
// In wen_restart, we discard all Turbine shreds and only keep valid repair shreds.
prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, true);
prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, true);
assert_eq!(shreds.len(), 1);
assert_eq!(repair_infos.len(), 1);
assert!(repair_infos[0].is_some());
assert_eq!(repair_infos[0].as_ref().unwrap().nonce, nonce);
assert!(shreds[0].1.is_some());
assert_eq!(shreds[0].1.as_ref().unwrap().nonce, nonce);
}
}
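
For reference, a standalone, runnable sketch of the pruning pattern used in the
new prune_shreds_by_repair_status above (simplified stand-in types and a stubbed
verify_repair; the real code checks the nonce against OutstandingShredRepairs
behind an RwLock):

    // Sketch only: `Shred` and `RepairMeta` are stand-ins, and `verify_repair`
    // is stubbed to treat even nonces as matching an outstanding request.
    struct Shred;
    struct RepairMeta {
        nonce: u32,
    }

    fn verify_repair(_shred: &Shred, repair_meta: &Option<RepairMeta>) -> bool {
        repair_meta.as_ref().map_or(true, |meta| meta.nonce % 2 == 0)
    }

    fn prune_shreds_by_repair_status(
        shreds: &mut Vec<(Shred, Option<RepairMeta>)>,
        accept_repairs_only: bool,
    ) {
        // Each shred travels with its repair metadata, so one retain keeps the
        // two in sync with no index bookkeeping or HashSet of removed positions.
        shreds.retain(|(shred, repair_meta)| {
            (!accept_repairs_only || repair_meta.is_some()) && verify_repair(shred, repair_meta)
        });
    }

    fn main() {
        let make_shreds = || {
            vec![
                (Shred, None),                          // turbine shred
                (Shred, Some(RepairMeta { nonce: 2 })), // valid repair
                (Shred, Some(RepairMeta { nonce: 3 })), // invalid repair
            ]
        };

        let mut shreds = make_shreds();
        prune_shreds_by_repair_status(&mut shreds, /*accept_repairs_only:*/ false);
        assert_eq!(shreds.len(), 2); // the invalid repair is dropped

        // wen_restart-style pruning: keep only valid repair shreds.
        let mut shreds = make_shreds();
        prune_shreds_by_repair_status(&mut shreds, /*accept_repairs_only:*/ true);
        assert_eq!(shreds.len(), 1);
        assert_eq!(shreds[0].1.as_ref().unwrap().nonce, 2);
    }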
21 changes: 6 additions & 15 deletions ledger/src/blockstore.rs
@@ -875,16 +875,15 @@ impl Blockstore {
/// based on the results, split out by shred source (turbine vs. repair).
fn attempt_shred_insertion(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*repaired:*/ bool)>,
is_trusted: bool,
leader_schedule: Option<&LeaderScheduleCache>,
shred_insertion_tracker: &mut ShredInsertionTracker,
metrics: &mut BlockstoreInsertionMetrics,
) {
metrics.num_shreds += shreds.len();
let mut start = Measure::start("Shred insertion");
for (shred, is_repaired) in shreds.into_iter().zip(is_repaired) {
for (shred, is_repaired) in shreds {
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
@@ -1214,15 +1213,13 @@
/// input `shreds` vector.
fn do_insert_shreds(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*repaired:*/ bool)>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<InsertResults> {
assert_eq!(shreds.len(), is_repaired.len());
let mut total_start = Measure::start("Total elapsed");

// Acquire the insertion lock
@@ -1236,7 +1233,6 @@

self.attempt_shred_insertion(
shreds,
is_repaired,
is_trusted,
leader_schedule,
&mut shred_insertion_tracker,
@@ -1291,8 +1287,7 @@

pub fn insert_shreds_handle_duplicate<F>(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*repaired:*/ bool)>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
@@ -1308,7 +1303,6 @@
duplicate_shreds,
} = self.do_insert_shreds(
shreds,
is_repaired,
leader_schedule,
is_trusted,
retransmit_sender,
@@ -1375,10 +1369,8 @@
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
) -> Result<Vec<CompletedDataSetInfo>> {
let shreds_len = shreds.len();
let insert_results = self.do_insert_shreds(
shreds,
vec![false; shreds_len],
shreds.into_iter().map(|shred| (shred, /*repaired:*/ false)),
leader_schedule,
is_trusted,
None, // retransmit-sender
@@ -1396,8 +1388,7 @@
) -> Vec<PossibleDuplicateShred> {
let insert_results = self
.do_insert_shreds(
vec![shred],
vec![false],
[(shred, /*repaired:*/ false)].into_iter(),
Some(leader_schedule),
false,
None, // retransmit-sender