Skip to content

Commit

Permalink
Merge pull request #156 from dusk-network/route_maintainer
Browse files Browse the repository at this point in the history
Fix `find_new_nodes` to query the proper buckets
  • Loading branch information
herr-seppia authored Oct 11, 2024
2 parents f252b87 + 3c6a5e6 commit dd70dd7
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 53 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add network version to handshake messages
- Add Ray-ID to MessageInfo for message tracking
- Add warning when discarding incomplete messages
- Add tracing when broadcasting to an eclipsed network

### Fixed

- Fix raptorQ cache default config
- Fix ObjectTransmissionInformation deserialization
- Fix duplicate processing for messages with different RaptorQ configurations
- Fix idle nodes removal on maintainance
- Fix `find_new_nodes` to query the proper buckets

### Changed

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kadcast"
authors = ["herr-seppia <[email protected]>"]
version = "0.7.0-rc.10"
version = "0.7.0-rc.11"
edition = "2018"
description = "Implementation of the Kadcast Network Protocol."
categories = ["network-programming"]
Expand Down
34 changes: 14 additions & 20 deletions src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use bucket::InsertError;
pub use bucket::InsertOk;
pub use bucket::{NodeInsertError, NodeInsertOk};
use itertools::Itertools;
pub use key::MAX_BUCKET_HEIGHT;
pub use key::{BinaryID, BinaryKey, BinaryNonce};
pub use node::Node;
use std::collections::hash_map::Entry;
Expand All @@ -20,7 +21,6 @@ mod bucket;
mod key;
mod node;
use crate::config::BucketConfig;
use crate::K_ALPHA;
use crate::K_BETA;

pub type BucketHeight = u8;
Expand Down Expand Up @@ -118,25 +118,12 @@ impl<V> Tree<V> {
.map(|(&height, bucket)| (height, bucket.peers()))
}

#[allow(dead_code)]
pub(crate) fn idle_or_empty_heigth(
&'static self,
) -> impl Iterator<Item = BucketHeight> {
let max_buckets = (crate::K_ID_LEN_BYTES * 8) as BucketHeight;
(0..max_buckets).filter(move |h| {
self.buckets.get(h).map_or_else(|| true, |b| b.has_idle())
})
}

// pick at most Alpha nodes for each idle bucket
pub(crate) fn idle_buckets(
&self,
) -> impl Iterator<Item = (BucketHeight, impl Iterator<Item = &Node<V>>)>
{
self.buckets
.iter()
.filter(|(_, bucket)| bucket.has_idle())
.map(|(&height, bucket)| (height, bucket.pick::<K_ALPHA>()))
pub(crate) fn idle_or_empty_height(&self) -> Vec<BucketHeight> {
(0..MAX_BUCKET_HEIGHT as u8)
.filter(|h| {
self.buckets.get(h).map_or_else(|| true, |b| b.has_idle())
})
.collect()
}

// Return the height of a Peer
Expand Down Expand Up @@ -178,6 +165,13 @@ impl<V> Tree<V> {
.map_or(false, |bucket| bucket.is_full())
}

pub(crate) fn bucket_size(&self, height: BucketHeight) -> usize {
self.buckets
.get(&height)
.map(|bucket| bucket.peers().count())
.unwrap_or_default()
}

pub(crate) fn new(root: Node<V>, config: BucketConfig) -> Tree<V> {
info!(
"Building table [K={}] with root: {:?}",
Expand Down
47 changes: 46 additions & 1 deletion src/kbucket/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ use crate::{K_DIFF_MIN_BIT, K_DIFF_PRODUCED_BIT};

use super::BucketHeight;

pub const MAX_BUCKET_HEIGHT: usize =
K_ID_LEN_BYTES * BucketHeight::BITS as usize;

const _: () = assert!(
(K_ID_LEN_BYTES * BucketHeight::BITS as usize) < BucketHeight::MAX as usize,
MAX_BUCKET_HEIGHT < BucketHeight::MAX as usize,
"K_ID_LEN_BYTES must be lower than BucketHeight::MAX"
);

Expand Down Expand Up @@ -84,6 +87,27 @@ impl BinaryID {
.map(|(i, b)| BinaryID::msb(b).expect("to be Some") + (i << 3) - 1)
}

/// Given a specific `kadcast` distance, this method generates a `BinaryKey`
/// that has the requested XOR-based distance from `self`.
///
/// The method works by flipping the bit at the specified `distance` in the
/// binary representation of the `BinaryId`. The distance is used to
/// identify both the byte (`idx`) and the bit within that byte
/// (`bit_to_change`) to be modified. The bit is toggled (flipped) using
/// an XOR operation, resulting in a new `BinaryKey` that differs from
/// `self` at exactly the requested distance.
pub fn get_at_distance(&self, distance: BucketHeight) -> BinaryKey {
let mut new_key = self.bytes;

let distance = distance as usize;
let idx = distance / 8;
let bit_to_change = distance % 8;

new_key[idx] ^= 1 << bit_to_change;

new_key
}

/// Returns the position of the most significant bit set in a byte.
///
/// Returns `None` if no bit is set.
Expand Down Expand Up @@ -168,6 +192,8 @@ impl BinaryID {
#[cfg(test)]
mod tests {

use itertools::Itertools;

use super::*;
use crate::kbucket::BucketHeight;
use crate::peer::PeerNode;
Expand Down Expand Up @@ -206,6 +232,25 @@ mod tests {
Ok(())
}

fn key_as_string(key: BinaryKey) -> String {
key.iter().map(|b| format!("{b:08b}")).join(" ")
}

#[test]
fn test_get_at_distance() -> Result<()> {
let current = PeerNode::generate("192.168.0.1:666", 0)?;
let current_str = key_as_string(current.as_peer_info().id);
for i in 0..(8 * K_ID_LEN_BYTES) {
let other = current.id().get_at_distance(i as u8);
let other_str = key_as_string(other);
println!("current {current_str}");
println!("other {other_str}");
println!("distance {i:?}");
assert_eq!(current.id().calculate_distance(&other), Some(i as u8))
}
Ok(())
}

#[test]
fn test_id_nonce() -> Result<()> {
let root = PeerNode::generate("192.168.0.1:666", 0)?;
Expand Down
32 changes: 21 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use encoding::payload::BroadcastPayload;
use handling::MessageHandler;
pub use handling::MessageInfo;
use itertools::Itertools;
use kbucket::MAX_BUCKET_HEIGHT;
use kbucket::{BucketHeight, Tree};
use maintainer::TableMaintainer;
use peer::{PeerInfo, PeerNode};
use rand::prelude::IteratorRandom;
pub(crate) use rwlock::RwLock;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task;
use tracing::warn;
use tracing::{error, info};
use transport::{MessageBeanOut, WireNetwork};

Expand Down Expand Up @@ -226,10 +228,24 @@ impl Peer {
return;
}

let tosend: Vec<_> = self
.ktable
.read()
.await
for i in self.extract(message, height).await {
self.outbound_sender.send(i).await.unwrap_or_else(|e| {
error!("Unable to send from broadcast {e}")
});
}
}

async fn extract(
&self,
message: &[u8],
height: Option<BucketHeight>,
) -> Vec<(Message, Vec<SocketAddr>)> {
const LAST_BUCKET_IDX: u8 = MAX_BUCKET_HEIGHT as u8 - 1;
let ktable = self.ktable.read().await;
if height.is_none() && ktable.bucket_size(LAST_BUCKET_IDX) == 0 {
warn!("Broadcasting a new message with empty bucket height {LAST_BUCKET_IDX}")
}
ktable
.extract(height)
.map(|(height, nodes)| {
let msg = Message::broadcast(
Expand All @@ -243,13 +259,7 @@ impl Peer {
nodes.map(|node| *node.value().address()).collect();
(msg, targets)
})
.collect();

for i in tosend {
self.outbound_sender.send(i).await.unwrap_or_else(|e| {
error!("Unable to send from broadcast {e}")
});
}
.collect()
}

/// Send a message to a peer in the network
Expand Down
52 changes: 32 additions & 20 deletions src/maintainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::encoding::message::{Header, Message};
use crate::kbucket::Tree;
use crate::peer::PeerInfo;
use crate::transport::MessageBeanOut;
use crate::RwLock;
use crate::{RwLock, K_ALPHA};

pub(crate) struct TableMaintainer {
bootstrapping_nodes: Vec<String>,
Expand Down Expand Up @@ -132,27 +132,39 @@ impl TableMaintainer {
self.ktable.write().await.remove_idle_nodes();
}

/// Search for idle buckets (no message received) and try to contact some of
/// the belonging nodes
/// Searches for idle or empty buckets (those without received messages) in
/// the routing table and requests information about the nodes in these
/// buckets from active peers.
///
/// For each identified idle or empty bucket, it calculates a target binary
/// key using the `get_at_distance` method, which flips a specific bit
/// in the node's binary identifier based on the given distance. This
/// generates a new target key that is used to search for additional
/// nodes.
///
/// A set of active peers, up to `K_ALPHA`, is gathered from the current
/// routing table and combined with the bootstrapping nodes to form the
/// list of peers to contact.
///
/// The purpose of this method is to keep the routing table active and up to
/// date by finding new peers whenever buckets are empty or nodes become
/// unresponsive.
async fn find_new_nodes(&self) {
let table_lock_read = self.ktable.read().await;

let find_node_messages = table_lock_read
.idle_buckets()
.flat_map(|(_, idle_nodes)| idle_nodes)
.map(|target| {
(
Message::FindNodes(
self.header,
self.version.clone(),
*target.id().as_binary(),
),
//TODO: Extract alpha nodes
vec![*target.value().address()],
)
});
for find_node in find_node_messages {
self.send(find_node).await;
let buckets_to_refresh = table_lock_read.idle_or_empty_height();

let alive_peers = table_lock_read
.alive_nodes()
.map(|n| n.as_peer_info().to_socket_address())
.take(K_ALPHA)
.chain(self.bootstrapping_nodes_addr().into_iter())
.collect::<Vec<_>>();

for bucket_h in buckets_to_refresh {
let target = self.header.binary_id().get_at_distance(bucket_h);
let msg =
Message::FindNodes(self.header, self.version.clone(), target);
self.send((msg, alive_peers.clone())).await;
}
}
}

0 comments on commit dd70dd7

Please sign in to comment.