Skip to content

Commit

Permalink
Swarm Bootstrap Nodes (#201)
Browse files Browse the repository at this point in the history
# Description

Adds bootstrap nodes in settings that will be connected to on node
startup.

## Link to issue

Please add a link to any relevant issues/tickets.

## Type of change

- [x] New feature (non-breaking change that adds functionality)

Please delete options that are not relevant.

## Test plan (required)

Parsing testing is added with the other settings test. Tests actually
dialing nodes will be added in a different PR since this is a perquisite
to gossip/receipt testing anyways.

---------

Signed-off-by: Melanie Riise <[email protected]>
  • Loading branch information
mriise authored Aug 15, 2023
1 parent 63c23aa commit ba26eff
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 7 deletions.
1 change: 1 addition & 0 deletions homestar-runtime/fixtures/settings.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ process_collector_interval = 10
[node.network]
events_buffer_len = 1000
websocket_port = 9999
node_addresses = ["/ip4/127.0.0.1/tcp/9998/ws"]
9 changes: 8 additions & 1 deletion homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use crate::{
use anyhow::Result;
use async_trait::async_trait;
use fnv::FnvHashMap;
use libp2p::{futures::StreamExt, kad::QueryId, request_response::RequestId, swarm::Swarm};
use libp2p::{
core::ConnectedPoint, futures::StreamExt, kad::QueryId, request_response::RequestId,
swarm::Swarm, PeerId,
};
use std::{sync::Arc, time::Duration};
use swarm_event::ResponseEvent;
use tokio::{select, sync::mpsc};
Expand Down Expand Up @@ -54,6 +57,7 @@ pub(crate) struct EventHandler<DB: Database> {
sender: Arc<mpsc::Sender<Event>>,
receiver: mpsc::Receiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, P2PSender)>,
connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
ws_msg_sender: ws::Notifier,
}
Expand All @@ -70,6 +74,7 @@ pub(crate) struct EventHandler<DB: Database> {
sender: Arc<mpsc::Sender<Event>>,
receiver: mpsc::Receiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, P2PSender)>,
connected_peers: FnvHashMap<PeerId, ConnectedPoint>,
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
}

Expand Down Expand Up @@ -100,6 +105,7 @@ where
receiver,
query_senders: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
connected_peers: FnvHashMap::default(),
ws_msg_sender,
}
}
Expand All @@ -117,6 +123,7 @@ where
sender: Arc::new(sender),
receiver,
query_senders: FnvHashMap::default(),
connected_peers: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
}
}
Expand Down
12 changes: 12 additions & 0 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
event_handler: &mut EventHandler<DB>,
) {
match event {
// TODO: add identify for adding compatable kademlia nodes.
// TODO: use kademlia to discover new gossip nodes.
SwarmEvent::Behaviour(ComposedEvent::Gossipsub(gossip_event)) => match *gossip_event {
gossipsub::Event::Message {
message,
Expand Down Expand Up @@ -370,6 +372,16 @@ async fn handle_swarm_event<THandlerErr: fmt::Debug + Send, DB: Database>(
);
}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
// add peer to connected peers list
event_handler.connected_peers.insert(peer_id, endpoint);
}
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
info!("peer connection closed {peer_id}, cause: {cause:?}");
event_handler.connected_peers.remove_entry(&peer_id);
}
_ => {}
}
}
Expand Down
40 changes: 36 additions & 4 deletions homestar-runtime/src/network/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ use libp2p::{
core::upgrade,
gossipsub::{self, MessageId, SubscriptionError, TopicHash},
kad::{record::store::MemoryStore, Kademlia, KademliaEvent},
mdns, noise,
mdns,
multiaddr::Protocol,
noise,
request_response::{self, ProtocolSupport},
swarm::{NetworkBehaviour, Swarm, SwarmBuilder},
tcp, yamux, StreamProtocol, Transport,
};
use serde::{Deserialize, Serialize};
use std::fmt;
use tracing::{info, warn};

/// Build a new [Swarm] with a given transport and a tokio executor.
pub(crate) async fn new(settings: &settings::Node) -> Result<Swarm<ComposedBehaviour>> {
Expand All @@ -27,6 +30,7 @@ pub(crate) async fn new(settings: &settings::Node) -> Result<Swarm<ComposedBehav
.with_context(|| "Failed to generate/import keypair for libp2p".to_string())?;

let peer_id = keypair.public().to_peer_id();
info!(peer_id=?peer_id.to_string(), "local peer ID generated");

let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1Lazy)
Expand Down Expand Up @@ -60,15 +64,43 @@ pub(crate) async fn new(settings: &settings::Node) -> Result<Swarm<ComposedBehav
)
.build();

startup(&mut swarm, &settings.network)?;

Ok(swarm)
}

fn startup(swarm: &mut Swarm<ComposedBehaviour>, settings: &settings::Network) -> Result<()> {
// Listen-on given address
swarm.listen_on(settings.network.listen_address.to_string().parse()?)?;
swarm.listen_on(settings.listen_address.to_string().parse()?)?;

// Dial nodes specified in settings. Failure here shouldn't halt node startup.
for addr in &settings.node_addresses {
match swarm.dial(addr.clone()) {
Ok(_) => info!(addr=?addr, "successfully dialed configured node"),
// log dial failure and continue
Err(e) => warn!(err=?e, "failed to dial configured node"),
}

// subscribe to `receipts` topic
// add node to kademlia routing table
if let Some(Protocol::P2p(peer_id)) =
addr.iter().find(|proto| matches!(proto, Protocol::P2p(_)))
{
info!(addr=?addr, "added configured node to kademlia routing table");
swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr.clone());
} else {
warn!(addr=?addr, err="configured node address did not include a peer ID", "node not added to kademlia routing table")
}
}

// join `receipts` topic
swarm
.behaviour_mut()
.gossip_subscribe(pubsub::RECEIPTS_TOPIC)?;

Ok(swarm)
Ok(())
}

/// Key data structure for [request_response::Event] messages.
Expand Down
9 changes: 7 additions & 2 deletions homestar-runtime/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ pub struct Network {
pub(crate) workflow_quorum: usize,
/// Pubkey setup configuration
pub(crate) keypair_config: PubkeyConfig,
/// Multiaddrs of the trusted nodes to connect to on startup.
#[serde_as(as = "Vec<serde_with::DisplayFromStr>")]
pub(crate) node_addresses: Vec<libp2p::Multiaddr>,
}

/// Database-related settings for a homestar node.
Expand Down Expand Up @@ -172,6 +175,7 @@ impl Default for Network {
websocket_capacity: 100,
workflow_quorum: 3,
keypair_config: PubkeyConfig::Random,
node_addresses: Vec::new(),
}
}
}
Expand Down Expand Up @@ -265,8 +269,9 @@ mod test {
default_modded_settings.network.websocket_port = 9999;
default_modded_settings.gc_interval = Duration::from_secs(1800);
default_modded_settings.shutdown_timeout = Duration::from_secs(20);

assert_eq!(settings.node, default_modded_settings);
default_modded_settings.network.node_addresses =
vec!["/ip4/127.0.0.1/tcp/9998/ws".to_string().try_into().unwrap()];
assert_eq!(settings.node(), &default_modded_settings);
}

#[test]
Expand Down
11 changes: 11 additions & 0 deletions homestar-runtime/tests/test_node1/config/settings.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[monitoring]
process_collector_interval = 10

[node]

[node.network]
websocket_port = 9090
trusted_node_addresses = ["/ip4/127.0.0.1/tcp/9091/ws"]

[node.network.keypair_config]
existing = { key_type = "ed25519", path = "../../fixtures/__testkey_ed25519.pem" }
10 changes: 10 additions & 0 deletions homestar-runtime/tests/test_node2/config/settings.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[monitoring]
process_collector_interval = 10

[node]

[node.network]
websocket_port = 9091
node_addresses = [
"/ip4/127.0.0.1/tcp/9090/ws/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN",
]

0 comments on commit ba26eff

Please sign in to comment.