diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 63e1f06ad110b9..b8749976a4a54b 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -38,7 +38,7 @@ use { solana_turbine::cluster_nodes, std::{ cmp::Reverse, - collections::{HashMap, HashSet}, + collections::HashMap, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, @@ -240,32 +240,15 @@ fn verify_repair( } fn prune_shreds_by_repair_status( - shreds: &mut Vec, - repair_infos: &mut Vec>, + shreds: &mut Vec<(Shred, Option)>, outstanding_requests: &RwLock, accept_repairs_only: bool, ) { - assert_eq!(shreds.len(), repair_infos.len()); - let mut i = 0; - let mut removed = HashSet::new(); - { - let mut outstanding_requests = outstanding_requests.write().unwrap(); - shreds.retain(|shred| { - let should_keep = ( - (!accept_repairs_only || repair_infos[i].is_some()) - && verify_repair(&mut outstanding_requests, shred, &repair_infos[i]), - i += 1, - ) - .0; - if !should_keep { - removed.insert(i - 1); - } - should_keep - }); - } - i = 0; - repair_infos.retain(|_repair_info| (!removed.contains(&i), i += 1).0); - assert_eq!(shreds.len(), repair_infos.len()); + let mut outstanding_requests = outstanding_requests.write().unwrap(); + shreds.retain(|(shred, repair_meta)| { + (!accept_repairs_only || repair_meta.is_some()) + && verify_repair(&mut outstanding_requests, shred, repair_meta) + }); } #[allow(clippy::too_many_arguments)] @@ -310,16 +293,19 @@ where } }; let now = Instant::now(); - let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { + let mut shreds: Vec<_> = thread_pool.install(|| { packets .par_iter() .with_min_len(32) .flat_map_iter(|packets| packets.iter().filter_map(handle_packet)) - .unzip() + .collect() }); ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64; ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::(); - ws_metrics.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); + ws_metrics.num_repairs += shreds + .iter() + .filter(|(_, repair_meta)| repair_meta.is_some()) + .count(); ws_metrics.num_shreds_received += shreds.len(); for packet in packets.iter().flat_map(PacketBatch::iter) { let addr = packet.meta().socket_addr(); @@ -328,23 +314,16 @@ where let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed"); let num_shreds = shreds.len(); - prune_shreds_by_repair_status( - &mut shreds, - &mut repair_infos, - outstanding_requests, - accept_repairs_only, - ); + prune_shreds_by_repair_status(&mut shreds, outstanding_requests, accept_repairs_only); ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len(); - let repairs: Vec<_> = repair_infos - .iter() - .map(|repair_info| repair_info.is_some()) - .collect(); prune_shreds_elapsed.stop(); ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us(); + let shreds = shreds + .into_iter() + .map(|(shred, repair_meta)| (shred, repair_meta.is_some())); let completed_data_sets = blockstore.insert_shreds_handle_duplicate( shreds, - repairs, Some(leader_schedule_cache), false, // is_trusted retransmit_sender, @@ -719,10 +698,12 @@ mod test { }; assert_eq!(duplicate_shred.slot(), slot); // Simulate storing both duplicate shreds in the same batch + let shreds = [original_shred.clone(), duplicate_shred.clone()] + .into_iter() + .map(|shred| (shred, /*is_repaired:*/ false)); blockstore .insert_shreds_handle_duplicate( - vec![original_shred.clone(), duplicate_shred.clone()], - vec![false, false], + shreds, None, false, // is_trusted &dummy_retransmit_sender, @@ -762,7 +743,7 @@ mod test { 4, // position 0, // version ); - let mut shreds = vec![shred.clone(), shred.clone(), shred.clone()]; + let shreds = [shred.clone(), shred.clone(), shred.clone()]; let repair_meta = RepairMeta { nonce: 0 }; let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default())); let repair_type = ShredRepairType::Orphan(9); @@ -771,22 +752,22 @@ mod test { .unwrap() .add_request(repair_type, timestamp()); let repair_meta1 = RepairMeta { nonce }; - let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)]; - prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, false); + let repair_meta = [None, Some(repair_meta), Some(repair_meta1)]; + let mut shreds = shreds.into_iter().zip(repair_meta).collect(); + prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, false); assert_eq!(shreds.len(), 2); - assert_eq!(repair_infos.len(), 2); - assert!(repair_infos[0].is_none()); - assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce); + assert!(shreds[0].1.is_none()); + assert_eq!(shreds[1].1.as_ref().unwrap().nonce, nonce); - shreds = vec![shred.clone(), shred.clone(), shred]; + let shreds = [shred.clone(), shred.clone(), shred]; let repair_meta2 = RepairMeta { nonce: 0 }; let repair_meta3 = RepairMeta { nonce }; - repair_infos = vec![None, Some(repair_meta2), Some(repair_meta3)]; + let repair_meta = [None, Some(repair_meta2), Some(repair_meta3)]; + let mut shreds = shreds.into_iter().zip(repair_meta).collect(); // In wen_restart, we discard all Turbine shreds and only keep valid repair shreds. - prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, true); + prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, true); assert_eq!(shreds.len(), 1); - assert_eq!(repair_infos.len(), 1); - assert!(repair_infos[0].is_some()); - assert_eq!(repair_infos[0].as_ref().unwrap().nonce, nonce); + assert!(shreds[0].1.is_some()); + assert_eq!(shreds[0].1.as_ref().unwrap().nonce, nonce); } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index cfb3b67cad47a7..b726b5a2cdd7cd 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -868,8 +868,7 @@ impl Blockstore { /// based on the results, split out by shred source (tubine vs. repair). fn attempt_shred_insertion( &self, - shreds: Vec, - is_repaired: Vec, + shreds: impl ExactSizeIterator, is_trusted: bool, leader_schedule: Option<&LeaderScheduleCache>, shred_insertion_tracker: &mut ShredInsertionTracker, @@ -877,7 +876,7 @@ impl Blockstore { ) { metrics.num_shreds += shreds.len(); let mut start = Measure::start("Shred insertion"); - for (shred, is_repaired) in shreds.into_iter().zip(is_repaired) { + for (shred, is_repaired) in shreds { let shred_source = if is_repaired { ShredSource::Repaired } else { @@ -1200,8 +1199,7 @@ impl Blockstore { /// input `shreds` vector. fn do_insert_shreds( &self, - shreds: Vec, - is_repaired: Vec, + shreds: impl ExactSizeIterator, leader_schedule: Option<&LeaderScheduleCache>, is_trusted: bool, // When inserting own shreds during leader slots, we shouldn't try to @@ -1215,7 +1213,6 @@ impl Blockstore { )>, metrics: &mut BlockstoreInsertionMetrics, ) -> Result { - assert_eq!(shreds.len(), is_repaired.len()); let mut total_start = Measure::start("Total elapsed"); // Acquire the insertion lock @@ -1229,7 +1226,6 @@ impl Blockstore { self.attempt_shred_insertion( shreds, - is_repaired, is_trusted, leader_schedule, &mut shred_insertion_tracker, @@ -1287,8 +1283,7 @@ impl Blockstore { // Blockstore::insert_shreds when inserting own shreds during leader slots. pub fn insert_shreds_handle_duplicate( &self, - shreds: Vec, - is_repaired: Vec, + shreds: impl ExactSizeIterator, leader_schedule: Option<&LeaderScheduleCache>, is_trusted: bool, retransmit_sender: &Sender>>, @@ -1304,7 +1299,6 @@ impl Blockstore { duplicate_shreds, } = self.do_insert_shreds( shreds, - is_repaired, leader_schedule, is_trusted, Some((reed_solomon_cache, retransmit_sender)), @@ -1368,14 +1362,15 @@ impl Blockstore { // when inserting own shreds during leader slots. pub fn insert_shreds( &self, - shreds: Vec, + shreds: impl IntoIterator, leader_schedule: Option<&LeaderScheduleCache>, is_trusted: bool, ) -> Result> { - let shreds_len = shreds.len(); + let shreds = shreds + .into_iter() + .map(|shred| (shred, /*is_repaired:*/ false)); let insert_results = self.do_insert_shreds( shreds, - vec![false; shreds_len], leader_schedule, is_trusted, None, // (reed_solomon_cache, retransmit_sender) @@ -1392,8 +1387,7 @@ impl Blockstore { ) -> Vec { let insert_results = self .do_insert_shreds( - vec![shred], - vec![false], + [(shred, /*is_repaired:*/ false)].into_iter(), Some(leader_schedule), false, None, // (reed_solomon_cache, retransmit_sender) @@ -6649,7 +6643,7 @@ pub mod tests { let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot); // Insert all shreds except for the shreds with index > 0 from non_full_slot let non_full_slot = start_slot + num_slots / 2; - let (shreds, missing_shreds) = shreds + let (shreds, missing_shreds): (Vec<_>, Vec<_>) = shreds .into_iter() .partition(|shred| shred.slot() != non_full_slot || shred.index() == 0); blockstore.insert_shreds(shreds, None, false).unwrap(); @@ -10227,11 +10221,11 @@ pub mod tests { setup_erasure_shreds(slot, 0, 100); let (dummy_retransmit_sender, _) = crossbeam_channel::bounded(0); - let is_repaired = vec![false; coding_shreds.len()]; blockstore .do_insert_shreds( - coding_shreds, - is_repaired, + coding_shreds + .into_iter() + .map(|shred| (shred, /*is_repaired:*/ false)), Some(&leader_schedule_cache), false, // is_trusted Some((&ReedSolomonCache::default(), &dummy_retransmit_sender)), diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index 505e3c6120f3cd..5570d10c599cd0 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -294,7 +294,7 @@ impl StandardBroadcastRun { if shred.index() == 0 { blockstore .insert_shreds( - vec![shred.clone()], + [shred.clone()], None, // leader_schedule true, // is_trusted )