diff --git a/.unreleased/LLT-5515-fix-meshnet-connection-monitoring b/.unreleased/LLT-5515-fix-meshnet-connection-monitoring new file mode 100644 index 000000000..ddb9b024a --- /dev/null +++ b/.unreleased/LLT-5515-fix-meshnet-connection-monitoring @@ -0,0 +1 @@ +Fix incorrect entries in nat_traversal_conn_info diff --git a/Cargo.lock b/Cargo.lock index 8223205fd..392a58e77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4642,7 +4642,6 @@ dependencies = [ "telio-batcher", "telio-crypto", "telio-model", - "telio-nurse", "telio-proto", "telio-sockets", "telio-task", diff --git a/Cargo.toml b/Cargo.toml index 957847933..b808a36b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ tokio = { workspace = true, features = ["test-util"] } telio-dns = { workspace = true, features = ["mockall"] } telio-firewall = { workspace = true, features = ["mockall"] } +telio-nurse = { workspace = true, features = ["mockall"] } telio-proxy = { workspace = true, features = ["mockall"] } telio-batcher = { workspace = true } telio-pq = { workspace = true, features = ["mockall"] } @@ -129,6 +130,7 @@ libc = "0.2.112" tracing = { version = "0.1.37", features = ["max_level_trace", "release_max_level_debug"] } maplit = "1" mockall = "0.11.3" +mockall_double = "0.3.1" modifier = "0.1.0" nat-detect = { git = "https://github.com/NordSecurity/nat-detect.git", tag = "v0.1.8" } ntest = "0.7" diff --git a/crates/telio-nurse/Cargo.toml b/crates/telio-nurse/Cargo.toml index 4d4a904e9..ef7f7d4d0 100644 --- a/crates/telio-nurse/Cargo.toml +++ b/crates/telio-nurse/Cargo.toml @@ -16,6 +16,7 @@ async-trait.workspace = true crypto_box.workspace = true futures.workspace = true mockall = { workspace = true, optional = true } +mockall_double.workspace = true nat-detect.workspace = true rand.workspace = true serde.workspace = true @@ -37,8 +38,6 @@ telio-utils.workspace = true telio-wg.workspace = true once_cell.workspace = true -mockall_double = "0.3.1" - [dev-dependencies] telio-sockets.workspace = true telio-wg = { workspace = true, features = ["mockall"] } diff --git a/crates/telio-nurse/src/aggregator.rs b/crates/telio-nurse/src/aggregator.rs index dc00a07ea..d53d746a4 100644 --- a/crates/telio-nurse/src/aggregator.rs +++ b/crates/telio-nurse/src/aggregator.rs @@ -11,7 +11,7 @@ use telio_model::{ features::EndpointProvider, HashMap, }; -use telio_utils::{telio_log_debug, telio_log_warn}; +use telio_utils::{telio_log_debug, telio_log_info, telio_log_warn}; use telio_wg::{ uapi::{AnalyticsEvent, PeerState}, WireGuard, @@ -159,8 +159,8 @@ impl PeerConnDataSegment { ) -> Self { PeerConnDataSegment { node: peer_event.public_key, - start: peer_event.timestamp.into(), - duration: target_timestamp.duration_since(peer_event.timestamp.into()), + start: peer_event.timestamp, + duration: target_timestamp.duration_since(peer_event.timestamp), connection_data: PeerConnectionData { endpoints, rx_bytes: target_rx_bytes - peer_event.rx_bytes, @@ -225,13 +225,13 @@ impl ConnectivityDataAggregator { /// * `event` - Analytic event with the current direct peer state. /// * `local_ep` - Endpoint provider used by the direct connection on this node. /// * `remote_ep` - Endpoint provider used by the direct connection on remote node. 
- pub async fn change_peer_state_direct( + pub async fn report_peer_state_direct( &self, event: &AnalyticsEvent, local_ep: EndpointProvider, remote_ep: EndpointProvider, ) { - self.change_peer_state_common( + self.report_peer_state_common( event, PeerEndpointTypes { local_ep: local_ep.into(), @@ -246,8 +246,8 @@ impl ConnectivityDataAggregator { /// # Arguments /// /// * `event` - Analytic event with the current relayed peer state. - pub async fn change_peer_state_relayed(&self, event: &AnalyticsEvent) { - self.change_peer_state_common( + pub async fn report_peer_state_relayed(&self, event: &AnalyticsEvent) { + self.report_peer_state_common( event, PeerEndpointTypes { local_ep: EndpointType::Relay, @@ -257,7 +257,25 @@ impl ConnectivityDataAggregator { .await } - async fn change_peer_state_common(&self, event: &AnalyticsEvent, endpoints: PeerEndpointTypes) { + fn log_new_state(event: &AnalyticsEvent, endpoints: &PeerEndpointTypes) { + if endpoints.local_ep == EndpointType::Relay && endpoints.remote_ep == EndpointType::Relay { + telio_log_info!( + "Relayed peer state change for {:?} to {:?} will be reported", + event.public_key, + event.peer_state + ); + } else { + telio_log_info!( + "Direct peer state change for {:?} to {:?} ({:?} -> {:?}) will be reported", + event.public_key, + event.peer_state, + endpoints.local_ep, + endpoints.remote_ep, + ); + } + } + + async fn report_peer_state_common(&self, event: &AnalyticsEvent, endpoints: PeerEndpointTypes) { if !self.config.nat_traversal_events { return; } @@ -283,22 +301,6 @@ impl ConnectivityDataAggregator { return; } - if endpoints.local_ep == EndpointType::Relay && endpoints.remote_ep == EndpointType::Relay { - telio_log_debug!( - "Relayed peer state change for {} to {:?} will be reported", - event.public_key, - event.peer_state - ); - } else { - telio_log_debug!( - "Direct peer state change for {} to {:?} ({:?} -> {:?}) will be reported", - event.public_key, - event.peer_state, - endpoints.local_ep, - endpoints.remote_ep, - ); - } - let new_segment = match data_guard.current_peer_events.entry(event.public_key) { Entry::Occupied(mut entry) if entry.get().0.peer_state != event.peer_state || entry.get().1 != endpoints => @@ -306,7 +308,7 @@ impl ConnectivityDataAggregator { let new_segment = PeerConnDataSegment::new( &entry.get().0, entry.get().1, - event.timestamp.into(), + event.timestamp, event.rx_bytes, event.tx_bytes, ); @@ -315,12 +317,14 @@ impl ConnectivityDataAggregator { entry.remove(); } else { entry.insert((event.clone(), endpoints)); + Self::log_new_state(event, &endpoints); } Some(new_segment) } Entry::Vacant(entry) if event.peer_state == PeerState::Connected => { entry.insert((event.clone(), endpoints)); + Self::log_new_state(event, &endpoints); None } _ => None, @@ -402,8 +406,7 @@ impl ConnectivityDataAggregator { let mut new_peer_segments = Vec::new(); if let Ok(wg_peers) = wg_interface.map(|wgi| wgi.peers) { for (peer_event, peer_state) in data_guard.current_peer_events.values_mut() { - let since_last_event = - current_timestamp.duration_since(Instant::from_std(peer_event.timestamp)); + let since_last_event = current_timestamp.duration_since(peer_event.timestamp); if since_last_event > self.config.state_duration_cap || force_save { wg_peers .get(&peer_event.public_key) @@ -422,7 +425,7 @@ impl ConnectivityDataAggregator { )); peer_event.rx_bytes = rx_bytes; peer_event.tx_bytes = tx_bytes; - peer_event.timestamp = current_timestamp.into(); + peer_event.timestamp = current_timestamp; }, ); } @@ -756,7 +759,7 @@ mod tests { // 
Initial event let mut current_peer_event = env.create_basic_peer_event(KeyKind::Losing); env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Local, @@ -769,7 +772,7 @@ mod tests { current_peer_event.rx_bytes += 1000; current_peer_event.tx_bytes += 2000; env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; let segments = env @@ -789,7 +792,7 @@ mod tests { let mut current_peer_event = env.create_basic_peer_event(KeyKind::Losing); let segment_start = current_peer_event.timestamp; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Local, @@ -801,7 +804,7 @@ mod tests { current_peer_event.rx_bytes += 1000; current_peer_event.tx_bytes += 2000; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Local, @@ -822,7 +825,7 @@ mod tests { current_peer_event.rx_bytes += 1000; current_peer_event.tx_bytes += 2000; env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; // And after 60 seconds connect again @@ -831,7 +834,7 @@ mod tests { current_peer_event.rx_bytes += 1000; current_peer_event.tx_bytes += 2000; env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; // We don't gather periods of time when peer is disconnected, so we should have here only one segment @@ -867,7 +870,7 @@ mod tests { let mut current_peer_event = env.create_basic_peer_event(KeyKind::Losing); let first_segment_start = current_peer_event.timestamp; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Local, @@ -880,7 +883,7 @@ mod tests { current_peer_event.rx_bytes += 1000; current_peer_event.tx_bytes += 2000; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Upnp, @@ -892,7 +895,7 @@ mod tests { current_peer_event.rx_bytes += 2000; current_peer_event.tx_bytes += 4000; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Local, EndpointProvider::Upnp, @@ -958,7 +961,7 @@ mod tests { // Insert the first event env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; let segments = env @@ -1048,7 +1051,7 @@ mod tests { // Insert the first event env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; let segments = env @@ -1204,7 +1207,7 @@ mod tests { let current_peer_event = env.create_basic_peer_event(KeyKind::Losing); env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Stun, @@ -1275,7 +1278,7 @@ mod tests { // Initial event let first_segment_start = current_peer_event.timestamp; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Local, @@ -1287,7 +1290,7 @@ mod tests { current_peer_event.rx_bytes += 1000; current_peer_event.tx_bytes 
+= 2000; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Upnp, @@ -1346,7 +1349,7 @@ mod tests { .await; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Local, EndpointProvider::Upnp, @@ -1358,7 +1361,7 @@ mod tests { current_peer_event.rx_bytes += 1000; current_peer_event.tx_bytes += 2000; env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; let segments = env @@ -1467,10 +1470,10 @@ mod tests { event_to_be_ignored.tx_bytes = 0; let segment_start = event_to_be_recorded.timestamp; env.connectivity_data_aggregator - .change_peer_state_relayed(&event_to_be_recorded) + .report_peer_state_relayed(&event_to_be_recorded) .await; env.connectivity_data_aggregator - .change_peer_state_relayed(&event_to_be_ignored) + .report_peer_state_relayed(&event_to_be_ignored) .await; event_to_be_recorded.timestamp = segment_start + Duration::from_secs(30); @@ -1478,14 +1481,14 @@ mod tests { event_to_be_ignored.timestamp = segment_start + Duration::from_secs(30); event_to_be_ignored.tx_bytes = 1000; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( &event_to_be_recorded, EndpointProvider::Stun, EndpointProvider::Stun, ) .await; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( &event_to_be_ignored, EndpointProvider::Stun, EndpointProvider::Stun, @@ -1517,13 +1520,13 @@ mod tests { current_peer_event.tx_bytes = 0; let start_timestamp = current_peer_event.timestamp; env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; current_peer_event.timestamp = start_timestamp + Duration::from_secs(30); current_peer_event.tx_bytes = 1000; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Stun, EndpointProvider::Stun, @@ -1533,13 +1536,13 @@ mod tests { current_peer_event.timestamp = start_timestamp + Duration::from_secs(30 + 31); current_peer_event.tx_bytes = 1000 + 1001; env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; current_peer_event.timestamp = start_timestamp + Duration::from_secs(30 + 31 + 32); current_peer_event.tx_bytes = 1000 + 1001 + 1002; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Local, EndpointProvider::Local, @@ -1549,14 +1552,14 @@ mod tests { current_peer_event.timestamp = start_timestamp + Duration::from_secs(30 + 31 + 32 + 33); current_peer_event.tx_bytes = 1000 + 1001 + 1002 + 1003; env.connectivity_data_aggregator - .change_peer_state_relayed(¤t_peer_event) + .report_peer_state_relayed(¤t_peer_event) .await; current_peer_event.timestamp = start_timestamp + Duration::from_secs(30 + 31 + 32 + 33 + 34); current_peer_event.tx_bytes = 1000 + 1001 + 1002 + 1003 + 1004; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Upnp, @@ -1622,7 +1625,7 @@ mod tests { }]; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Local, @@ -1635,7 +1638,7 @@ mod tests { current_peer_event.tx_bytes += 
2000; current_peer_event.peer_state = NodeState::Disconnected; env.connectivity_data_aggregator - .change_peer_state_direct( + .report_peer_state_direct( ¤t_peer_event, EndpointProvider::Upnp, EndpointProvider::Local, diff --git a/crates/telio-nurse/src/heartbeat.rs b/crates/telio-nurse/src/heartbeat.rs index 66e04a037..f13ea9050 100644 --- a/crates/telio-nurse/src/heartbeat.rs +++ b/crates/telio-nurse/src/heartbeat.rs @@ -1056,7 +1056,7 @@ impl Analytics { internal_sorted_fingerprints.push(fp); } - sorted_index = i; + sorted_index += 1; } // Add external nodes to the index map (needed for aggregator to work properly) diff --git a/crates/telio-nurse/src/qos.rs b/crates/telio-nurse/src/qos.rs index 215dafb5d..0073196af 100644 --- a/crates/telio-nurse/src/qos.rs +++ b/crates/telio-nurse/src/qos.rs @@ -3,11 +3,10 @@ use histogram::Histogram; use std::collections::{BTreeSet, HashMap, HashSet}; use std::net::IpAddr; use std::sync::Arc; -use std::time::Instant; use telio_model::constants::{VPN_INTERNAL_IPV4, VPN_INTERNAL_IPV6}; use tokio::sync::mpsc; -use tokio::time::{Duration, Interval}; +use tokio::time::{Duration, Instant, Interval}; use telio_crypto::PublicKey; use telio_model::features::RttType; diff --git a/crates/telio-relay/Cargo.toml b/crates/telio-relay/Cargo.toml index 7b37729d9..dd49e51e6 100644 --- a/crates/telio-relay/Cargo.toml +++ b/crates/telio-relay/Cargo.toml @@ -8,7 +8,6 @@ publish = false [dependencies] generic-array = "0.14.5" -mockall_double = "0.3.1" rand_core = { version = "0.6.3", default-features = false } rustls-pemfile = "1.0.0" tokio-rustls = { version = "0.24.1", default-features = false } @@ -22,6 +21,7 @@ crypto_box.workspace = true futures.workspace = true httparse.workspace = true libc.workspace = true +mockall_double.workspace = true num_enum.workspace = true rand.workspace = true serde.workspace = true diff --git a/crates/telio-traversal/Cargo.toml b/crates/telio-traversal/Cargo.toml index a1cd7abde..8f93266e8 100644 --- a/crates/telio-traversal/Cargo.toml +++ b/crates/telio-traversal/Cargo.toml @@ -35,7 +35,6 @@ tokio = { workspace = true, features = ["full"] } telio-batcher.workspace = true telio-crypto.workspace = true telio-model.workspace = true -telio-nurse.workspace = true telio-proto.workspace = true telio-sockets.workspace = true telio-task.workspace = true diff --git a/crates/telio-traversal/src/upgrade_sync.rs b/crates/telio-traversal/src/upgrade_sync.rs index d8e4c2655..09f61c85b 100644 --- a/crates/telio-traversal/src/upgrade_sync.rs +++ b/crates/telio-traversal/src/upgrade_sync.rs @@ -7,12 +7,9 @@ use std::time::Duration; use std::{collections::HashMap, sync::Arc}; use telio_crypto::{smaller_key_in_meshnet_canonical_order, PublicKey}; use telio_model::features::EndpointProvider; -use telio_nurse::aggregator::ConnectivityDataAggregator; use telio_proto::{Decision, Session, UpgradeDecisionMsg, UpgradeMsg}; use telio_task::{io::chan, io::Chan, task_exec, BoxAction, Runtime, Task}; use telio_utils::{interval, telio_log_debug, telio_log_info, telio_log_warn, LruCache}; -use telio_wg::uapi::{AnalyticsEvent, PeerState}; -use telio_wg::WireGuard; use tokio::{ sync::mpsc::error::SendError, time::{Instant, Interval}, @@ -81,15 +78,18 @@ pub trait UpgradeSyncTrait { local_direct_endpoint: (SocketAddr, EndpointProvider), session: Session, ) -> Result; + async fn get_accepted_session(&self, public_key: PublicKey) -> Option; + async fn clear_accepted_session(&self, public_key: PublicKey); } pub struct UpgradeSync { task: Task, } -struct PendingSessionData { 
- local_ep: EndpointProvider, - remote_ep: EndpointProvider, +#[derive(Clone)] +pub struct SessionData { + pub local_ep: EndpointProvider, + pub remote_ep: EndpointProvider, } pub struct State { @@ -101,10 +101,9 @@ pub struct State { poll_timer: Interval, upgrade_decision_intercoms: Chan<(PublicKey, UpgradeDecisionMsg)>, // Collection of not yet confirmed Upgrade requests - pending_direct_sessions: LruCache, + pending_direct_sessions: LruCache, + accepted_direct_sessions: HashMap, our_public_key: PublicKey, - connectivity_data_aggregator: Arc, - wireguard: Arc, } impl UpgradeSync { @@ -116,8 +115,6 @@ impl UpgradeSync { upgrade_verifier: Arc, upgrade_decision_intercoms: Chan<(PublicKey, UpgradeDecisionMsg)>, our_public_key: PublicKey, - connectivity_data_aggregator: Arc, - wireguard: Arc, ) -> Result { telio_log_info!("Starting Upgrade sync module"); let poll_timer = interval(expiration_period / 2); @@ -134,9 +131,8 @@ impl UpgradeSync { Duration::from_secs(15), MAX_PENDING_SESSIONS, ), + accepted_direct_sessions: Default::default(), our_public_key, - connectivity_data_aggregator, - wireguard, }), }) } @@ -189,6 +185,22 @@ impl UpgradeSyncTrait for UpgradeSync { .await .map_err(Error::Task) } + + async fn get_accepted_session(&self, public_key: PublicKey) -> Option { + task_exec!(&self.task, async move |s| { + Ok(s.accepted_direct_sessions.get(&public_key).cloned()) + }) + .await + .unwrap_or_default() + } + + async fn clear_accepted_session(&self, public_key: PublicKey) { + let _ = task_exec!(&self.task, async move |s| { + s.accepted_direct_sessions.remove(&public_key); + Ok(()) + }) + .await; + } } impl State { @@ -246,7 +258,7 @@ impl State { .insert(Direction::Sent(*public_key), val); self.pending_direct_sessions.insert( session, - PendingSessionData { + SessionData { local_ep: local_direct_endpoint.1, remote_ep: remote_endpoint.1, }, @@ -319,13 +331,13 @@ impl State { return Err(Error::Rejected(decision)); } - if let Some(event) = self.make_nat_event(*public_key).await { - let local_ep = upgrade_msg.receiver_endpoint_type; - let remote_ep = upgrade_msg.endpoint_type; - self.connectivity_data_aggregator - .change_peer_state_direct(&event, local_ep, remote_ep) - .await; - } + self.accepted_direct_sessions.insert( + *public_key, + SessionData { + local_ep: upgrade_msg.receiver_endpoint_type, + remote_ep: upgrade_msg.endpoint_type, + }, + ); } Ok(false) => { // We might have restarted in the middle of upgrade procedure and have @@ -424,27 +436,6 @@ impl State { Ok(()) } - async fn make_nat_event(&self, public_key: PublicKey) -> Option { - if let Some(peer) = self - .wireguard - .get_interface() - .await - .ok() - .and_then(|w| w.peers.get(&public_key).cloned()) - { - Some(AnalyticsEvent { - public_key, - dual_ip_addresses: peer.get_dual_ip_addresses(), - tx_bytes: peer.rx_bytes.unwrap_or_default(), - rx_bytes: peer.tx_bytes.unwrap_or_default(), - peer_state: PeerState::Connected, - timestamp: Instant::now().into_std(), - }) - } else { - None - } - } - pub fn set_public_key(&mut self, public_key: PublicKey) { self.our_public_key = public_key; } @@ -477,16 +468,10 @@ impl Runtime for State { match msg.decision { Decision::Accepted => { if let Some(data) = self.pending_direct_sessions.remove(&msg.session) { - if let Some(event) = self.make_nat_event(public_key).await { - self.connectivity_data_aggregator. 
- change_peer_state_direct(&event, data.local_ep, data.remote_ep).await; - } else { - telio_log_warn!("Received upgrade decision from peer {public_key:?} to which we didn't send a request: {msg:?}"); - } + self.accepted_direct_sessions.insert(public_key, SessionData {local_ep: data.local_ep, remote_ep: data.remote_ep}); } else { telio_log_warn!("Received upgrade decision from {public_key:?} for unknow session: {msg:?}"); } - }, Decision::RejectedDueToUnknownSession => { if let Some(pk) = self.upgrade_requests.iter().find(|(_, req)| req.session == msg.session).map(|(pk,_)| *pk) { @@ -519,13 +504,11 @@ impl Runtime for State { #[cfg(test)] mod tests { - use std::{collections::BTreeMap, sync::Arc}; + use std::sync::Arc; use super::*; use telio_crypto::SecretKey; - use telio_nurse::config::AggregatorConfig; use telio_proto::Decision; - use telio_wg::{uapi::Interface, MockWireGuard}; use tokio::{ sync::{ mpsc::{channel, Receiver, Sender}, @@ -591,7 +574,6 @@ mod tests { fn setup( expiry: Duration, upgrade_verifier: Arc, - wg: Arc, ) -> ( UpgradeSync, chan::Rx, @@ -614,12 +596,6 @@ mod tests { upgrade_verifier, upg_decision_us, our_public_key, - Arc::new(ConnectivityDataAggregator::new( - AggregatorConfig::default(), - wg.clone(), - our_public_key, - )), - wg, ) .unwrap(); @@ -631,10 +607,8 @@ mod tests { async fn handle_request_expiration() { const EXPIRY: Duration = Duration::from_millis(100); - let wg = Arc::new(MockWireGuard::new()); - let (upg_sync, mut upg_rq_rx, mut intercoms_them, _) = - setup(EXPIRY, Arc::new(KnowsAllSessions::new()), wg); + setup(EXPIRY, Arc::new(KnowsAllSessions::new())); let upg_msg = UpgradeMsg { endpoint: "127.0.0.1:6666".parse().unwrap(), @@ -688,18 +662,8 @@ mod tests { #[tokio::test] async fn handle_request_and_removal() { const EXPIRY: Duration = Duration::from_millis(100); - let mut wg = MockWireGuard::new(); - wg.expect_get_interface().returning(|| { - Ok(Interface { - private_key: None, - listen_port: None, - proxy_listen_port: None, - fwmark: 42, - peers: BTreeMap::default(), - }) - }); let (upg_sync, mut upg_rq_rx, mut intercoms_them, mut upgrade_decision_them) = - setup(EXPIRY, Arc::new(KnowsAllSessions::new()), Arc::new(wg)); + setup(EXPIRY, Arc::new(KnowsAllSessions::new())); let upg_msg = UpgradeMsg { endpoint: "127.0.0.1:6666".parse().unwrap(), @@ -752,9 +716,8 @@ mod tests { async fn handle_upgrade_rejection_due_to_unknown_session() { const EXPIRY: Duration = Duration::from_millis(100); let upgrade_controller = Arc::new(KnowsAllSessions::new()); - let wg = Arc::new(MockWireGuard::new()); let (upg_sync, _upg_rq_rx, _intercoms_them, upgrade_decision_them) = - setup(EXPIRY, upgrade_controller.clone(), wg); + setup(EXPIRY, upgrade_controller.clone()); let public_key = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY=" .parse::() @@ -798,9 +761,8 @@ mod tests { async fn handle_upgrade_rejection_due_to_concurrent_upgrade() { const EXPIRY: Duration = Duration::from_millis(100); let upgrade_controller = Arc::new(KnowsAllSessions::new()); - let wg = Arc::new(MockWireGuard::new()); let (upg_sync, _upg_rq_rx, _intercoms_them, upgrade_decision_them) = - setup(EXPIRY, upgrade_controller.clone(), wg); + setup(EXPIRY, upgrade_controller.clone()); let public_key = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY=" .parse::() @@ -852,9 +814,8 @@ mod tests { #[tokio::test] async fn unknown_requests_are_rejected() { const EXPIRY: Duration = Duration::from_millis(100); - let wg = Arc::new(MockWireGuard::new()); let (_upg_sync, mut _upg_rq_rx, intercoms_them, mut 
upgrade_decision_them) = - setup(EXPIRY, Arc::new(KnowsNoSessions), wg); + setup(EXPIRY, Arc::new(KnowsNoSessions)); let public_key = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY=" .parse::() .unwrap(); @@ -880,18 +841,8 @@ mod tests { #[tokio::test] async fn handle_request_during_an_ongoing_upgrade() { const EXPIRY: Duration = Duration::from_millis(100); - let mut wg = MockWireGuard::new(); - wg.expect_get_interface().returning(|| { - Ok(Interface { - private_key: None, - listen_port: None, - proxy_listen_port: None, - fwmark: 42, - peers: BTreeMap::default(), - }) - }); - let (upg_sync, mut upg_rq_rx, mut intercoms_them, mut upgrade_decision_them) = - setup(EXPIRY, Arc::new(KnowsAllSessions::new()), Arc::new(wg)); + let (upg_sync, mut _upg_rq_rx, intercoms_them, mut _upgrade_decision_them) = + setup(EXPIRY, Arc::new(KnowsAllSessions::new())); let upg_msg = UpgradeMsg { endpoint: "127.0.0.1:6666".parse().unwrap(), diff --git a/crates/telio-wg/src/uapi.rs b/crates/telio-wg/src/uapi.rs index 5d5c615f8..524da9dce 100644 --- a/crates/telio-wg/src/uapi.rs +++ b/crates/telio-wg/src/uapi.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use telio_crypto::{KeyDecodeError, PresharedKey, PublicKey, SecretKey}; use telio_model::mesh::{LinkState, Node, NodeState}; use telio_utils::{telio_log_warn, DualTarget, DualTargetError}; +use tokio::time::Instant; use wireguard_uapi::{get, xplatform::set}; use std::{ @@ -16,7 +17,7 @@ use std::{ panic, str::FromStr, sync::Arc, - time::{Duration, Instant, SystemTime, SystemTimeError, UNIX_EPOCH}, + time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}, }; /// Error types from uapi responses diff --git a/crates/telio-wg/src/wg.rs b/crates/telio-wg/src/wg.rs index ee90673fc..772583d63 100644 --- a/crates/telio-wg/src/wg.rs +++ b/crates/telio-wg/src/wg.rs @@ -314,7 +314,7 @@ impl DynamicWg { /// firewall_reset_connections: None, /// }, /// None, - /// true, + /// true, /// ); /// } /// ``` @@ -978,7 +978,7 @@ impl State { tx_bytes, rx_bytes, peer_state, - timestamp: std::time::Instant::now(), + timestamp: tokio::time::Instant::now(), }; if analytics_tx.send(Box::new(event)).is_err() { telio_log_debug!("Failed to send analytics info for {:?}", pubkey); diff --git a/nat-lab/docker-compose.yml b/nat-lab/docker-compose.yml index 2e0f1e074..507ba8161 100644 --- a/nat-lab/docker-compose.yml +++ b/nat-lab/docker-compose.yml @@ -73,7 +73,7 @@ services: networks: cone-net-01: ipv4_address: 192.168.101.104 - # Note: in certain scenarios this port mapping can be changed as part of `./natlab.py --start`. + # Note: in certain scenarios this port mapping can be changed as part of `./natlab.py --start`. 
# When this comment was added, the only scenario where the port mapping is changed is CI running tests ports: ["58001"] ulimits: @@ -250,6 +250,7 @@ services: networks: internet: ipv4_address: 10.0.254.1 + ipv6_address: 2001:db8:85a4::1000:2541 priority: 1000 cone-net-01: ipv4_address: 192.168.101.254 diff --git a/nat-lab/network.md b/nat-lab/network.md index 01592d807..fdef9f624 100644 --- a/nat-lab/network.md +++ b/nat-lab/network.md @@ -63,6 +63,7 @@ graph LR 192.168.101.104") cone-gw-01(["cone-gw-01 10.0.254.1 + 2001:db8:85a4::1000:2541 192.168.101.254"]) end diff --git a/nat-lab/tests/telio.py b/nat-lab/tests/telio.py index b8c7f8551..cd16cb5a4 100644 --- a/nat-lab/tests/telio.py +++ b/nat-lab/tests/telio.py @@ -501,7 +501,6 @@ def __init__( force_ipv6_feature: bool = False, fingerprint: str = "", ) -> None: - self._router: Optional[Router] = None self._events: Optional[Events] = None self._runtime: Optional[Runtime] = None self._process: Optional[Process] = None @@ -509,6 +508,7 @@ def __init__( self._message_idx = 0 self._node = node self._connection = connection + self._router: Router = new_router(self._connection, self._node.ip_stack) self._adapter_type = adapter_type self._telio_features = telio_features self._quit = False @@ -556,7 +556,6 @@ async def on_stderr(stderr: str) -> None: self._runtime = Runtime() self._events = Events(self._runtime) - self._router = new_router(self._connection, self._node.ip_stack) object_name = str(uuid.uuid4()).replace("-", "") (host_ip, container_ip) = await self._connection.get_ip_address() @@ -680,10 +679,9 @@ async def on_stderr(stderr: str) -> None: ) print(datetime.now(), "Test cleanup: Clearing up routes") - if self._router: - await self._router.delete_vpn_route() - await self._router.delete_exit_node_route() - await self._router.delete_interface() + await self._router.delete_vpn_route() + await self._router.delete_exit_node_route() + await self._router.delete_interface() print(datetime.now(), "Test cleanup: Saving moose dbs") await self.save_moose_db() @@ -959,7 +957,6 @@ async def connect_to_exit_node( await event def get_router(self) -> Router: - assert self._router return self._router def get_runtime(self) -> Runtime: diff --git a/nat-lab/tests/test_direct_connection.py b/nat-lab/tests/test_direct_connection.py index cce1ca4bc..c209378ac 100644 --- a/nat-lab/tests/test_direct_connection.py +++ b/nat-lab/tests/test_direct_connection.py @@ -9,7 +9,6 @@ from config import DERP_SERVERS from contextlib import AsyncExitStack from helpers import setup_mesh_nodes, SetupParameters -from itertools import groupby from telio import PathType, State from telio_features import ( Batching, @@ -26,6 +25,7 @@ from utils.asyncio_util import run_async_context from utils.connection_util import ConnectionTag from utils.ping import ping +from utils.telio_log_notifier import TelioLogNotifier # Testing if batching being disabled or not there doesn't affect anything DISABLED_BATCHING_OPTIONS = (None, Batching(direct_connection_threshold=5)) @@ -326,30 +326,21 @@ def fix_provider_name(name): assert len(beta_direct.providers) > 0 beta_provider = fix_provider_name(beta_direct.providers[0]) - alpha_connection, _ = [conn.connection for conn in env.connections] - - await ping(alpha_connection, beta.ip_addresses[0]) - - # Break UHP - async with AsyncExitStack() as temp_exit_stack: - await temp_exit_stack.enter_async_context( - alpha_client.get_router().disable_path(reflexive_ip) - ) - - await asyncio.gather( - alpha_client.wait_for_state_peer( - beta.public_key, - 
[State.Connected], - [PathType.Relay], - ), - beta_client.wait_for_state_peer( - alpha.public_key, - [State.Connected], - [PathType.Relay], - ), - ) + alpha_connection, beta_connection = [ + conn.connection for conn in env.connections + ] - await ping(alpha_connection, beta.ip_addresses[0]) + # We need to compare the decoded forms, not the base64 encoded strings + if base64.b64decode(alpha.public_key) < base64.b64decode(beta.public_key): + reporting_connection = alpha_connection + losing_key = beta.public_key + from_provider = alpha_provider + to_provider = beta_provider + else: + reporting_connection = beta_connection + losing_key = alpha.public_key + from_provider = beta_provider + to_provider = alpha_provider await asyncio.gather( alpha_client.wait_for_state_peer( @@ -359,12 +350,19 @@ def fix_provider_name(name): alpha.public_key, [State.Connected], [PathType.Direct] ), ) - await ping(alpha_connection, beta.ip_addresses[0]) + telio_log_notifier = await exit_stack.enter_async_context( + TelioLogNotifier(reporting_connection).run() + ) + # Break UHP - async with AsyncExitStack() as temp_exit_stack: - await temp_exit_stack.enter_async_context( + async with AsyncExitStack() as direct_disabled_exit_stack: + relayed_state_reported = telio_log_notifier.notify_output( + f'Relayed peer state change for "{losing_key[:4]}...{losing_key[-4:]}" to Connected will be reported' + ) + + await direct_disabled_exit_stack.enter_async_context( alpha_client.get_router().disable_path(reflexive_ip) ) @@ -379,10 +377,16 @@ def fix_provider_name(name): [State.Connected], [PathType.Relay], ), + relayed_state_reported.wait(), ) await ping(alpha_connection, beta.ip_addresses[0]) + direct_state_reported = telio_log_notifier.notify_output( + f'Direct peer state change for "{losing_key[:4]}...{losing_key[-4:]}" to Connected' + f" ({from_provider} -> {to_provider}) will be reported", + ) + await asyncio.gather( alpha_client.wait_for_state_peer( beta.public_key, [State.Connected], [PathType.Direct] @@ -390,40 +394,9 @@ def fix_provider_name(name): beta_client.wait_for_state_peer( alpha.public_key, [State.Connected], [PathType.Direct] ), + direct_state_reported.wait(), ) - - pred = ( - '.* "telio_nurse::aggregator":\\d+ (.* peer state change for .* will be' - " reported)" - ) - # We need to compare the decoded forms, not the base64 encoded strings - if base64.b64decode(alpha.public_key) < base64.b64decode(beta.public_key): - losing_key = beta.public_key - log_lines = await alpha_client.get_log_lines(pred) - from_provider = alpha_provider - to_provider = beta_provider - else: - losing_key = alpha.public_key - log_lines = await beta_client.get_log_lines(pred) - from_provider = beta_provider - to_provider = alpha_provider - deduplicated_lines = [l for l, _ in groupby(log_lines)] - direct_event = ( - f"Direct peer state change for {losing_key} to Connected" - f" ({from_provider} -> {to_provider}) will be reported" - ) - relayed_event = ( - f"Relayed peer state change for {losing_key} to Connected will be reported" - ) - expected = [ - relayed_event, - direct_event, - relayed_event, - direct_event, - relayed_event, - direct_event, - ] - assert expected == deduplicated_lines + await ping(alpha_connection, beta.ip_addresses[0]) # This is expected. Clients can still receive messages from # the previous sessions. 
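Note (not part of the patch): the test changes above and the test_lana.py changes below both assert on the new aggregator log lines through the TelioLogNotifier helper added later in this diff (nat-lab/tests/utils/telio_log_notifier.py). A minimal usage sketch of that pattern follows; the function name wait_for_relayed_report and its parameters are hypothetical placeholders, while the notifier API and the expected log text mirror what the diff itself uses.

# Sketch only: arm a notifier on the reporting node's log, then wait for the
# aggregator's "will be reported" line after triggering the state change.
import asyncio
from utils.connection import Connection
from utils.telio_log_notifier import TelioLogNotifier

async def wait_for_relayed_report(reporting_connection: Connection, losing_key: str) -> None:
    # TelioLogNotifier.run() tails /tcli.log on the reporting node and yields
    # the notifier; notify_output() returns an asyncio.Event that is set once
    # the given substring appears in the log output.
    async with TelioLogNotifier(reporting_connection).run() as notifier:
        reported = notifier.notify_output(
            f'Relayed peer state change for "{losing_key[:4]}...{losing_key[-4:]}"'
            " to Connected will be reported"
        )
        # The caller would trigger the relayed (re)connection here.
        await reported.wait()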
diff --git a/nat-lab/tests/test_lana.py b/nat-lab/tests/test_lana.py index 422ba5bcf..1fdf83841 100644 --- a/nat-lab/tests/test_lana.py +++ b/nat-lab/tests/test_lana.py @@ -63,6 +63,7 @@ ) from utils.ping import ping from utils.router import IPStack, IPProto +from utils.telio_log_notifier import TelioLogNotifier CONTAINER_EVENT_PATH = "/event.db" CONTAINER_EVENT_BACKUP_PATH = "/event_backup.db" @@ -70,6 +71,8 @@ BETA_EVENTS_PATH = "./beta-events.db" GAMMA_EVENTS_PATH = "./gamma-events.db" +DOCKER_CONE_GW_1_IPv4 = "10.0.254.1" +DOCKER_CONE_GW_1_IPv6 = "2001:db8:85a4::1000:2541" DERP_SERVERS_STRS = [ f"{DERP_PRIMARY['ipv4']}:{DERP_PRIMARY['relay_port']}", @@ -245,6 +248,60 @@ def ip_stack_to_bits(ip_stack: IPStack) -> int: return IPV4_BIT | IPV6_BIT +async def start_alpha_beta_in_relay( + exit_stack: AsyncExitStack, + api: API, + alpha: Node, + beta: Node, + connection_alpha: Connection, + connection_beta: Connection, + alpha_features: TelioFeatures, + beta_features: TelioFeatures, +) -> tuple[telio.Client, telio.Client]: + client_alpha = await exit_stack.enter_async_context( + telio.Client( + connection_alpha, + alpha, + telio_features=alpha_features, + fingerprint=ALPHA_FINGERPRINT, + ).run(api.get_meshmap(alpha.id)) + ) + + client_beta = telio.Client( + connection_beta, + beta, + telio_features=beta_features, + fingerprint=BETA_FINGERPRINT, + ) + + if base64.b64decode(alpha.public_key) < base64.b64decode(beta.public_key): + reporting_connection = connection_alpha + losing_key = beta.public_key + else: + reporting_connection = connection_beta + losing_key = alpha.public_key + + async with AsyncExitStack() as direct_disabled_exit_stack: + telio_log_notifier = await direct_disabled_exit_stack.enter_async_context( + TelioLogNotifier(reporting_connection).run() + ) + + for path in [DOCKER_CONE_GW_1_IPv4, DOCKER_CONE_GW_1_IPv6]: + await direct_disabled_exit_stack.enter_async_context( + client_beta.get_router().disable_path(path) + ) + + relayed_state_reported = telio_log_notifier.notify_output( + f'Relayed peer state change for "{losing_key[:4]}...{losing_key[-4:]}" to Connected will be reported' + ) + + await exit_stack.enter_async_context(client_beta.run(api.get_meshmap(beta.id))) + + await relayed_state_reported.wait() + + return client_alpha, client_beta + + async def run_default_scenario( exit_stack: AsyncExitStack, alpha_is_local, @@ -307,24 +364,16 @@ async def run_default_scenario( await clean_container(connection_beta) await clean_container(connection_gamma) - client_alpha = await exit_stack.enter_async_context( - telio.Client( - connection_alpha, - alpha, - telio_features=build_telio_features(), - fingerprint=ALPHA_FINGERPRINT, - ).run(api.get_meshmap(alpha.id)) - ) - - client_beta = await exit_stack.enter_async_context( - telio.Client( - connection_beta, - beta, - telio_features=build_telio_features(), - fingerprint=BETA_FINGERPRINT, - ).run(api.get_meshmap(beta.id)) + client_alpha, client_beta = await start_alpha_beta_in_relay( + exit_stack, + api, + alpha, + beta, + connection_alpha, + connection_beta, + build_telio_features(), + build_telio_features(), ) - client_gamma = await exit_stack.enter_async_context( telio.Client( connection_gamma, @@ -1247,21 +1296,15 @@ async def test_lana_with_meshnet_exit_node( await clean_container(connection_alpha) await clean_container(connection_beta) - client_alpha = await exit_stack.enter_async_context( - telio.Client( - connection_alpha, - alpha, - telio_features=build_telio_features(), - fingerprint=ALPHA_FINGERPRINT, - 
).run(api.get_meshmap(alpha.id)) - ) - client_beta = await exit_stack.enter_async_context( - telio.Client( - connection_beta, - beta, - telio_features=build_telio_features(), - fingerprint=BETA_FINGERPRINT, - ).run(api.get_meshmap(beta.id)) + client_alpha, client_beta = await start_alpha_beta_in_relay( + exit_stack, + api, + alpha, + beta, + connection_alpha, + connection_beta, + build_telio_features(), + build_telio_features(), ) await asyncio.gather( @@ -1481,21 +1524,15 @@ def get_features_with_long_qos() -> TelioFeatures: features.nurse.qos.rtt_interval = RTT_INTERVAL * 10 return features - client_alpha = await exit_stack.enter_async_context( - telio.Client( - connection_alpha, - alpha, - telio_features=get_features_with_long_qos(), - fingerprint=ALPHA_FINGERPRINT, - ).run(api.get_meshmap(alpha.id)) - ) - client_beta = await exit_stack.enter_async_context( - telio.Client( - connection_beta, - beta, - telio_features=get_features_with_long_qos(), - fingerprint=BETA_FINGERPRINT, - ).run(api.get_meshmap(beta.id)) + client_alpha, client_beta = await start_alpha_beta_in_relay( + exit_stack, + api, + alpha, + beta, + connection_alpha, + connection_beta, + get_features_with_long_qos(), + get_features_with_long_qos(), ) await asyncio.gather( @@ -1529,8 +1566,25 @@ def get_features_with_long_qos() -> TelioFeatures: ) assert alpha_events assert beta_events - # disconnect beta and trigger analytics on alpha - await client_beta.stop_device() + + if base64.b64decode(alpha.public_key) < base64.b64decode(beta.public_key): + reporting_connection = connection_alpha + losing_key = beta.public_key + else: + reporting_connection = connection_beta + losing_key = alpha.public_key + + async with TelioLogNotifier(reporting_connection).run() as telio_log_notifier: + if losing_key == beta.public_key: + relayed_state_reported = telio_log_notifier.notify_output( + f'Relayed peer state change for "{losing_key[:4]}...{losing_key[-4:]}" to Connected will be reported' + ) + + # disconnect beta and trigger analytics on alpha + await client_beta.stop_device() + + if losing_key == beta.public_key: + await relayed_state_reported.wait() beta_events = await wait_for_event_dump( ConnectionTag.DOCKER_CONE_CLIENT_2, BETA_EVENTS_PATH, nr_events=2 diff --git a/nat-lab/tests/utils/router/linux_router.py b/nat-lab/tests/utils/router/linux_router.py index c6c7c24e6..e9e8db2ca 100644 --- a/nat-lab/tests/utils/router/linux_router.py +++ b/nat-lab/tests/utils/router/linux_router.py @@ -1,5 +1,5 @@ import config -from .router import Router, IPStack, IPProto +from .router import Router, IPStack, IPProto, get_ip_address_type from contextlib import asynccontextmanager from typing import AsyncIterator, List from utils.connection import Connection @@ -285,10 +285,8 @@ async def delete_exit_node_route(self) -> None: @asynccontextmanager async def disable_path(self, address: str) -> AsyncIterator: - addr_proto = self.check_ip_address(address) - - if addr_proto is None: - pass + addr_proto = get_ip_address_type(address) + assert addr_proto, "Incorrect address passed to disable_path" iptables_string = ("ip" if addr_proto == IPProto.IPv4 else "ip6") + "tables" diff --git a/nat-lab/tests/utils/telio_log_notifier.py b/nat-lab/tests/utils/telio_log_notifier.py new file mode 100644 index 000000000..d47d61ab1 --- /dev/null +++ b/nat-lab/tests/utils/telio_log_notifier.py @@ -0,0 +1,35 @@ +import asyncio +from contextlib import asynccontextmanager +from typing import AsyncIterator +from utils.connection import Connection, TargetOS +from 
utils.output_notifier import OutputNotifier +from utils.process import Process + + +class TelioLogNotifier: + _process: Process + _next_ping_event: asyncio.Event + _connection: Connection + + def __init__(self, connection: Connection) -> None: + assert ( + connection.target_os == TargetOS.Linux + ), "TelioLogNotifier supported only on Linux" + self._connection = connection + self._process = connection.create_process( + ["tail", "-n", "1", "-F", "/tcli.log"] + ) + self._output_notifier = OutputNotifier() + + def notify_output(self, what: str) -> asyncio.Event: + event = asyncio.Event() + self._output_notifier.notify_output(what, event) + return event + + @asynccontextmanager + async def run(self) -> AsyncIterator["TelioLogNotifier"]: + async with self._process.run( + stdout_callback=self._output_notifier.handle_output + ): + await self._process.wait_stdin_ready() + yield self diff --git a/src/device.rs b/src/device.rs index 02f9071d4..0b1e87060 100644 --- a/src/device.rs +++ b/src/device.rs @@ -21,7 +21,6 @@ use telio_task::{ io::{chan, mc_chan, mc_chan::Tx, Chan, McChan}, task_exec, BoxAction, Runtime as TaskRuntime, Task, }; -use telio_traversal::SessionKeeperTrait; use telio_traversal::{ connectivity_check, cross_ping_check::{CrossPingCheck, CrossPingCheckTrait, Io as CpcIo, UpgradeController}, @@ -36,6 +35,7 @@ use telio_traversal::{ ping_pong_handler::PingPongHandler, SessionKeeper, UpgradeRequestChangeEvent, UpgradeSync, WireGuardEndpointCandidateChangeEvent, }; +use telio_traversal::{SessionKeeperTrait, UpgradeSyncTrait}; #[cfg(any(target_os = "macos", target_os = "ios", target_os = "tvos"))] use telio_sockets::native; @@ -56,7 +56,7 @@ use tokio::{ use telio_dns::{DnsResolver, LocalDnsResolver, Records}; use telio_dns::bind_tun; -use wg::uapi::{self, AnalyticsEvent, PeerState}; +use wg::uapi::{self, PeerState}; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -1427,8 +1427,6 @@ impl Runtime { cross_ping_check.clone(), multiplexer.get_channel().await?, self.requested_state.device_config.private_key.public(), - self.entities.aggregator.clone(), - self.entities.wireguard_interface.clone(), )?); match SessionKeeper::start(self.entities.socket_pool.clone()).map(Arc::new) { @@ -1576,6 +1574,7 @@ impl Runtime { } wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await?; Ok(()) } @@ -1594,6 +1593,7 @@ impl Runtime { self.entities.socket_pool.set_fwmark(fwmark); wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await?; Ok(()) } @@ -1681,6 +1681,7 @@ impl Runtime { self.upsert_dns_peers().await?; wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await?; Ok(()) @@ -1704,6 +1705,7 @@ impl Runtime { }; wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await?; Ok(()) } @@ -1902,6 +1904,7 @@ impl Runtime { } wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await?; for ep in self.entities.endpoint_providers().iter() { @@ -2013,6 +2016,7 @@ impl Runtime { let old_exit_node = self.requested_state.exit_node.replace(exit_node); wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await?; if let Some(last_exit) = old_exit_node @@ -2087,6 +2091,7 @@ impl Runtime { &self.entities, &self.features, ) + .boxed() .await?; } @@ -2293,6 +2298,7 @@ impl TaskRuntime for Runtime { Some(_) 
= self.event_listeners.wg_endpoint_publish_event_subscriber.recv() => { telio_log_debug!("WG consolidation triggered by endpoint publish event"); wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await .unwrap_or_else( |e| { @@ -2314,12 +2320,8 @@ impl TaskRuntime for Runtime { // We have downgraded the connection. Notify cross ping check about that. direct_entities.cross_ping_check.notify_failed_wg_connection(public_key) .await?; - direct_entities.session_keeper.remove_node(&public_key).await?; - - let mut event = AnalyticsEvent::from_event(&mesh_event); - event.peer_state = PeerState::Connected; - self.entities.aggregator.change_peer_state_relayed(&event).await; + direct_entities.upgrade_sync.clear_accepted_session(public_key).await; } else { telio_log_warn!("Connection downgraded while direct entities are disabled"); } @@ -2335,18 +2337,13 @@ impl TaskRuntime for Runtime { let node = self.peer_to_node(&mesh_event.peer, Some(mesh_event.state), mesh_event.link_state).await; if let Some(node) = node { - if mesh_event.state != PeerState::Connecting && node.path == PathType::Relay { - self.entities.aggregator.change_peer_state_relayed( - &AnalyticsEvent::from_event(&mesh_event) - ).await; - } // Publish WG event to app let event = Event::builder::().set(node).build(); if let Some(event) = event { - let _ = self.event_publishers.libtelio_event_publisher.send( - Box::new(event) - ); - } + let _ = self.event_publishers.libtelio_event_publisher.send( + Box::new(event) + ); + } } Ok(()) @@ -2365,6 +2362,7 @@ impl TaskRuntime for Runtime { Some(_) = self.event_listeners.endpoint_upgrade_event_subscriber.recv() => { telio_log_debug!("WG consolidation triggered by upgrade sync request"); wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await .unwrap_or_else( |e| { @@ -2380,6 +2378,7 @@ impl TaskRuntime for Runtime { self.requested_state.wg_stun_server = wg_stun_server; wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await .unwrap_or_else( |e| { @@ -2394,7 +2393,7 @@ impl TaskRuntime for Runtime { self.entities.postquantum_wg.on_event(pq_event); if let Err(err) = wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) - .await + .boxed().await { telio_log_warn!("WireGuard controller failure: {err:?}. 
Ignoring"); } @@ -2405,6 +2404,7 @@ impl TaskRuntime for Runtime { _ = self.polling_interval.tick() => { telio_log_debug!("WG consolidation triggered by tick event"); wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) + .boxed() .await .unwrap_or_else( |e| { diff --git a/src/device/wg_controller.rs b/src/device/wg_controller.rs index d69318469..b334e3ae1 100644 --- a/src/device/wg_controller.rs +++ b/src/device/wg_controller.rs @@ -1,4 +1,5 @@ use super::{Entities, RequestedState, Result}; +use futures::FutureExt; use ipnet::IpNet; use std::collections::{BTreeMap, HashMap, HashSet}; use std::iter::FromIterator; @@ -10,10 +11,11 @@ use telio_dns::DnsResolver; use telio_firewall::firewall::{Firewall, FILE_SEND_PORT}; use telio_model::constants::{VPN_EXTERNAL_IPV4, VPN_INTERNAL_IPV4, VPN_INTERNAL_IPV6}; use telio_model::features::Features; -use telio_model::mesh::LinkState; use telio_model::mesh::NodeState::Connected; +use telio_model::mesh::{LinkState, NodeState}; use telio_model::EndpointMap; use telio_model::SocketAddr; +use telio_nurse::aggregator::ConnectivityDataAggregator; use telio_proto::{PeersStatesMap, Session}; use telio_proxy::Proxy; use telio_starcast::starcast_peer::StarcastPeer; @@ -22,9 +24,11 @@ use telio_traversal::{ SessionKeeperTrait, UpgradeSyncTrait, WireGuardEndpointCandidateChangeEvent, }; use telio_utils::{telio_log_debug, telio_log_info, telio_log_warn}; +use telio_wg::uapi::AnalyticsEvent; use telio_wg::{uapi::Peer, WireGuard}; use thiserror::Error as TError; use tokio::sync::Mutex; +use tokio::time::Instant; pub const DEFAULT_PEER_UPGRADE_WINDOW: u64 = 15; @@ -82,6 +86,7 @@ pub async fn consolidate_wg_state( entities.meshnet.left().map(|m| &*m.proxy), entities.cross_ping_check(), entities.upgrade_sync(), + &entities.aggregator, entities.session_keeper(), &*entities.dns, remote_peer_states, @@ -99,6 +104,7 @@ pub async fn consolidate_wg_state( entities.starcast_vpeer(), features, ) + .boxed() .await?; let starcast_pub_key = if let Some(starcast_vpeer) = entities.starcast_vpeer() { Some(starcast_vpeer.get_peer().await?.public_key) @@ -157,6 +163,7 @@ async fn consolidate_wg_peers< proxy: Option<&P>, cross_ping_check: Option<&Arc>, upgrade_sync: Option<&Arc>, + aggregator: &ConnectivityDataAggregator, session_keeper: Option<&Arc>, dns: &Mutex>, remote_peer_states: PeersStatesMap, @@ -198,14 +205,29 @@ async fn consolidate_wg_peers< let update_keys = &requested_keys & &actual_keys; for key in delete_keys { - telio_log_info!("Removing peer: {:?}", actual_peers.get(key)); + let actual_peer = actual_peers.get(key).ok_or(Error::PeerNotFound)?; + telio_log_info!("Removing peer: {:?}", actual_peer); + + let event = AnalyticsEvent { + public_key: *key, + dual_ip_addresses: actual_peer.get_dual_ip_addresses(), + tx_bytes: actual_peer.tx_bytes.unwrap_or_default(), + rx_bytes: actual_peer.rx_bytes.unwrap_or_default(), + peer_state: NodeState::Disconnected, + timestamp: Instant::now(), + }; + aggregator.report_peer_state_relayed(&event).await; + wireguard_interface.del_peer(*key).await?; } for key in insert_keys { - telio_log_info!("Inserting peer: {:?}", requested_peers.get(key)); - let peer = requested_peers.get(key).ok_or(Error::PeerNotFound)?; - wireguard_interface.add_peer(peer.peer.clone()).await?; + let requested_peer = requested_peers.get(key).ok_or(Error::PeerNotFound)?; + telio_log_info!("Inserting peer: {:?}", requested_peer); + + wireguard_interface + .add_peer(requested_peer.peer.clone()) + .await?; if let Some(stun) = 
stun_ep_provider { if let Some(wg_stun_server) = requested_state.wg_stun_server.as_ref() { @@ -236,7 +258,31 @@ async fn consolidate_wg_peers< } let is_actual_peer_proxying = is_peer_proxying(actual_peer, &proxy_endpoints); - if is_actual_peer_proxying && !is_peer_proxying(&requested_peer.peer, &proxy_endpoints) { + let is_reqested_peer_proxying = is_peer_proxying(&requested_peer.peer, &proxy_endpoints); + + let event = AnalyticsEvent { + public_key: *key, + dual_ip_addresses: actual_peer.get_dual_ip_addresses(), + tx_bytes: actual_peer.tx_bytes.unwrap_or_default(), + rx_bytes: actual_peer.rx_bytes.unwrap_or_default(), + peer_state: actual_peer.state(), + timestamp: Instant::now(), + }; + if is_actual_peer_proxying { + aggregator.report_peer_state_relayed(&event).await; + } else if let Some(us) = upgrade_sync { + if let Some(accepted_direct_session) = us.get_accepted_session(*key).await { + aggregator + .report_peer_state_direct( + &event, + accepted_direct_session.local_ep, + accepted_direct_session.remote_ep, + ) + .await; + } + } + + if is_actual_peer_proxying && !is_reqested_peer_proxying { // We have upgraded the connection. If the upgrade happened because we have // selected a new direct endpoint candidate -> notify the other node about our own // local side endpoint, such that the other node can do this upgrade too. @@ -1054,6 +1100,7 @@ mod tests { use super::*; use crate::device::{DeviceConfig, DNS}; use mockall::predicate::{self, eq}; + use telio_nurse::config::AggregatorConfig; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4}; @@ -1801,6 +1848,11 @@ mod tests { .once() .with(eq(i)) .return_once(|_| Ok(())); + self.upgrade_sync + .expect_get_accepted_session() + .once() + .with(eq(i)) + .return_once(|_| None); } } @@ -1817,13 +1869,20 @@ mod tests { let session_keeper = Arc::new(self.session_keeper); let stun_ep_provider = { self.stun_ep_provider.map(Arc::new) }; let upnp_ep_provider: Option> = None; + let wireguard_interface = Arc::new(self.wireguard_interface); + let aggregator = ConnectivityDataAggregator::new( + AggregatorConfig::default(), + wireguard_interface.clone(), + Default::default(), + ); consolidate_wg_peers( &self.requested_state, - &self.wireguard_interface, + wireguard_interface.as_ref(), Some(&self.proxy), Some(&cross_ping_check), Some(&upgrade_sync), + &aggregator, Some(&session_keeper), &self.dns, HashMap::new(),