dedups shreds by common-header instead of the entire payload
Shreds in the retransmit stage:
  * don't have a repair nonce (repaired shreds are not retransmitted).
  * are already resigned by this node as the retransmitter.
  * have their leader's signature verified.
Therefore, in order to dedup shreds, it suffices to compare:

    (signature, slot, shred-index, shred-type)

Because the ShredCommonHeader already includes this entire tuple,
the rest of the payload can be skipped.
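
For orientation, everything in that tuple lives in the fixed-size common header at the very front of each serialized shred. A rough sketch of that prefix (field names and widths are approximate; the authoritative definition is ShredCommonHeader in ledger/src/shred.rs):

// Rough, non-authoritative sketch of the prefix of every serialized shred.
// The dedup key (signature, slot, shred-index, shred-type) is fully contained here.
struct CommonHeaderSketch {
    signature: [u8; 64], // leader's signature, already verified before retransmit
    shred_variant: u8,   // encodes the shred-type (data vs. coding)
    slot: u64,
    index: u32,          // shred-index within the slot
    version: u16,
    fec_set_index: u32,
}

Hashing only this fixed-size prefix, instead of the whole packet payload, is the entire optimization.
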
behzadnouri committed Dec 20, 2024
1 parent ee31e31 commit c0f556d
Showing 4 changed files with 35 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions ledger/src/shred.rs
@@ -638,6 +638,11 @@ pub mod layout {
packet.buffer_mut().get_mut(..size)
}

#[inline]
pub fn get_common_header_bytes(shred: &[u8]) -> Option<&[u8]> {
shred.get(..SIZE_OF_COMMON_SHRED_HEADER)
}

pub(crate) fn get_signature(shred: &[u8]) -> Option<Signature> {
shred
.get(..SIZE_OF_SIGNATURE)
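
For context, a minimal, hypothetical caller of the new accessor (the retransmit stage below uses the same pattern); a None return simply means the buffer is too short to hold a common header:

use solana_ledger::shred;

// Hypothetical example: derive the dedup key from a raw shred buffer.
// None means the packet cannot even contain a common header, so the caller
// treats it like a duplicate and discards it.
fn dedup_key(shred_bytes: &[u8]) -> Option<&[u8]> {
    shred::layout::get_common_header_bytes(shred_bytes)
}
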
1 change: 1 addition & 0 deletions turbine/Cargo.toml
@@ -47,6 +47,7 @@ tokio = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
bs58 = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
test-case = { workspace = true }
33 changes: 28 additions & 5 deletions turbine/src/retransmit_stage.rs
@@ -165,10 +165,22 @@ impl<const K: usize> ShredDeduper<K> {
.maybe_reset(rng, false_positive_rate, reset_cycle);
}

// Returns true if the shred is duplicate and should be discarded.
#[must_use]
fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool {
// Shreds in the retransmit stage:
// * don't have repair nonce (repaired shreds are not retransmitted).
// * are already resigned by this node as the retransmitter.
// * have their leader's signature verified.
// Therefore in order to dedup shreds, it suffices to compare:
// (signature, slot, shred-index, shred-type)
// Because ShredCommonHeader already includes all of the above tuple,
// the rest of the payload can be skipped.
// In order to detect duplicate blocks across cluster, we retransmit
// max_duplicate_count different shreds for each ShredId.
self.deduper.dedup(shred)
shred::layout::get_common_header_bytes(shred)
.map(|header| self.deduper.dedup(header))
.unwrap_or(true)
|| (0..max_duplicate_count).all(|i| self.shred_id_filter.dedup(&(key, i)))
}
}
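
For intuition, here is a toy, std-only model of the combined check above (hypothetical; the crate's Deduper filters are probabilistic, not exact sets): a shred is dropped if its common-header bytes were already seen, or if max_duplicate_count distinct shreds have already gone out for its ShredId.

use std::collections::{HashMap, HashSet};

// Stand-in for ShredId: (slot, shred-index, shred-type as a byte).
type ToyShredId = (u64, u32, u8);

#[derive(Default)]
struct ToyShredDeduper {
    seen_headers: HashSet<Vec<u8>>,              // keyed by common-header bytes
    distinct_per_id: HashMap<ToyShredId, usize>, // distinct shreds retransmitted per ShredId
}

impl ToyShredDeduper {
    // Returns true if the shred should be discarded.
    fn dedup(&mut self, key: ToyShredId, header: Option<&[u8]>, max_duplicate_count: usize) -> bool {
        let Some(header) = header else {
            return true; // too short to hold a common header: discard
        };
        if !self.seen_headers.insert(header.to_vec()) {
            return true; // byte-identical common header already retransmitted
        }
        // A new, distinct shred for this ShredId: let it through only while fewer
        // than max_duplicate_count distinct shreds have gone out, so downstream
        // nodes can still observe duplicate blocks for the same (slot, index, type).
        let sent = self.distinct_per_id.entry(key).or_insert(0);
        if *sent >= max_duplicate_count {
            return true;
        }
        *sent += 1;
        false
    }
}

The real filters trade this exactness for bounded memory and are periodically reset via maybe_reset, which is why an occasional false positive is acceptable here.
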
@@ -630,14 +642,23 @@ mod tests {
rand::SeedableRng,
rand_chacha::ChaChaRng,
solana_ledger::shred::{Shred, ShredFlags},
solana_sdk::signature::Keypair,
};

fn get_keypair() -> Keypair {
const KEYPAIR: &str = "Fcc2HUvRC7Dv4GgehTziAremzRvwDw5miYu8Ahuu1rsGjA\
5eCn55pXiSkEPcuqviV41rJxrFpZDmHmQkZWfoYYS";
let keypair = bs58::decode(KEYPAIR).into_vec().unwrap();
Keypair::from_bytes(&keypair).unwrap()
}

#[test]
fn test_already_received() {
let slot = 1;
let index = 5;
let version = 0x40;
let shred = Shred::new_from_data(
let keypair = get_keypair();
let mut shred = Shred::new_from_data(
slot,
index,
0,
@@ ... @@
version,
0,
);
shred.sign(&keypair);
let mut rng = ChaChaRng::from_seed([0xa5; 32]);
let shred_deduper = ShredDeduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
// unique shred for (1, 5) should pass
assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
// duplicate shred for (1, 5) blocked
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

let shred = Shred::new_from_data(
let mut shred = Shred::new_from_data(
slot,
index,
2,
@@ ... @@
version,
0,
);
shred.sign(&keypair);
// first duplicate shred for (1, 5) passed
assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
// then blocked
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));

let shred = Shred::new_from_data(
let mut shred = Shred::new_from_data(
slot,
index,
8,
@@ ... @@
version,
0,
);
shred.sign(&keypair);
// 2nd duplicate shred for (1, 5) blocked
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
