From 86de5ed619bceb6478b553202ec7937e11c7e3ea Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Wed, 31 Jan 2024 16:21:27 -0800 Subject: [PATCH] feat: Finish discovery notifications, schemas, and test upgrades --- homestar-runtime/src/event_handler/cache.rs | 23 +- .../src/event_handler/notification/swarm.rs | 96 +---- .../src/event_handler/swarm_event.rs | 26 +- homestar-runtime/tests/network/rendezvous.rs | 388 +++++++++++++----- 4 files changed, 332 insertions(+), 201 deletions(-) diff --git a/homestar-runtime/src/event_handler/cache.rs b/homestar-runtime/src/event_handler/cache.rs index 8100a9b5..1721d511 100644 --- a/homestar-runtime/src/event_handler/cache.rs +++ b/homestar-runtime/src/event_handler/cache.rs @@ -61,17 +61,15 @@ pub(crate) enum DispatchEvent { pub(crate) fn setup_cache( sender: Arc>, ) -> Cache { - let eviction_listener = - move |_key: Arc, val: CacheValue, cause: RemovalCause| -> ListenerFuture { - let tx = Arc::clone(&sender); - - async move { - if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") { - println!("event: {:?}", event); - if cause != Expired { - return; - } + let eviction_listener = move |_key: Arc, + val: CacheValue, + cause: RemovalCause| + -> ListenerFuture { + let tx = Arc::clone(&sender); + async move { + if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") { + if cause == Expired { match event { DispatchEvent::RegisterPeer => { if let Some(CacheData::Peer(rendezvous_node)) = @@ -99,8 +97,9 @@ pub(crate) fn setup_cache( }; } } - .boxed() - }; + } + .boxed() + }; Cache::builder() .expire_after(Expiry) diff --git a/homestar-runtime/src/event_handler/notification/swarm.rs b/homestar-runtime/src/event_handler/notification/swarm.rs index f4ca078e..587c7120 100644 --- a/homestar-runtime/src/event_handler/notification/swarm.rs +++ b/homestar-runtime/src/event_handler/notification/swarm.rs @@ -5,20 +5,11 @@ use anyhow::anyhow; use chrono::prelude::Utc; use homestar_invocation::ipld::DagJson; -use itertools::Itertools; -use jsonrpsee::core::StringError; -use libipld::{ - serde::{from_ipld, to_ipld}, - Ipld, -}; +use libipld::{serde::from_ipld, Ipld}; use libp2p::{Multiaddr, PeerId}; -use schemars::{ - gen::SchemaGenerator, - schema::{InstanceType, Metadata, ObjectValidation, Schema, SchemaObject, SingleOrVec}, - JsonSchema, -}; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use std::{borrow::Cow, collections::BTreeMap, fmt, str::FromStr}; +use std::{collections::BTreeMap, fmt, str::FromStr}; const TIMESTAMP_KEY: &str = "timestamp"; @@ -145,7 +136,6 @@ pub enum NetworkNotification { /// Rendezvous discover served notification. #[schemars(rename = "discover_served_rendezvous")] DiscoverServedRendezvous(DiscoverServedRendezvous), - // peer_discovered_rendezvous /// Rendezvous peer registered notification. #[schemars(rename = "peer_registered_rendezvous")] PeerRegisteredRendezvous(PeerRegisteredRendezvous), @@ -351,14 +341,16 @@ impl TryFrom for ConnectionClosed { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, JsonSchema)] +#[schemars(rename = "discovered_mdns")] pub struct DiscoveredMdns { timestamp: i64, - peers: Vec<(String, String)>, + #[schemars(description = "Peers discovered by peer ID and multiaddress")] + peers: BTreeMap, } impl DiscoveredMdns { - pub(crate) fn new(peers: Vec<(PeerId, Multiaddr)>) -> DiscoveredMdns { + pub(crate) fn new(peers: BTreeMap) -> DiscoveredMdns { DiscoveredMdns { timestamp: Utc::now().timestamp_millis(), peers: peers @@ -401,71 +393,23 @@ impl TryFrom for DiscoveredMdns { .to_owned(), )?; - let peers_map = from_ipld::>( + let peers = from_ipld::>( map.get(peers_key) .ok_or_else(|| anyhow!("missing {peers_key}"))? .to_owned(), )?; - let mut peers: Vec<(String, String)> = vec![]; - for peer in peers_map.iter() { - peers.push((peer.0.to_string(), from_ipld(peer.1.to_owned())?)) - } - Ok(DiscoveredMdns { timestamp, peers }) } } -impl JsonSchema for DiscoveredMdns { - fn schema_name() -> String { - "discovered_mdns".to_owned() - } - - fn schema_id() -> Cow<'static, str> { - Cow::Borrowed("homestar-runtime::event_handler::notification::swarm::DiscoveredMdns") - } - - fn json_schema(gen: &mut SchemaGenerator) -> Schema { - let schema = SchemaObject { - instance_type: Some(SingleOrVec::Single(InstanceType::Object.into())), - object: Some(Box::new(ObjectValidation { - properties: BTreeMap::from([ - ( - "timestamp".to_string(), - Schema::Object(SchemaObject { - instance_type: Some(SingleOrVec::Single(InstanceType::Number.into())), - ..Default::default() - }), - ), - ( - "peers".to_string(), - Schema::Object(SchemaObject { - instance_type: Some(SingleOrVec::Single(InstanceType::Object.into())), - metadata: Some(Box::new(Metadata { - description: Some("Peers and their addresses".to_string()), - ..Default::default() - })), - object: Some(Box::new(ObjectValidation { - additional_properties: Some(Box::new(::json_schema(gen))), - ..Default::default() - })), - ..Default::default() - }), - ), - ]), - ..Default::default() - })), - ..Default::default() - }; - schema.into() - } -} - #[derive(Debug, Clone, JsonSchema)] #[schemars(rename = "discovered_rendezvous")] pub struct DiscoveredRendezvous { timestamp: i64, + #[schemars(description = "Server that fulfilled the discovery request")] server: String, + #[schemars(description = "Peers discovered by peer ID and multiaddresses")] peers: BTreeMap>, } @@ -561,6 +505,7 @@ impl TryFrom for DiscoveredRendezvous { #[schemars(rename = "registered_rendezvous")] pub struct RegisteredRendezvous { timestamp: i64, + #[schemars(description = "Server that accepted registration")] server: String, } @@ -613,6 +558,7 @@ impl TryFrom for RegisteredRendezvous { #[schemars(rename = "registered_rendezvous")] pub struct DiscoverServedRendezvous { timestamp: i64, + #[schemars(description = "Peer that requested discovery")] enquirer: String, } @@ -668,7 +614,9 @@ impl TryFrom for DiscoverServedRendezvous { #[schemars(rename = "peer_registered_rendezvous")] pub struct PeerRegisteredRendezvous { timestamp: i64, + #[schemars(description = "Peer registered")] peer_id: String, + #[schemars(description = "Multiaddresses for peer")] addresses: Vec, } @@ -751,7 +699,7 @@ mod test { peer_id: PeerId, address: Multiaddr, addresses: Vec, - peers: Vec<(PeerId, Multiaddr)>, + peers: BTreeMap, peers_vec_addr: BTreeMap>, } @@ -763,7 +711,7 @@ mod test { Multiaddr::from_str("/ip4/127.0.0.1/tcp/7000").unwrap(), Multiaddr::from_str("/ip4/127.0.0.1/tcp/7001").unwrap(), ], - peers: vec![ + peers: BTreeMap::from([ ( PeerId::random(), Multiaddr::from_str("/ip4/127.0.0.1/tcp/7000").unwrap(), @@ -772,7 +720,7 @@ mod test { PeerId::random(), Multiaddr::from_str("/ip4/127.0.0.1/tcp/7001").unwrap(), ), - ], + ]), peers_vec_addr: BTreeMap::from([ ( PeerId::random(), @@ -861,10 +809,10 @@ mod test { assert_eq!(n.timestamp, timestamp); for peer in n.peers { - assert!(peers.contains(&( - PeerId::from_str(&peer.0).unwrap(), - Multiaddr::from_str(&peer.1).unwrap() - ))) + assert_eq!( + Multiaddr::from_str(&peer.1).unwrap(), + peers[&PeerId::from_str(&peer.0).unwrap()] + ) } } NetworkNotification::DiscoveredRendezvous(n) => { diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index e8ae5303..836893de 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -312,17 +312,15 @@ async fn handle_swarm_event( NetworkNotification::DiscoveredRendezvous( notification::DiscoveredRendezvous::new( rendezvous_node, - BTreeMap::from( - registrations - .iter() - .map(|registration| { - ( - registration.record.peer_id(), - registration.record.addresses().to_owned(), - ) - }) - .collect::>>(), - ), + registrations + .iter() + .map(|registration| { + ( + registration.record.peer_id(), + registration.record.addresses().to_owned(), + ) + }) + .collect::>>(), ), ), ); @@ -1101,7 +1099,11 @@ async fn handle_swarm_event( #[cfg(feature = "websocket-notify")] notification::emit_network_event( event_handler.ws_evt_sender(), - NetworkNotification::DiscoveredMdns(notification::DiscoveredMdns::new(list)), + NetworkNotification::DiscoveredMdns(notification::DiscoveredMdns::new( + list.iter() + .map(|peer| (peer.0, peer.1.to_owned())) + .collect::>(), + )), ) } SwarmEvent::Behaviour(ComposedEvent::Mdns(mdns::Event::Expired(list))) => { diff --git a/homestar-runtime/tests/network/rendezvous.rs b/homestar-runtime/tests/network/rendezvous.rs index 37551660..bfcf9580 100644 --- a/homestar-runtime/tests/network/rendezvous.rs +++ b/homestar-runtime/tests/network/rendezvous.rs @@ -18,7 +18,6 @@ use std::{ net::Ipv4Addr, path::PathBuf, process::{Command, Stdio}, - thread, time::Duration, }; @@ -463,12 +462,6 @@ fn test_libp2p_disconnect_rendezvous_discovery_integration() -> Result<()> { .await .unwrap(); - // Wait for registration to complete. - // TODO When we have WebSocket push events, listen on a registration event instead of using an arbitrary sleep. - // thread::sleep(Duration::from_secs(2)); - - // TODO Wait for clint 1 to register with server, server confirm registration - // Poll for client one registered with server loop { if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await { @@ -733,37 +726,102 @@ fn test_libp2p_rendezvous_renew_registration_integration() -> Result<()> { panic!("Homestar server/runtime failed to start in time"); } - // TODO Listen for client registered and server registered peer messages - // with renewal should be more than one. + tokio_test::task::spawn(async { + // Subscribe to rendezvous server + let ws_url1 = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port1); + let client1 = WsClientBuilder::default().build(ws_url1).await.unwrap(); + let mut sub1: Subscription> = client1 + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); - // Collect logs for five seconds then kill proceses. - let dead_server = kill_homestar(rendezvous_server, Some(Duration::from_secs(5))); - let dead_client = kill_homestar(rendezvous_client1, Some(Duration::from_secs(5))); + // Subscribe to rendezvous client + let ws_url2 = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port2); + let client2 = WsClientBuilder::default().build(ws_url2).await.unwrap(); + let mut sub2: Subscription> = client2 + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); - // Retrieve logs. - let stdout_server = retrieve_output(dead_server); - let stdout_client = retrieve_output(dead_client); + // Poll for server registered client twice. + let mut peer_registered_count = 0; + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); - // Count registrations on the server - let server_registration_count = count_lines_where( - stdout_server, - vec![ - "registered peer through rendezvous", - "12D3KooWJWoaqZhDaoEFshF7Rh1bpY9ohihFhzcW6d69Lr2NASuq", - ], - ); + if json["peer_registered_rendezvous"].is_object() + && json["peer_registered_rendezvous"]["peer_id"] == ED25519MULTIHASH3 + { + peer_registered_count += 1; + } + } else { + panic!("Server did not register client twice in time"); + } - // Count registrations on the client - let client_registration_count = count_lines_where( - stdout_client, - vec![ - "registered self with rendezvous node", - "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", - ], - ); + if peer_registered_count == 2 { + break; + } + } - assert!(server_registration_count > 1); - assert!(client_registration_count > 1); + // Poll for client registered with server twice. + let mut registered_count = 0; + loop { + if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["registered_rendezvous"].is_object() + && json["registered_rendezvous"]["server"] == ED25519MULTIHASH + { + registered_count += 1; + } + } else { + panic!("Client did not register with server twice in time"); + } + + if registered_count == 2 { + break; + } + } + + // Collect logs for five seconds then kill proceses. + let dead_server = kill_homestar(rendezvous_server, None); + let dead_client = kill_homestar(rendezvous_client1, None); + + // Retrieve logs. + let stdout_server = retrieve_output(dead_server); + let stdout_client = retrieve_output(dead_client); + + // Count registrations on the server + let server_registration_count = count_lines_where( + stdout_server, + vec![ + "registered peer through rendezvous", + "12D3KooWJWoaqZhDaoEFshF7Rh1bpY9ohihFhzcW6d69Lr2NASuq", + ], + ); + + // Count registrations on the client + let client_registration_count = count_lines_where( + stdout_client, + vec![ + "registered self with rendezvous node", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); + + assert!(server_registration_count > 1); + assert!(client_registration_count > 1); + }); Ok(()) } @@ -869,37 +927,102 @@ fn test_libp2p_rendezvous_rediscovery_integration() -> Result<()> { panic!("Homestar server/runtime failed to start in time"); } - // TODO Listen for client discover and server discover served messages - // should be more than one for both (or move on at two) + tokio_test::task::spawn(async { + // Subscribe to rendezvous server + let ws_url1 = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port1); + let client1 = WsClientBuilder::default().build(ws_url1).await.unwrap(); + let mut sub1: Subscription> = client1 + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); + + // Subscribe to rendezvous client + let ws_url2 = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port2); + let client2 = WsClientBuilder::default().build(ws_url2).await.unwrap(); + let mut sub2: Subscription> = client2 + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); - // Collect logs for five seconds then kill proceses. - let dead_server = kill_homestar(proc_guard_server.take(), Some(Duration::from_secs(15))); - let dead_client = kill_homestar(proc_guard_client1.take(), Some(Duration::from_secs(15))); + // Poll for server provided discovery twice twice + let mut discover_served_count = 0; + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); - // Retrieve logs. - let stdout_server = retrieve_output(dead_server); - let stdout_client = retrieve_output(dead_client); + if json["discover_served_rendezvous"].is_object() + && json["discover_served_rendezvous"]["enquirer"] == ED25519MULTIHASH4 + { + discover_served_count += 1; + } + } else { + panic!("Server did not provide discovery twice in time"); + } - // Count discover requests on the server - let server_discovery_count = count_lines_where( - stdout_server, - vec![ - "served rendezvous discover request to peer", - ED25519MULTIHASH4, - ], - ); + if discover_served_count == 2 { + break; + } + } - // Count discovery responses the client - let client_discovery_count = count_lines_where( - stdout_client, - vec![ - "received discovery from rendezvous server", - ED25519MULTIHASH, - ], - ); + // Poll for client discovered twice + let mut discovered_count = 0; + loop { + if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); - assert!(server_discovery_count > 1); - assert!(client_discovery_count > 1); + if json["discovered_rendezvous"].is_object() + && json["discovered_rendezvous"]["server"] == ED25519MULTIHASH + { + discovered_count += 1; + } + } else { + panic!("Client did not discover twice in time"); + } + + if discovered_count == 2 { + break; + } + } + + // Collect logs for five seconds then kill proceses. + let dead_server = kill_homestar(proc_guard_server.take(), None); + let dead_client = kill_homestar(proc_guard_client1.take(), None); + + // Retrieve logs. + let stdout_server = retrieve_output(dead_server); + let stdout_client = retrieve_output(dead_client); + + // Count discover requests on the server + let server_discovery_count = count_lines_where( + stdout_server, + vec![ + "served rendezvous discover request to peer", + ED25519MULTIHASH4, + ], + ); + + // Count discovery responses the client + let client_discovery_count = count_lines_where( + stdout_client, + vec![ + "received discovery from rendezvous server", + ED25519MULTIHASH, + ], + ); + + assert!(server_discovery_count > 1); + assert!(client_discovery_count > 1); + }); Ok(()) } @@ -1011,16 +1134,41 @@ fn test_libp2p_rendezvous_rediscover_on_expiration_integration() -> Result<()> { panic!("Homestar server/runtime failed to start in time"); } - // Wait for registration to complete. - // TODO When we have WebSocket push events, listen on a registration event instead of using an arbitrary sleep. - thread::sleep(Duration::from_secs(2)); + tokio_test::task::spawn(async { + // Subscribe to rendezvous client one + let ws_url2 = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port2); + let client2 = WsClientBuilder::default().build(ws_url2).await.unwrap(); + let mut sub2: Subscription> = client2 + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); - // Start a peer that will discover with the rendezvous server when - // a discovered registration expires. Note that by default discovery only - // occurs every ten minutes, so discovery requests in this test are driven - // by expirations. - let toml3 = format!( - r#" + // Poll for client one registered with server the first time + loop { + if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["registered_rendezvous"].is_object() + && json["registered_rendezvous"]["server"] == ED25519MULTIHASH + { + break; + } + } else { + panic!("Client did not register with server twice in time"); + } + } + + // Start a peer that will discover with the rendezvous server when + // a discovered registration expires. Note that by default discovery only + // occurs every ten minutes, so discovery requests in this test are driven + // by client one expirations. + let toml3 = format!( + r#" [node] [node.network.keypair_config] existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519_2.pem" }} @@ -1036,10 +1184,10 @@ fn test_libp2p_rendezvous_rediscover_on_expiration_integration() -> Result<()> { [node.network.webserver] port = {ws_port3} "# - ); - let config3 = make_config!(toml3); + ); + let config3 = make_config!(toml3); - let rendezvous_client2 = Command::new(BIN.as_os_str()) + let rendezvous_client2 = Command::new(BIN.as_os_str()) .env( "RUST_LOG", "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", @@ -1052,43 +1200,77 @@ fn test_libp2p_rendezvous_rediscover_on_expiration_integration() -> Result<()> { .stdout(Stdio::piped()) .spawn() .unwrap(); - let proc_guard_client2 = ChildGuard::new(rendezvous_client2); + let proc_guard_client2 = ChildGuard::new(rendezvous_client2); - if wait_for_socket_connection(ws_port3, 1000).is_err() { - panic!("Homestar server/runtime failed to start in time"); - } + if wait_for_socket_connection(ws_port3, 1000).is_err() { + panic!("Homestar server/runtime failed to start in time"); + } - // Collect logs for seven seconds then kill proceses. - let dead_server = kill_homestar(proc_guard_server.take(), Some(Duration::from_secs(15))); - let _ = kill_homestar(proc_guard_client1.take(), Some(Duration::from_secs(7))); - let dead_client2 = kill_homestar(proc_guard_client2.take(), Some(Duration::from_secs(15))); - - // Retrieve logs. - let stdout_server = retrieve_output(dead_server); - let stdout_client2 = retrieve_output(dead_client2); - - // Count discover requests on the server - let server_discovery_count = count_lines_where( - stdout_server, - vec![ - "served rendezvous discover request to peer", - "12D3KooWK99VoVxNE7XzyBwXEzW7xhK7Gpv85r9F3V3fyKSUKPH5", - ], - ); + // Subscribe to rendezvous client two + let ws_url3 = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port3); + let client3 = WsClientBuilder::default().build(ws_url3).await.unwrap(); + let mut sub3: Subscription> = client3 + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); - println!("server_discovery_count: {}", server_discovery_count); + // Poll for client two discovered twice + let mut discovered_count = 0; + loop { + if let Ok(msg) = sub3.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); - // Count discovery responses the client - let client_discovery_count = count_lines_where( - stdout_client2, - vec![ - "received discovery from rendezvous server", - "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", - ], - ); + if json["discovered_rendezvous"].is_object() + && json["discovered_rendezvous"]["server"] == ED25519MULTIHASH + { + discovered_count += 1; + } + } else { + panic!("Client did not discover twice in time"); + } - assert!(server_discovery_count > 1); - assert!(client_discovery_count > 1); + if discovered_count == 2 { + break; + } + } + + // Collect logs for seven seconds then kill proceses. + let dead_server = kill_homestar(proc_guard_server.take(), None); + let _ = kill_homestar(proc_guard_client1.take(), None); + let dead_client2 = kill_homestar(proc_guard_client2.take(), None); + + // Retrieve logs. + let stdout_server = retrieve_output(dead_server); + let stdout_client2 = retrieve_output(dead_client2); + + // Count discover requests on the server + let server_discovery_count = count_lines_where( + stdout_server, + vec![ + "served rendezvous discover request to peer", + "12D3KooWK99VoVxNE7XzyBwXEzW7xhK7Gpv85r9F3V3fyKSUKPH5", + ], + ); + + println!("server_discovery_count: {}", server_discovery_count); + + // Count discovery responses the client + let client_discovery_count = count_lines_where( + stdout_client2, + vec![ + "received discovery from rendezvous server", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + ], + ); + + assert!(server_discovery_count > 1); + assert!(client_discovery_count > 1); + }); Ok(()) }