Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dedups shreds by common-header instead of the entire payload #4187

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,11 @@ pub mod layout {
packet.buffer_mut().get_mut(..size)
}

#[inline]
pub fn get_common_header_bytes(shred: &[u8]) -> Option<&[u8]> {
shred.get(..SIZE_OF_COMMON_SHRED_HEADER)
}

pub(crate) fn get_signature(shred: &[u8]) -> Option<Signature> {
shred
.get(..SIZE_OF_SIGNATURE)
Expand Down
1 change: 1 addition & 0 deletions turbine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tokio = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
bs58 = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
test-case = { workspace = true }
Expand Down
37 changes: 32 additions & 5 deletions turbine/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,22 @@ impl<const K: usize> ShredDeduper<K> {
.maybe_reset(rng, false_positive_rate, reset_cycle);
}

// Returns true if the shred is duplicate and should be discarded.
#[must_use]
fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool {
// Shreds in the retransmit stage:
// * don't have repair nonce (repaired shreds are not retransmitted).
// * are already resigned by this node as the retransmitter.
// * have their leader's signature verified.
// Therefore in order to dedup shreds, it suffices to compare:
// (signature, slot, shred-index, shred-type)
// Because ShredCommonHeader already includes all of the above tuple,
// the rest of the payload can be skipped.
// In order to detect duplicate blocks across cluster, we retransmit
// max_duplicate_count different shreds for each ShredId.
self.deduper.dedup(shred)
shred::layout::get_common_header_bytes(shred)
.map(|header| self.deduper.dedup(header))
.unwrap_or(true)
|| (0..max_duplicate_count).all(|i| self.shred_id_filter.dedup(&(key, i)))
}
}
Expand Down Expand Up @@ -630,14 +642,27 @@ mod tests {
rand::SeedableRng,
rand_chacha::ChaChaRng,
solana_ledger::shred::{Shred, ShredFlags},
solana_sdk::signature::Keypair,
};

fn get_keypair() -> Keypair {
const KEYPAIR: &str = "Fcc2HUvRC7Dv4GgehTziAremzRvwDw5miYu8Ahuu1rsGjA\
5eCn55pXiSkEPcuqviV41rJxrFpZDmHmQkZWfoYYS";
bs58::decode(KEYPAIR)
.into_vec()
.as_deref()
.map(Keypair::from_bytes)
.unwrap()
.unwrap()
}

#[test]
fn test_already_received() {
let slot = 1;
let index = 5;
let version = 0x40;
let shred = Shred::new_from_data(
let keypair = get_keypair();
let mut shred = Shred::new_from_data(
slot,
index,
0,
Expand All @@ -647,14 +672,14 @@ mod tests {
version,
0,
);
shred.sign(&keypair);
let mut rng = ChaChaRng::from_seed([0xa5; 32]);
let shred_deduper = ShredDeduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
// unique shred for (1, 5) should pass
assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
// duplicate shred for (1, 5) blocked
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

let shred = Shred::new_from_data(
let mut shred = Shred::new_from_data(
slot,
index,
2,
Expand All @@ -664,12 +689,13 @@ mod tests {
version,
0,
);
shred.sign(&keypair);
// first duplicate shred for (1, 5) passed
assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
// then blocked
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

let shred = Shred::new_from_data(
let mut shred = Shred::new_from_data(
slot,
index,
8,
Expand All @@ -679,6 +705,7 @@ mod tests {
version,
0,
);
shred.sign(&keypair);
// 2nd duplicate shred for (1, 5) blocked
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
Expand Down
Loading