Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reworks update_completed_data_indexes when inserting shreds #4531

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions core/src/completed_data_sets_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,11 @@ impl CompletedDataSetsService {
rpc_subscriptions: &RpcSubscriptions,
max_slots: &Arc<MaxSlots>,
) -> Result<(), RecvTimeoutError> {
let completed_data_sets = completed_sets_receiver.recv_timeout(Duration::from_secs(1))?;
let mut max_slot = 0;
for completed_set_info in std::iter::once(completed_data_sets)
.chain(completed_sets_receiver.try_iter())
.flatten()
{
let CompletedDataSetInfo {
slot,
start_index,
end_index,
} = completed_set_info;
max_slot = max_slot.max(slot);
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let handle_completed_data_set_info = |completed_data_set_info| {
let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
let start_index = indices.start;
let end_index = indices.end - 1;
match blockstore.get_entries_in_data_block(slot, start_index, end_index, None) {
Ok(entries) => {
let transactions = Self::get_transaction_signatures(entries);
Expand All @@ -85,11 +78,17 @@ impl CompletedDataSetsService {
}
Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e),
}
slot
};
let slots = completed_sets_receiver
.recv_timeout(RECV_TIMEOUT)
.map(std::iter::once)?
.chain(completed_sets_receiver.try_iter())
.flatten()
.map(handle_completed_data_set_info);
if let Some(slot) = slots.max() {
max_slots.shred_insert.fetch_max(slot, Ordering::Relaxed);
}
max_slots
.shred_insert
.fetch_max(max_slot, Ordering::Relaxed);

Ok(())
}

Expand Down
134 changes: 63 additions & 71 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use {
fmt::Write,
fs::{self, File},
io::{Error as IoError, ErrorKind},
ops::Bound,
ops::{Bound, Range},
path::{Path, PathBuf},
rc::Rc,
sync::{
Expand Down Expand Up @@ -208,18 +208,12 @@ pub struct InsertResults {
///
/// `solana_core::completed_data_sets_service::CompletedDataSetsService` is the main receiver of
/// `CompletedDataSetInfo`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CompletedDataSetInfo {
/// [`Slot`] to which the [`Shred`]s in this set belong.
pub slot: Slot,

/// Index of the first [`Shred`] in the range of shreds that belong to this set.
/// Range is inclusive, `start_index..=end_index`.
pub start_index: u32,

/// Index of the last [`Shred`] in the range of shreds that belong to this set.
/// Range is inclusive, `start_index..=end_index`.
pub end_index: u32,
/// Data [`Shred`]s' indices in this set.
pub indices: Range<u32>,
}

pub struct BlockstoreSignals {
Expand Down Expand Up @@ -1061,7 +1055,7 @@ impl Blockstore {
shred,
erasure_meta,
&shred_insertion_tracker.just_inserted_shreds,
&mut shred_insertion_tracker.merkle_root_metas,
&shred_insertion_tracker.merkle_root_metas,
&mut shred_insertion_tracker.duplicate_shreds,
);
}
Expand Down Expand Up @@ -1882,7 +1876,7 @@ impl Blockstore {
shred: &Shred,
erasure_meta: &ErasureMeta,
just_inserted_shreds: &HashMap<ShredId, Shred>,
merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
merkle_root_metas: &HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
) -> bool {
debug_assert!(erasure_meta.check_coding_shred(shred));
Expand Down Expand Up @@ -2186,14 +2180,14 @@ impl Blockstore {
}
}

fn insert_data_shred(
fn insert_data_shred<'a>(
&self,
slot_meta: &mut SlotMeta,
data_index: &mut ShredIndex,
data_index: &'a mut ShredIndex,
shred: &Shred,
write_batch: &mut WriteBatch,
shred_source: ShredSource,
) -> Result<Vec<CompletedDataSetInfo>> {
) -> Result<impl Iterator<Item = CompletedDataSetInfo> + 'a> {
let slot = shred.slot();
let index = u64::from(shred.index());

Expand Down Expand Up @@ -2242,13 +2236,7 @@ impl Blockstore {
shred.reference_tick(),
data_index,
)
.into_iter()
.map(|(start_index, end_index)| CompletedDataSetInfo {
slot,
start_index,
end_index,
})
.collect();
.map(move |indices| CompletedDataSetInfo { slot, indices });

self.slots_stats.record_shred(
shred.slot(),
Expand Down Expand Up @@ -4699,53 +4687,55 @@ impl Blockstore {
}
}

// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
// data set is complete, return the range of shred indexes [start_index, end_index]
// Updates the `completed_data_indexes` with a new shred `new_shred_index`.
// If a data set is complete, returns the range of shred indexes
// start_index..end_index
// for that completed data set.
fn update_completed_data_indexes(
fn update_completed_data_indexes<'a>(
is_last_in_data: bool,
new_shred_index: u32,
received_data_shreds: &ShredIndex,
received_data_shreds: &'a ShredIndex,
// Shreds indices which are marked data complete.
completed_data_indexes: &mut BTreeSet<u32>,
) -> Vec<(u32, u32)> {
let start_shred_index = completed_data_indexes
.range(..new_shred_index)
.next_back()
.map(|index| index + 1)
.unwrap_or_default();
// Consecutive entries i, k, j in this vector represent potential ranges [i, k),
// [k, j) that could be completed data ranges
let mut shred_indices = vec![start_shred_index];
// `new_shred_index` is data complete, so need to insert here into the
// `completed_data_indexes`
if is_last_in_data {
completed_data_indexes.insert(new_shred_index);
shred_indices.push(new_shred_index + 1);
}
if let Some(index) = completed_data_indexes.range(new_shred_index + 1..).next() {
shred_indices.push(index + 1);
}
shred_indices
.windows(2)
.filter(|ix| {
let (begin, end) = (ix[0] as u64, ix[1] as u64);
let num_shreds = (end - begin) as usize;
received_data_shreds.range(begin..end).count() == num_shreds
})
.map(|ix| (ix[0], ix[1] - 1))
.collect()
) -> impl Iterator<Item = Range<u32>> + 'a {
// Consecutive entries i, j, k in this array represent potential ranges
// [i, j), [j, k) that could be completed data ranges
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's actually impossible for this array to ever have three i, j, k where all i, j, and k are Some(x), because the is_last_in_data check and the last completed_data_indexes.range check should be mutually exclusive.

Might be good for clarity to mention that there really are only two entries after the call to flatten(), which together define a single range

Copy link
Author

@behzadnouri behzadnouri Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, why should it be impossible? 🤔

Let's say the banking-stage creates 3 small consecutive Vec<Entry> and passes them down to the shredder each separately.
And let's say each of the 3 Vec<Entry> fits in a single data shred, so the shredder creates 3 separate data shreds each with DATA_COMPLETE_SHRED flag and consecutive indexes, say, shred index: 20, 21 and 22.

Assume that you have already received indices 20 and 22, and they are already inserted in completed_data_indexes.
At the moment that you receive new_shred_index == 21 this function should initialize an array with entries

[Some(20 + 1), Some(21 + 1), Some(22 + 1)]

and the function should return 2 ranges: 21..22 and 22..23.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carllin does this make sense?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree with Behzad here; I worked through an example from a real block as a sanity check. The real, full block has these as completed_data_indexes:

completed_data_indexes: {35, 70, 105, 141, 177, 212, 248, 289, 328, 366, 407, 452, 492, 544, 583, 623, 661, 699, 738, 774, 806}

Imagine that every shred except the one at index 35 has previously been inserted, and we are now inserting 35. This function will get called with:

is_last_in_data: true
new_shred_index: 35
received_shreds = [0..34, 36..]
completed_data_index = [70, ...]

And this array will then yield

[
    Some(0),  // We hit the or case since completed_data_index.range(..35).next_back() yields None
    Some(36), // is_last_in_data == true so yield new_shred_index + 1 = 35 + 1 = 36
    Some(70), // completed_data_index.range(36..).next()
]

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think this segment of the pre-existing unit test exhibits a similar scenario:

agave/ledger/src/blockstore.rs

Lines 10722 to 10729 in ac663e2

// Inserting data complete shred 0 now confirms the range of shreds [0]
// is part of the same data set
shred_index.insert(0);
assert_eq!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes),
vec![(0, 0), (1, 1)]
);
assert!(completed_data_indexes.iter().eq([0, 1, 3].iter()));

[
completed_data_indexes
.range(..new_shred_index)
.next_back()
.map(|index| index + 1)
.or(Some(0u32)),
is_last_in_data.then(|| {
// new_shred_index is data complete, so need to insert here into
// the completed_data_indexes.
completed_data_indexes.insert(new_shred_index);
new_shred_index + 1
}),
completed_data_indexes
.range(new_shred_index + 1..)
.next()
.map(|index| index + 1),
]
.into_iter()
.flatten()
.tuple_windows()
.filter(|&(start, end)| {
let bounds = u64::from(start)..u64::from(end);
received_data_shreds.range(bounds.clone()).eq(bounds)
})
.map(|(start, end)| start..end)
}

fn update_slot_meta(
fn update_slot_meta<'a>(
is_last_in_slot: bool,
is_last_in_data: bool,
slot_meta: &mut SlotMeta,
index: u32,
new_consumed: u64,
reference_tick: u8,
received_data_shreds: &ShredIndex,
) -> Vec<(u32, u32)> {
received_data_shreds: &'a ShredIndex,
) -> impl Iterator<Item = Range<u32>> + 'a {
let first_insert = slot_meta.received == 0;
// Index is zero-indexed, while the "received" height starts from 1,
// so received = index + 1 for the same shred.
Expand Down Expand Up @@ -6090,8 +6080,7 @@ pub mod tests {
.unwrap(),
vec![CompletedDataSetInfo {
slot,
start_index: 0,
end_index: num_shreds as u32 - 1
indices: 0..num_shreds as u32,
}]
);
// Inserting shreds again doesn't trigger notification
Expand Down Expand Up @@ -10676,10 +10665,13 @@ pub mod tests {

for i in 0..10 {
shred_index.insert(i as u64);
assert_eq!(
update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes),
vec![(i, i)]
);
assert!(update_completed_data_indexes(
true,
i,
&shred_index,
&mut completed_data_indexes
)
.eq(std::iter::once(i..i + 1)));
assert!(completed_data_indexes.iter().copied().eq(0..=i));
}
}
Expand All @@ -10692,39 +10684,39 @@ pub mod tests {
shred_index.insert(4);
assert!(
update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes)
.is_empty()
.eq([])
);
assert!(completed_data_indexes.is_empty());

shred_index.insert(2);
assert!(
update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes)
.is_empty()
.eq([])
);
assert!(completed_data_indexes.is_empty());

shred_index.insert(3);
assert!(
update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes)
.is_empty()
.eq([])
);
assert!(completed_data_indexes.iter().eq([3].iter()));

// Inserting data complete shred 1 now confirms the range of shreds [2, 3]
// is part of the same data set
shred_index.insert(1);
assert_eq!(
update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes),
vec![(2, 3)]
assert!(
update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes)
.eq(std::iter::once(2..4))
);
assert!(completed_data_indexes.iter().eq([1, 3].iter()));

// Inserting data complete shred 0 now confirms the range of shreds [0]
// is part of the same data set
shred_index.insert(0);
assert_eq!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes),
vec![(0, 0), (1, 1)]
assert!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes)
.eq([0..1, 1..2])
);
assert!(completed_data_indexes.iter().eq([0, 1, 3].iter()));
}
Expand Down
Loading