From ba26effb85ea8d8e09a4f95263826ea9784a6ffe Mon Sep 17 00:00:00 2001 From: Melanie Riise Date: Mon, 14 Aug 2023 18:05:32 -0700 Subject: [PATCH] Swarm Bootstrap Nodes (#201) # Description Adds bootstrap nodes in settings that will be connected to on node startup. ## Link to issue Please add a link to any relevant issues/tickets. ## Type of change - [x] New feature (non-breaking change that adds functionality) Please delete options that are not relevant. ## Test plan (required) Parsing testing is added with the other settings test. Tests actually dialing nodes will be added in a different PR since this is a perquisite to gossip/receipt testing anyways. --------- Signed-off-by: Melanie Riise --- homestar-runtime/fixtures/settings.toml | 1 + homestar-runtime/src/event_handler.rs | 9 ++++- .../src/event_handler/swarm_event.rs | 12 ++++++ homestar-runtime/src/network/swarm.rs | 40 +++++++++++++++++-- homestar-runtime/src/settings.rs | 9 ++++- .../tests/test_node1/config/settings.toml | 11 +++++ .../tests/test_node2/config/settings.toml | 10 +++++ 7 files changed, 85 insertions(+), 7 deletions(-) create mode 100644 homestar-runtime/tests/test_node1/config/settings.toml create mode 100644 homestar-runtime/tests/test_node2/config/settings.toml diff --git a/homestar-runtime/fixtures/settings.toml b/homestar-runtime/fixtures/settings.toml index 7f471e51..6d80c389 100644 --- a/homestar-runtime/fixtures/settings.toml +++ b/homestar-runtime/fixtures/settings.toml @@ -6,3 +6,4 @@ process_collector_interval = 10 [node.network] events_buffer_len = 1000 websocket_port = 9999 +node_addresses = ["/ip4/127.0.0.1/tcp/9998/ws"] diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 190b73f9..0bc93715 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -12,7 +12,10 @@ use crate::{ use anyhow::Result; use async_trait::async_trait; use fnv::FnvHashMap; -use libp2p::{futures::StreamExt, kad::QueryId, request_response::RequestId, swarm::Swarm}; +use libp2p::{ + core::ConnectedPoint, futures::StreamExt, kad::QueryId, request_response::RequestId, + swarm::Swarm, PeerId, +}; use std::{sync::Arc, time::Duration}; use swarm_event::ResponseEvent; use tokio::{select, sync::mpsc}; @@ -54,6 +57,7 @@ pub(crate) struct EventHandler { sender: Arc>, receiver: mpsc::Receiver, query_senders: FnvHashMap, + connected_peers: FnvHashMap, request_response_senders: FnvHashMap, ws_msg_sender: ws::Notifier, } @@ -70,6 +74,7 @@ pub(crate) struct EventHandler { sender: Arc>, receiver: mpsc::Receiver, query_senders: FnvHashMap, + connected_peers: FnvHashMap, request_response_senders: FnvHashMap, } @@ -100,6 +105,7 @@ where receiver, query_senders: FnvHashMap::default(), request_response_senders: FnvHashMap::default(), + connected_peers: FnvHashMap::default(), ws_msg_sender, } } @@ -117,6 +123,7 @@ where sender: Arc::new(sender), receiver, query_senders: FnvHashMap::default(), + connected_peers: FnvHashMap::default(), request_response_senders: FnvHashMap::default(), } } diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 2adeea06..34ccbdb2 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -93,6 +93,8 @@ async fn handle_swarm_event( event_handler: &mut EventHandler, ) { match event { + // TODO: add identify for adding compatable kademlia nodes. + // TODO: use kademlia to discover new gossip nodes. SwarmEvent::Behaviour(ComposedEvent::Gossipsub(gossip_event)) => match *gossip_event { gossipsub::Event::Message { message, @@ -370,6 +372,16 @@ async fn handle_swarm_event( ); } SwarmEvent::IncomingConnection { .. } => {} + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + // add peer to connected peers list + event_handler.connected_peers.insert(peer_id, endpoint); + } + SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { + info!("peer connection closed {peer_id}, cause: {cause:?}"); + event_handler.connected_peers.remove_entry(&peer_id); + } _ => {} } } diff --git a/homestar-runtime/src/network/swarm.rs b/homestar-runtime/src/network/swarm.rs index 3917d73e..058775a1 100644 --- a/homestar-runtime/src/network/swarm.rs +++ b/homestar-runtime/src/network/swarm.rs @@ -10,13 +10,16 @@ use libp2p::{ core::upgrade, gossipsub::{self, MessageId, SubscriptionError, TopicHash}, kad::{record::store::MemoryStore, Kademlia, KademliaEvent}, - mdns, noise, + mdns, + multiaddr::Protocol, + noise, request_response::{self, ProtocolSupport}, swarm::{NetworkBehaviour, Swarm, SwarmBuilder}, tcp, yamux, StreamProtocol, Transport, }; use serde::{Deserialize, Serialize}; use std::fmt; +use tracing::{info, warn}; /// Build a new [Swarm] with a given transport and a tokio executor. pub(crate) async fn new(settings: &settings::Node) -> Result> { @@ -27,6 +30,7 @@ pub(crate) async fn new(settings: &settings::Node) -> Result Result, settings: &settings::Network) -> Result<()> { // Listen-on given address - swarm.listen_on(settings.network.listen_address.to_string().parse()?)?; + swarm.listen_on(settings.listen_address.to_string().parse()?)?; + + // Dial nodes specified in settings. Failure here shouldn't halt node startup. + for addr in &settings.node_addresses { + match swarm.dial(addr.clone()) { + Ok(_) => info!(addr=?addr, "successfully dialed configured node"), + // log dial failure and continue + Err(e) => warn!(err=?e, "failed to dial configured node"), + } - // subscribe to `receipts` topic + // add node to kademlia routing table + if let Some(Protocol::P2p(peer_id)) = + addr.iter().find(|proto| matches!(proto, Protocol::P2p(_))) + { + info!(addr=?addr, "added configured node to kademlia routing table"); + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, addr.clone()); + } else { + warn!(addr=?addr, err="configured node address did not include a peer ID", "node not added to kademlia routing table") + } + } + + // join `receipts` topic swarm .behaviour_mut() .gossip_subscribe(pubsub::RECEIPTS_TOPIC)?; - Ok(swarm) + Ok(()) } /// Key data structure for [request_response::Event] messages. diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index bad435ce..f82b270d 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -121,6 +121,9 @@ pub struct Network { pub(crate) workflow_quorum: usize, /// Pubkey setup configuration pub(crate) keypair_config: PubkeyConfig, + /// Multiaddrs of the trusted nodes to connect to on startup. + #[serde_as(as = "Vec")] + pub(crate) node_addresses: Vec, } /// Database-related settings for a homestar node. @@ -172,6 +175,7 @@ impl Default for Network { websocket_capacity: 100, workflow_quorum: 3, keypair_config: PubkeyConfig::Random, + node_addresses: Vec::new(), } } } @@ -265,8 +269,9 @@ mod test { default_modded_settings.network.websocket_port = 9999; default_modded_settings.gc_interval = Duration::from_secs(1800); default_modded_settings.shutdown_timeout = Duration::from_secs(20); - - assert_eq!(settings.node, default_modded_settings); + default_modded_settings.network.node_addresses = + vec!["/ip4/127.0.0.1/tcp/9998/ws".to_string().try_into().unwrap()]; + assert_eq!(settings.node(), &default_modded_settings); } #[test] diff --git a/homestar-runtime/tests/test_node1/config/settings.toml b/homestar-runtime/tests/test_node1/config/settings.toml new file mode 100644 index 00000000..6ca9bf53 --- /dev/null +++ b/homestar-runtime/tests/test_node1/config/settings.toml @@ -0,0 +1,11 @@ +[monitoring] +process_collector_interval = 10 + +[node] + +[node.network] +websocket_port = 9090 +trusted_node_addresses = ["/ip4/127.0.0.1/tcp/9091/ws"] + +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "../../fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/test_node2/config/settings.toml b/homestar-runtime/tests/test_node2/config/settings.toml new file mode 100644 index 00000000..05423481 --- /dev/null +++ b/homestar-runtime/tests/test_node2/config/settings.toml @@ -0,0 +1,10 @@ +[monitoring] +process_collector_interval = 10 + +[node] + +[node.network] +websocket_port = 9091 +node_addresses = [ + "/ip4/127.0.0.1/tcp/9090/ws/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", +]