removes unnecessary allocations in shreds ingestion path #4530

Open · wants to merge 1 commit into base: master
85 changes: 33 additions & 52 deletions core/src/window_service.rs
@@ -38,7 +38,7 @@ use {
solana_turbine::cluster_nodes,
std::{
cmp::Reverse,
collections::{HashMap, HashSet},
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
@@ -240,32 +240,15 @@ fn verify_repair(
}

fn prune_shreds_by_repair_status(
shreds: &mut Vec<Shred>,
repair_infos: &mut Vec<Option<RepairMeta>>,
shreds: &mut Vec<(Shred, Option<RepairMeta>)>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
accept_repairs_only: bool,
) {
assert_eq!(shreds.len(), repair_infos.len());
let mut i = 0;
let mut removed = HashSet::new();
{
let mut outstanding_requests = outstanding_requests.write().unwrap();
shreds.retain(|shred| {
let should_keep = (
(!accept_repairs_only || repair_infos[i].is_some())
&& verify_repair(&mut outstanding_requests, shred, &repair_infos[i]),
i += 1,
)
.0;
if !should_keep {
removed.insert(i - 1);
}
should_keep
});
}
i = 0;
repair_infos.retain(|_repair_info| (!removed.contains(&i), i += 1).0);
assert_eq!(shreds.len(), repair_infos.len());
let mut outstanding_requests = outstanding_requests.write().unwrap();
shreds.retain(|(shred, repair_meta)| {
(!accept_repairs_only || repair_meta.is_some())
&& verify_repair(&mut outstanding_requests, shred, repair_meta)
});
}

#[allow(clippy::too_many_arguments)]
@@ -310,16 +293,19 @@ where
}
};
let now = Instant::now();
let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
let mut shreds: Vec<_> = thread_pool.install(|| {
packets
.par_iter()
.with_min_len(32)
.flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
.unzip()
.collect()
});
ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
ws_metrics.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
ws_metrics.num_repairs += shreds
.iter()
.filter(|(_, repair_meta)| repair_meta.is_some())
.count();
ws_metrics.num_shreds_received += shreds.len();
for packet in packets.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta().socket_addr();
@@ -328,23 +314,16 @@

let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
let num_shreds = shreds.len();
prune_shreds_by_repair_status(
&mut shreds,
&mut repair_infos,
outstanding_requests,
accept_repairs_only,
);
prune_shreds_by_repair_status(&mut shreds, outstanding_requests, accept_repairs_only);
ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len();
let repairs: Vec<_> = repair_infos
.iter()
.map(|repair_info| repair_info.is_some())
.collect();
prune_shreds_elapsed.stop();
ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();

let shreds = shreds
.into_iter()
.map(|(shred, repair_meta)| (shred, repair_meta.is_some()));
let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
shreds,
repairs,
Some(leader_schedule_cache),
false, // is_trusted
retransmit_sender,
@@ -719,10 +698,12 @@ mod test {
};
assert_eq!(duplicate_shred.slot(), slot);
// Simulate storing both duplicate shreds in the same batch
let shreds = [original_shred.clone(), duplicate_shred.clone()]
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ false));
blockstore
.insert_shreds_handle_duplicate(
vec![original_shred.clone(), duplicate_shred.clone()],
vec![false, false],
shreds,
None,
false, // is_trusted
&dummy_retransmit_sender,
@@ -762,7 +743,7 @@ mod test {
4, // position
0, // version
);
let mut shreds = vec![shred.clone(), shred.clone(), shred.clone()];
let shreds = [shred.clone(), shred.clone(), shred.clone()];
let repair_meta = RepairMeta { nonce: 0 };
let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));
let repair_type = ShredRepairType::Orphan(9);
@@ -771,22 +752,22 @@
.unwrap()
.add_request(repair_type, timestamp());
let repair_meta1 = RepairMeta { nonce };
let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)];
prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, false);
let repair_meta = [None, Some(repair_meta), Some(repair_meta1)];
let mut shreds = shreds.into_iter().zip(repair_meta).collect();
prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, false);
assert_eq!(shreds.len(), 2);
assert_eq!(repair_infos.len(), 2);
assert!(repair_infos[0].is_none());
assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce);
assert!(shreds[0].1.is_none());
assert_eq!(shreds[1].1.as_ref().unwrap().nonce, nonce);

shreds = vec![shred.clone(), shred.clone(), shred];
let shreds = [shred.clone(), shred.clone(), shred];
let repair_meta2 = RepairMeta { nonce: 0 };
let repair_meta3 = RepairMeta { nonce };
repair_infos = vec![None, Some(repair_meta2), Some(repair_meta3)];
let repair_meta = [None, Some(repair_meta2), Some(repair_meta3)];
let mut shreds = shreds.into_iter().zip(repair_meta).collect();
// In wen_restart, we discard all Turbine shreds and only keep valid repair shreds.
prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, true);
prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, true);
assert_eq!(shreds.len(), 1);
assert_eq!(repair_infos.len(), 1);
assert!(repair_infos[0].is_some());
assert_eq!(repair_infos[0].as_ref().unwrap().nonce, nonce);
assert!(shreds[0].1.is_some());
assert_eq!(shreds[0].1.as_ref().unwrap().nonce, nonce);
}
}
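
Reviewer sketch: the core of the window_service.rs change is collapsing the two parallel vectors (shreds plus repair_infos) and the HashSet of removed indices into a single Vec of (Shred, Option<RepairMeta>) pairs that is pruned in one retain pass. A minimal, self-contained sketch of that pattern, using hypothetical Item and Meta stand-ins rather than the real Shred and RepairMeta types:

// Minimal sketch of the single-pass pruning pattern; `Item` and `Meta`
// are hypothetical stand-ins for `Shred` and `RepairMeta`.
struct Item(u64);

struct Meta {
    nonce: u32,
}

// Stand-in for `verify_repair`: keep anything that is not a repair
// response, or whose metadata passes a trivial check.
fn keep(item: &Item, meta: Option<&Meta>) -> bool {
    meta.map_or(true, |meta| meta.nonce != 0 && item.0 != 0)
}

fn prune(entries: &mut Vec<(Item, Option<Meta>)>, accept_repairs_only: bool) {
    // One retain pass over the zipped pairs: no parallel vector, no
    // HashSet of removed indices, and each shred and its repair metadata
    // are kept or dropped together.
    entries.retain(|(item, meta)| {
        (!accept_repairs_only || meta.is_some()) && keep(item, meta.as_ref())
    });
}

fn main() {
    let mut entries = vec![
        (Item(1), None),
        (Item(2), Some(Meta { nonce: 7 })),
        (Item(0), Some(Meta { nonce: 7 })), // fails the check, pruned
    ];
    prune(&mut entries, /*accept_repairs_only:*/ false);
    assert_eq!(entries.len(), 2);
}

Keeping each pair together means the verification decision and the metadata removal can never drift out of sync, which is what the old index bookkeeping was guarding against.
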
32 changes: 13 additions & 19 deletions ledger/src/blockstore.rs
@@ -875,16 +875,15 @@ impl Blockstore {
/// based on the results, split out by shred source (turbine vs. repair).
fn attempt_shred_insertion(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
is_trusted: bool,
leader_schedule: Option<&LeaderScheduleCache>,
shred_insertion_tracker: &mut ShredInsertionTracker,
metrics: &mut BlockstoreInsertionMetrics,
) {
metrics.num_shreds += shreds.len();
let mut start = Measure::start("Shred insertion");
for (shred, is_repaired) in shreds.into_iter().zip(is_repaired) {
for (shred, is_repaired) in shreds {
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
@@ -1212,8 +1211,7 @@ impl Blockstore {
/// input `shreds` vector.
fn do_insert_shreds(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
// When inserting own shreds during leader slots, we shouldn't try to
@@ -1227,7 +1225,6 @@
)>,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<InsertResults> {
assert_eq!(shreds.len(), is_repaired.len());
let mut total_start = Measure::start("Total elapsed");

// Acquire the insertion lock
@@ -1241,7 +1238,6 @@

self.attempt_shred_insertion(
shreds,
is_repaired,
is_trusted,
leader_schedule,
&mut shred_insertion_tracker,
@@ -1299,8 +1295,7 @@
// Blockstore::insert_shreds when inserting own shreds during leader slots.
pub fn insert_shreds_handle_duplicate<F>(
&self,
shreds: Vec<Shred>,
is_repaired: Vec<bool>,
shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
@@ -1316,7 +1311,6 @@
duplicate_shreds,
} = self.do_insert_shreds(
shreds,
is_repaired,
leader_schedule,
is_trusted,
Some((reed_solomon_cache, retransmit_sender)),
@@ -1380,14 +1374,15 @@
// when inserting own shreds during leader slots.
pub fn insert_shreds(
&self,
shreds: Vec<Shred>,
shreds: impl IntoIterator<Item = Shred, IntoIter: ExactSizeIterator>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
) -> Result<Vec<CompletedDataSetInfo>> {
let shreds_len = shreds.len();
let shreds = shreds
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ false));
let insert_results = self.do_insert_shreds(
shreds,
vec![false; shreds_len],
leader_schedule,
is_trusted,
None, // (reed_solomon_cache, retransmit_sender)
@@ -1404,8 +1399,7 @@
) -> Vec<PossibleDuplicateShred> {
let insert_results = self
.do_insert_shreds(
vec![shred],
vec![false],
[(shred, /*is_repaired:*/ false)].into_iter(),
Some(leader_schedule),
false,
None, // (reed_solomon_cache, retransmit_sender)
@@ -6661,7 +6655,7 @@ pub mod tests {
let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot);
// Insert all shreds except for the shreds with index > 0 from non_full_slot
let non_full_slot = start_slot + num_slots / 2;
let (shreds, missing_shreds) = shreds
let (shreds, missing_shreds): (Vec<_>, Vec<_>) = shreds
.into_iter()
.partition(|shred| shred.slot() != non_full_slot || shred.index() == 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
@@ -10239,11 +10233,11 @@ pub mod tests {
setup_erasure_shreds(slot, 0, 100);

let (dummy_retransmit_sender, _) = crossbeam_channel::bounded(0);
let is_repaired = vec![false; coding_shreds.len()];
blockstore
.do_insert_shreds(
coding_shreds,
is_repaired,
coding_shreds
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ false)),
Some(&leader_schedule_cache),
false, // is_trusted
Some((&ReedSolomonCache::default(), &dummy_retransmit_sender)),
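
Reviewer sketch: on the blockstore side, Vec<Shred> plus a parallel Vec<bool> of is_repaired flags becomes a single impl ExactSizeIterator<Item = (Shred, bool)>, so callers tag shreds lazily instead of allocating a flags vector such as vec![false; shreds.len()]. A rough sketch of the calling convention, with a placeholder Shred struct and an insert function standing in for do_insert_shreds:

// Rough sketch of the new calling convention; `Shred` is a placeholder
// struct and `insert` stands in for `do_insert_shreds`.
struct Shred {
    index: u32,
}

fn insert(shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>) -> usize {
    // `len()` is available up front (ExactSizeIterator), and the pairs
    // are consumed directly; no intermediate `Vec<bool>` is allocated.
    let expected = shreds.len();
    let mut inserted = 0;
    for (shred, is_repaired) in shreds {
        let _source = if is_repaired { "repair" } else { "turbine" };
        let _ = shred.index;
        inserted += 1;
    }
    assert_eq!(inserted, expected);
    inserted
}

fn main() {
    let shreds = vec![Shred { index: 0 }, Shred { index: 1 }];
    // Callers tag each shred lazily as it is consumed.
    let n = insert(shreds.into_iter().map(|shred| (shred, /*is_repaired:*/ false)));
    assert_eq!(n, 2);
}

The ExactSizeIterator bound keeps the metrics.num_shreds accounting cheap, since len() can be read before the iterator is consumed.
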
2 changes: 1 addition & 1 deletion turbine/src/broadcast_stage/standard_broadcast_run.rs
@@ -294,7 +294,7 @@ impl StandardBroadcastRun {
if shred.index() == 0 {
blockstore
.insert_shreds(
vec![shred.clone()],
[shred.clone()],
None, // leader_schedule
true, // is_trusted
)
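
Reviewer note: the one-line turbine change simply takes advantage of the relaxed insert_shreds bound, impl IntoIterator<Item = Shred, IntoIter: ExactSizeIterator>, so a stack-allocated array now works where a Vec was previously required. This relies on associated-type-bound syntax, which I believe needs Rust 1.79 or newer. A tiny illustration with plain integers rather than real shreds:

// Tiny illustration: a fixed-size array satisfies the relaxed bound
// without a heap allocation, and a Vec still works too.
fn count<T>(items: impl IntoIterator<Item = T, IntoIter: ExactSizeIterator>) -> usize {
    items.into_iter().len()
}

fn main() {
    assert_eq!(count([1u8]), 1);        // stack-allocated array, no Vec
    assert_eq!(count(vec![1u8, 2]), 2); // heap-allocated Vec still accepted
}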