diff --git a/crates/telio-nurse/src/nurse.rs b/crates/telio-nurse/src/nurse.rs index fff5f919c..80801e883 100644 --- a/crates/telio-nurse/src/nurse.rs +++ b/crates/telio-nurse/src/nurse.rs @@ -34,9 +34,9 @@ use crate::qos::Io as QoSIo; use crate::qos::OutputData as QoSData; /// Input/output channels for Nurse -pub struct NurseIo<'a> { +pub struct NurseIo { /// Event channel to gather wg events - pub wg_event_channel: &'a mc_chan::Tx>, + pub wg_event_channel: mc_chan::Rx>, /// Event channel to gather wg data pub wg_analytics_channel: Option>>, /// Event channel to gather meshnet config update @@ -57,7 +57,7 @@ impl Nurse { pub async fn start_with( public_key: PublicKey, config: Config, - io: NurseIo<'_>, + io: NurseIo, aggregator: Arc, ipv6_enabled: bool, ) -> Self { @@ -127,7 +127,7 @@ impl State { pub async fn new( public_key: PublicKey, config: Config, - io: NurseIo<'_>, + io: NurseIo, aggregator: Arc, ipv6_enabled: bool, ) -> Self { @@ -152,7 +152,7 @@ impl State { let heartbeat_io = HeartbeatIo { chan: None, derp_event_channel: None, - wg_event_channel: io.wg_event_channel.subscribe(), + wg_event_channel: io.wg_event_channel, config_update_channel: config_update_channel.subscribe(), analytics_channel: analytics_channel.tx.clone(), collection_trigger_channel: collection_trigger_channel.subscribe(), diff --git a/src/device.rs b/src/device.rs index d3a608467..c345d7398 100644 --- a/src/device.rs +++ b/src/device.rs @@ -1,3 +1,4 @@ +mod event_reporter; mod wg_controller; use async_trait::async_trait; @@ -50,7 +51,7 @@ use telio_wg as wg; use thiserror::Error as TError; use tokio::{ runtime::{Builder, Runtime as AsyncRuntime}, - sync::{broadcast::error::RecvError, Mutex}, + sync::{broadcast::error::RecvError, broadcast::error::SendError, Mutex}, time::Interval, }; @@ -82,7 +83,7 @@ use telio_utils::{ use telio_model::{ config::{Config, Peer, PeerBase, Server as DerpServer}, event::{Event, Set}, - features::{FeaturePersistentKeepalive, Features, PathType}, + features::{Features, PathType}, mesh::{ExitNode, LinkState, Node}, validation::validate_nickname, EndpointMap, @@ -183,6 +184,12 @@ pub enum Error { TransportError(#[from] telio_starcast::transport::Error), #[error("Events processing thread failed to start: {0}")] EventsProcessingThreadStartError(std::io::Error), + #[error("Failed to build event structure")] + EventBuildingError, + #[error("Failed to publish libtelio event: {0}")] + EventPublishingError(#[from] SendError>), + #[error("Failed to publish libtelio event: {0}")] + EventReportingError(String), } pub type Result = std::result::Result; @@ -232,9 +239,6 @@ pub struct RequestedState { // Wireguard stun server that should be currently used pub wg_stun_server: Option, - - // Requested keepalive periods - pub(crate) keepalive_periods: FeaturePersistentKeepalive, } pub struct MeshnetEntities { @@ -452,6 +456,11 @@ struct Runtime { /// Some of the events are time based, so just poll the whole state from time to time polling_interval: Interval, + /// Event reporter. + /// + /// This instance is responsible for all libtelio events reported to the integrators + event_reporter: event_reporter::EventReporter, + #[cfg(test)] /// MockedAdapter (tests) test_env: telio_wg::tests::Env, @@ -1037,6 +1046,9 @@ impl Runtime { features: Features, protect: Option>, ) -> Result { + let event_reporter = + event_reporter::EventReporter::new(libtelio_wide_event_publisher.clone()); + let neptun_reset_conns = features.firewall.neptun_reset_conns || features.firewall.boringtun_reset_conns; @@ -1166,7 +1178,7 @@ impl Runtime { let nurse = if telio_lana::is_lana_initialized() { if let Some(nurse_features) = &features.nurse { let nurse_io = NurseIo { - wg_event_channel: &libtelio_wide_event_publisher, + wg_event_channel: libtelio_wide_event_publisher.subscribe(), wg_analytics_channel: analytics_ch.clone(), config_update_channel: config_update_ch.clone(), collection_trigger_channel: collection_trigger_ch.clone(), @@ -1207,7 +1219,6 @@ impl Runtime { let requested_state = RequestedState { device_config: config.clone(), - keepalive_periods: features.wireguard.persistent_keepalive.clone(), ..Default::default() }; @@ -1274,6 +1285,7 @@ impl Runtime { derp_events_publisher: derp_events.tx, }, polling_interval, + event_reporter, #[cfg(test)] test_env: wg::tests::Env { analytics: analytics_ch, @@ -2370,6 +2382,7 @@ impl TaskRuntime for Runtime { } } + // TODO: Events: state change event occured. Need to notify libtelio event handler let node = self.peer_to_node(&mesh_event.peer, Some(mesh_event.state), mesh_event.link_state).await; if let Some(node) = node { @@ -2385,13 +2398,11 @@ impl TaskRuntime for Runtime { Ok(()) }, - Ok(derp_event) = self.event_listeners.derp_event_subscriber.recv() => { - let event = Event::builder::().set(*derp_event).build(); - if let Some(event) = event { - let _ = self.event_publishers.libtelio_event_publisher.send( - Box::new(event) - ); - } + Ok(_) = self.event_listeners.derp_event_subscriber.recv() => { + let res = self.event_reporter.report_events(&self.requested_state, &self.entities).await; + if res.is_err() { + telio_log_error!("Failed to report events to library integrators {:?}", res); + } Ok(()) }, @@ -2425,6 +2436,7 @@ impl TaskRuntime for Runtime { Some(pq_event) = self.event_listeners.post_quantum_subscriber.recv() => { telio_log_debug!("WG consolidation triggered by PQ event"); + //TODO: Events: Potentially PQ state has changed self.entities.postquantum_wg.on_event(pq_event); @@ -2439,6 +2451,7 @@ impl TaskRuntime for Runtime { _ = self.polling_interval.tick() => { telio_log_debug!("WG consolidation triggered by tick event, total logs dropped: {}", logs_dropped_until_now()); + //TODO: Events: just in case - trigger telio event handling code let dropped = logs_dropped_since_last_checked(); if dropped > 0 { telio_log_warn!("New logs dropped: {dropped}"); diff --git a/src/device/event_reporter.rs b/src/device/event_reporter.rs new file mode 100644 index 000000000..60d90d833 --- /dev/null +++ b/src/device/event_reporter.rs @@ -0,0 +1,220 @@ +use super::{Entities, Error, RequestedState, Result}; +use telio_proxy::Proxy; +use std::collections::HashMap; +use std::net::IpAddr; +use std::sync::Arc; +use telio_model::config::{Peer, RelayState, Server as RelayEvent}; +use telio_model::constants::{VPN_EXTERNAL_IPV4, VPN_INTERNAL_IPV4}; +use telio_model::event::Set; +use telio_model::features::PathType; +use telio_model::mesh::{ExitNode, LinkState}; +use telio_model::{event::Event, mesh::Node, EndpointMap, PublicKey}; +use telio_pq::Entity as PqEntity; +use telio_relay::DerpRelay; +use telio_task::io::mc_chan::Tx; +use telio_wg::uapi::{Peer as WGPeer, PeerState}; +use telio_wg::{DynamicWg, WireGuard}; + +pub struct EventReporter { + event_publisher: Tx>, + last_reported_derp_state: Option, + last_reported_node_state: HashMap, +} + +impl EventReporter { + pub fn new(event_publisher: Tx>) -> Self { + return Self { + event_publisher, + last_reported_derp_state: Default::default(), + last_reported_node_state: Default::default(), + }; + } + + /// Report any state changes to the libtelio library users + /// + /// This function will mainly take care of the following uses cases: + /// * Report event for any state change of any VPN/Meshnet Node. VPN nodes may retrieve their + /// statuses from telio-pq as well as telio-wg as sources of truth for current state. + /// * Report DERP connection status changes. + /// * Notify apps about non-recoverable issues, for example in case WireGuard starts spewing + /// errors over UAPI. + /// + /// The list above may get extended in the future. + /// + /// This module depends on external monitoring of state changes, it will not monitor for it + /// itself. Hence the handle_state_change function should get called whenever any state change + /// occurs. + /// + /// Note here we are *not* handling panic's reporting. + pub async fn report_events( + &mut self, + requested_state: &RequestedState, + entities: &Entities, + ) -> Result { + // Report any changes to DERP state + self.report_derp_events(entities.meshnet.left().map(|e| &e.derp)) + .await?; + + // Report any changes to Nodes state, but first - lets acquire some information required to + // do the reporting + let proxy_endpoints = match entities.meshnet.left() { + None => Default::default(), + Some(mesh) => mesh.proxy.get_endpoint_map().await?, + }; + let requested_nodes: HashMap = requested_state.meshnet_config.as_ref() + .and_then(|mesh| mesh.peers.clone()) + .map_or(vec![], |peers| peers) + .iter() + .map(|p| (p.public_key.clone(), p.clone())) + .collect(); + + self.report_node_events( + &entities.wireguard_interface, + &entities.postquantum_wg, + &requested_state.exit_node, + &proxy_endpoints, + &requested_nodes, + ) + .await?; + Ok(()) + } + + pub async fn report_derp_events(&mut self, relay: Option<&Arc>) -> Result { + // Meshnet may be disabled, in which case relay will be None + // Otherwise we may have no server selected + // Or we may be attemting to connect to some server + // Or we may be connected to some server + let current_relay_state = match relay { + Some(r) => r.get_connected_server().await, + _ => None, + }; + + // Evaluate if anything has changed and build an even if necessary + let event = match ( + self.last_reported_derp_state.clone(), + current_relay_state.clone(), + ) { + // Nothing Changed + (None, None) => return Ok(()), + (Some(a), Some(b)) if a == b => return Ok(()), + + // Either freshly connected or connection info has changed + (None, Some(current_state)) | (Some(_), Some(current_state)) => { + Event::builder::() + .set(current_state) + .build() + .ok_or(Error::EventBuildingError)? + } + + // Disconnected + (Some(last_reported_derp_state), None) => { + let relay_event = RelayEvent { + conn_state: RelayState::Disconnected, + ..last_reported_derp_state + }; + Event::builder::() + .set(relay_event) + .build() + .ok_or(Error::EventBuildingError)? + } + }; + + // Fire event + self.event_publisher.send(Box::new(event))?; + + // Make sure, that we know what has been reported in the future iterations + self.last_reported_derp_state = current_relay_state; + Ok(()) + } + + pub async fn report_node_events( + &mut self, + wireguard: &Arc, + pq: &PqEntity, + exit_node: &Option, + proxy_endpoint_map: &EndpointMap, + requested_nodes: &HashMap + ) -> Result { + // Retreive relevant WireGuard peers + let wg_peers = wireguard + .get_interface() + .await? + .peers; + + let nodes = wg_peers + .values() + .into_iter() + .map(|wg_peer| (wg_peer, requested_nodes.get(&wg_peer.public_key), exit_node.clone().filter(|e| e.public_key == wg_peer.public_key))); + + // TODO: convert wg_peers to Nodes + + // TODO: if pq.is_rotating_keys() is true, but Node is not present in Nodes map -> insert + // one + + // TODO: Find all disconnected nodes (present in last_reported_node_state, but not in new + // nodes map). And issue disconnect event for all of them. Delete them from hasmap + + // TODO: for the rest of the nodes issue events if node structs are unequal + + Ok(()) + } + + fn merge_wg_peer_and_requested_node_info( + peer: &WGPeer, + requested_peer: &Option, + state: PeerState, + link_state: Option, + exit_node: &Option, + proxy_endpoint_map: &EndpointMap, + ) -> Result { + // Make sure that, whenever provided - requested_peer and/or exit_node refers to the same + // peer (identified by public key) as the WG information is coming from + if requested_peer.as_ref().map_or(true, |rn| rn.public_key == peer.public_key) + && exit_node.as_ref().map_or(true, |en| en.public_key == peer.public_key) { + return Err(Error::EventReportingError(String::from("Event information merging attempted on mismatched peers/nodes/exit_nodes"))); + } + + // Make sure that we are merging information for peers which has been requested by + // integrators. This means that either requested_peer or exit_node must be Some(_). + let cfg_id = match (&requested_peer, &exit_node) { + (Some(requested_peer), _) => requested_peer.base.identifier.clone(), + (_, Some(exit_node)) => exit_node.identifier.clone(), + (None, None) => { + return Err(Error::EventReportingError(String::from("Event information merging attempted on internal peer"))); + } + }; + + // Determine path type according to proxy endpoint map + let path = proxy_endpoint_map + .get(&peer.public_key) + .and_then(|proxy| peer.endpoint.filter(|endpoint| proxy.contains(endpoint))) + .map_or(PathType::Direct, |_| PathType::Relay); + + // Determine IP addresses assigned to the peer + let ip_addresses = match (requested_peer, exit_node) { + (Some(requested_node), _) => requested_node.ip_addresses.clone().unwrap_or_default(), + (_, Some(_)) => vec![IpAddr::V4(VPN_EXTERNAL_IPV4), IpAddr::V4(VPN_INTERNAL_IPV4)], + (None, None) => vec!{} + }; + + // Build node struct out of all the information we have + Ok(Node { + identifier: cfg_id, + public_key: peer.public_key, + nickname: requested_peer.as_ref().map(|p| p.base.nickname.clone()).flatten().map(|n| n.0), + state, + link_state, + is_exit: exit_node.is_some(), + is_vpn: (exit_node.is_some() && requested_peer.is_none()), + ip_addresses, + allowed_ips: peer.allowed_ips.clone(), + endpoint: peer.endpoint, + hostname: requested_peer.as_ref().map(|p| p.base.hostname.0.clone()), + allow_incoming_connections: requested_peer.as_ref().map(|n| n.allow_incoming_connections).unwrap_or_default(), + allow_peer_send_files: requested_peer.as_ref().map(|n| n.allow_peer_send_files).unwrap_or_default(), + path, + allow_multicast: requested_peer.as_ref().map(|n| n.allow_multicast).unwrap_or_default(), + peer_allows_multicast: requested_peer.as_ref().map(|n| n.peer_allows_multicast).unwrap_or_default(), + }) + } +} diff --git a/src/device/wg_controller.rs b/src/device/wg_controller.rs index 3311fe09e..22dfae0e7 100644 --- a/src/device/wg_controller.rs +++ b/src/device/wg_controller.rs @@ -528,7 +528,7 @@ async fn consolidate_wg_peers< requested_peer .peer .persistent_keepalive_interval - .unwrap_or(requested_state.keepalive_periods.direct) + .unwrap_or(features.wireguard.persistent_keepalive.direct) .into(), ), None, @@ -704,9 +704,9 @@ async fn build_requested_peers_list< ip_addresses.push(VPN_EXTERNAL_IPV4.into()); let (persistent_keepalive_interval, batching_keepalive_interval) = if features.batching.is_some() { - (None, requested_state.keepalive_periods.vpn) + (None, features.wireguard.persistent_keepalive.vpn) } else { - (requested_state.keepalive_periods.vpn, None) + (features.wireguard.persistent_keepalive.vpn, None) }; // If the PQ VPN is set up we need to configure the preshared key @@ -784,9 +784,9 @@ async fn build_requested_peers_list< telio_log_debug!("Configuring wg-stun peer: {}, at {}", public_key, endpoint); let (persistent_keepalive_interval, batching_keepalive_interval) = if features.batching.is_some() { - (None, requested_state.keepalive_periods.stun) + (None, features.wireguard.persistent_keepalive.stun) } else { - (requested_state.keepalive_periods.stun, None) + (features.wireguard.persistent_keepalive.stun, None) }; let allowed_ips = if features.ipv6 { @@ -860,9 +860,9 @@ async fn build_requested_meshnet_peers_list< let public_key = p.base.public_key; let (persistent_keepalive_interval, batching_keepalive_interval) = if features.batching.is_some() { - (None, requested_state.keepalive_periods.proxying) + (None, features.wireguard.persistent_keepalive.proxying) } else { - (requested_state.keepalive_periods.proxying, None) + (features.wireguard.persistent_keepalive.proxying, None) }; let endpoint = proxy_endpoints @@ -963,6 +963,7 @@ async fn build_requested_meshnet_peers_list< None => &[], }, requested_state, + features, ); // If we are in direct state, tell cross ping check about it @@ -1002,10 +1003,10 @@ async fn build_requested_meshnet_peers_list< // If peer is offline according to derp, we turn off keepalives. None } else { - requested_state.keepalive_periods.proxying + features.wireguard.persistent_keepalive.proxying } } else { - Some(requested_state.keepalive_periods.direct) + Some(features.wireguard.persistent_keepalive.direct) }; if features.batching.is_some() { @@ -1201,11 +1202,12 @@ fn peer_state( link_state: Option, proxy_endpoints: &[SocketAddr], requested_state: &RequestedState, + features: &Features, ) -> PeerState { // Define some useful constants let keepalive_period = peer .and_then(|p| p.persistent_keepalive_interval) - .unwrap_or(requested_state.keepalive_periods.direct); + .unwrap_or(features.wireguard.persistent_keepalive.direct); let peer_connectivity_timeout = Duration::from_secs((keepalive_period * 3) as u64); let peer_upgrade_window = Duration::from_secs(DEFAULT_PEER_UPGRADE_WINDOW);