From bd42e116630fa3fdffe3b1ed176c22eee0c138ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rytis=20Karpu=C5=A1ka?= Date: Wed, 11 Dec 2024 14:54:55 +0200 Subject: [PATCH] WIP: introduce event_reporter module --- crates/telio-model/src/constants.rs | 5 + src/device.rs | 32 +++++-- src/device/event_reporter.rs | 140 ++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 9 deletions(-) create mode 100644 src/device/event_reporter.rs diff --git a/crates/telio-model/src/constants.rs b/crates/telio-model/src/constants.rs index c972f6cbd..87cc27c9e 100644 --- a/crates/telio-model/src/constants.rs +++ b/crates/telio-model/src/constants.rs @@ -3,6 +3,11 @@ use std::net::{Ipv4Addr, Ipv6Addr}; use telio_utils::const_ipnet::{ConstIpv4Net, ConstIpv6Net}; +/// Reserved IPv4 Range +pub const RESERVED_IPV4_IPS: ConstIpv4Net = ConstIpv4Net::new(Ipv4Addr::new(100, 64, 0, 0), 29); +/// Reserved IPv6 Range +pub const RESERVED_IPV6_IPS: ConstIpv6Net = ConstIpv6Net::new(Ipv6Addr::new(0xfd74, 0x656c, 0x696f, 0, 0, 0, 0, 0x5), 125); + /// VPN IPv4 Meshnet Address pub const VPN_INTERNAL_IPV4: Ipv4Addr = Ipv4Addr::new(100, 64, 0, 1); /// VPN IPv6 Meshnet Address diff --git a/src/device.rs b/src/device.rs index b35140ba3..c001e2a08 100644 --- a/src/device.rs +++ b/src/device.rs @@ -1,3 +1,4 @@ +mod event_reporter; mod wg_controller; use async_trait::async_trait; @@ -50,7 +51,7 @@ use telio_wg as wg; use thiserror::Error as TError; use tokio::{ runtime::{Builder, Runtime as AsyncRuntime}, - sync::{broadcast::error::RecvError, Mutex}, + sync::{broadcast::error::RecvError, broadcast::error::SendError, Mutex}, time::Interval, }; @@ -82,7 +83,7 @@ use telio_utils::{ use telio_model::{ config::{Config, Peer, PeerBase, Server as DerpServer}, event::{Event, Set}, - features::{FeaturePersistentKeepalive, Features, PathType}, + features::{Features, PathType}, mesh::{ExitNode, LinkState, Node}, validation::validate_nickname, EndpointMap, @@ -183,6 +184,10 @@ pub enum Error { TransportError(#[from] 
telio_starcast::transport::Error), #[error("Events processing thread failed to start: {0}")] EventsProcessingThreadStartError(std::io::Error), + #[error("Failed to build event structure")] + EventBuildingError, + #[error("Failed to publish libtelio event: {0}")] + EventPublishingError(#[from] SendError>), } pub type Result = std::result::Result; @@ -449,6 +454,11 @@ struct Runtime { /// Some of the events are time based, so just poll the whole state from time to time polling_interval: Interval, + /// Event reporter. + /// + /// This instance is responsible for all libtelio events reported to the integrators + event_reporter: event_reporter::EventReporter, + #[cfg(test)] /// MockedAdapter (tests) test_env: telio_wg::tests::Env, @@ -1034,6 +1044,8 @@ impl Runtime { features: Features, protect: Option>, ) -> Result { + let event_reporter = event_reporter::EventReporter::new(libtelio_wide_event_publisher.clone()); + let neptun_reset_conns = features.firewall.neptun_reset_conns || features.firewall.boringtun_reset_conns; @@ -1270,6 +1282,7 @@ impl Runtime { derp_events_publisher: derp_events.tx, }, polling_interval, + event_reporter, #[cfg(test)] test_env: wg::tests::Env { analytics: analytics_ch, @@ -2366,6 +2379,7 @@ impl TaskRuntime for Runtime { } } + // TODO: Events: state change event occurred.
Need to notify libtelio event handler let node = self.peer_to_node(&mesh_event.peer, Some(mesh_event.state), mesh_event.link_state).await; if let Some(node) = node { @@ -2381,13 +2395,11 @@ impl TaskRuntime for Runtime { Ok(()) }, - Ok(derp_event) = self.event_listeners.derp_event_subscriber.recv() => { - let event = Event::builder::().set(*derp_event).build(); - if let Some(event) = event { - let _ = self.event_publishers.libtelio_event_publisher.send( - Box::new(event) - ); - } + Ok(_) = self.event_listeners.derp_event_subscriber.recv() => { + let res = self.event_reporter.report_events(&self.requested_state, &self.entities, &self.features).await; + if res.is_err() { + telio_log_error!("Failed to report events to library integrators {:?}", res); + } Ok(()) }, @@ -2421,6 +2433,7 @@ impl TaskRuntime for Runtime { Some(pq_event) = self.event_listeners.post_quantum_subscriber.recv() => { telio_log_debug!("WG consolidation triggered by PQ event"); + //TODO: Events: Potentially PQ state has changed self.entities.postquantum_wg.on_event(pq_event); @@ -2435,6 +2448,7 @@ impl TaskRuntime for Runtime { _ = self.polling_interval.tick() => { telio_log_debug!("WG consolidation triggered by tick event, total logs dropped: {}", logs_dropped_until_now()); + //TODO: Events: just in case - trigger telio event handling code let dropped = logs_dropped_since_last_checked(); if dropped > 0 { telio_log_warn!("New logs dropped: {dropped}"); diff --git a/src/device/event_reporter.rs b/src/device/event_reporter.rs new file mode 100644 index 000000000..10081c7ef --- /dev/null +++ b/src/device/event_reporter.rs @@ -0,0 +1,140 @@ +use std::collections::HashMap; +use super::{Entities, Error, RequestedState, Result}; +use std::sync::Arc; +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; +use telio_model::config::{RelayState, Server as RelayEvent}; +use telio_model::event::Set; +use telio_model::{event::Event, features::Features, mesh::Node, PublicKey}; +use telio_model::constants::{RESERVED_IPV4_IPS, 
RESERVED_IPV6_IPS};
use telio_relay::DerpRelay;
use telio_task::io::mc_chan::Tx;
use telio_wg::uapi::Peer;
use telio_wg::{DynamicWg, WireGuard};

/// Remembers what has already been reported to library integrators and publishes an
/// [`Event`] whenever the observed state differs from it.
///
/// NOTE(review): the generic arguments on the fields and signatures below were missing from
/// the patch text and have been reconstructed from this file's imports — confirm against the
/// real tree.
pub struct EventReporter {
    /// Channel on which built events are published.
    event_publisher: Tx<Box<Event>>,
    /// Relay state stored after the most recent successfully published relay event.
    /// `None` until a relay is reported, and again after a disconnect has been reported.
    last_reported_derp_state: Option<RelayEvent>,
    /// Per-node state already reported. Nothing reads or writes it yet; it is reserved for
    /// the TODOs in [`EventReporter::report_node_events`].
    last_reported_node_state: HashMap<PublicKey, Node>,
}

impl EventReporter {
    /// Creates a reporter that publishes on `event_publisher` and has reported nothing yet.
    pub fn new(event_publisher: Tx<Box<Event>>) -> Self {
        Self {
            event_publisher,
            last_reported_derp_state: None,
            last_reported_node_state: HashMap::new(),
        }
    }

    /// Report any state changes to the libtelio library users
    ///
    /// This function will mainly take care of the following use cases:
    /// * Report event for any state change of any VPN/Meshnet Node. VPN nodes may retrieve their
    ///   statuses from telio-pq as well as telio-wg as sources of truth for current state.
    /// * Report DERP connection status changes.
    /// * Notify apps about non-recoverable issues, for example in case WireGuard starts spewing
    ///   errors over UAPI.
    ///
    /// The list above may get extended in the future.
    ///
    /// This module depends on external monitoring of state changes, it will not monitor for it
    /// itself. Hence `report_events` should get called whenever any state change occurs.
    ///
    /// Note here we are *not* handling panic reporting.
    ///
    /// `_requested_state` and `_features` are accepted but not used yet.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`Self::report_derp_events`] or
    /// [`Self::report_node_events`]; node events are not attempted if relay reporting fails.
    pub async fn report_events(
        &mut self,
        _requested_state: &RequestedState,
        entities: &Entities,
        _features: &Features,
    ) -> Result {
        self.report_derp_events(entities.meshnet.left().map(|e| &e.derp))
            .await?;
        self.report_node_events(&entities.wireguard_interface)
            .await?;
        Ok(())
    }

    /// Publishes a relay event if the relay state differs from the last reported one.
    ///
    /// `relay` is `None` when meshnet is disabled. When the relay goes away after having been
    /// reported, the last reported server is re-sent with `RelayState::Disconnected`.
    ///
    /// # Errors
    ///
    /// * `Error::EventBuildingError` if the event builder yields nothing.
    /// * The publishing error if sending on the event channel fails.
    ///
    /// In both cases the remembered state is left untouched, so the change is retried on the
    /// next call.
    pub async fn report_derp_events(&mut self, relay: Option<&Arc<DerpRelay>>) -> Result {
        // Meshnet may be disabled, in which case relay will be None
        // Otherwise we may have no server selected
        // Or we may be attempting to connect to some server
        // Or we may be connected to some server
        let current_relay_state = match relay {
            Some(r) => r.get_connected_server().await,
            None => None,
        };

        // Evaluate if anything has changed and pick the payload to report if necessary.
        // Matching on references means only the value that is actually sent gets cloned.
        let relay_event = match (&self.last_reported_derp_state, &current_relay_state) {
            // Nothing changed
            (None, None) => return Ok(()),
            (Some(last), Some(current)) if last == current => return Ok(()),

            // Either freshly connected or connection info has changed
            (_, Some(current)) => current.clone(),

            // Disconnected
            (Some(last), None) => RelayEvent {
                conn_state: RelayState::Disconnected,
                ..last.clone()
            },
        };

        let event = Event::builder::<RelayEvent>()
            .set(relay_event)
            .build()
            .ok_or(Error::EventBuildingError)?;

        // Fire event
        self.event_publisher.send(Box::new(event))?;

        // Remember what has been reported for future iterations
        self.last_reported_derp_state = current_relay_state;
        Ok(())
    }

    /// Collects the WireGuard peers that are eligible for reporting.
    ///
    /// Work in progress: the peers are gathered but no node event is published yet.
    ///
    /// # Errors
    ///
    /// Propagates the error from `wireguard.get_interface()`.
    pub async fn report_node_events(&mut self, wireguard: &Arc<DynamicWg>) -> Result {
        // Retrieve relevant WireGuard peers.
        // Underscore-prefixed until the TODOs below start consuming it.
        let _wg_peers: HashMap<PublicKey, Peer> = wireguard
            .get_interface()
            .await?
            .peers
            .into_iter()
            .filter(|(_, peer)| Self::should_report_peer(peer))
            .collect();

        //TODO: acquire some sort of representation of telio-pq current state

        //TODO: For each node:
        //        compare against reported state
        //TODO: report any differences
        //TODO: save reported state
        Ok(())
    }

    /// Returns `true` when `peer` should produce events for integrators.
    ///
    /// A peer is reported when none of its allowed IPs lies inside the reserved ranges
    /// (`RESERVED_IPV4_IPS` / `RESERVED_IPV6_IPS`). A peer with no allowed IPs is reported.
    fn should_report_peer(peer: &Peer) -> bool {
        // Note: We have some internal virtual peers (like DNS/starcast and similar) which we
        // would like to leave unreported as per agreement with integrators, peers which are
        // not explicitly requested by them - should not trigger events.
        // Current convention is that such peers have allowed ips set to reserved range
        // defined in telio_model::constants.
        // It would be nicer to have explicit indication of the fact that peer is internal,
        // but until it is introduced - we will have to rely on allowed ips, and reserved
        // ranges
        // There is a case worth mentioning, that for VPN and Exit Node peers we have a range
        // of 0.0.0.0/0 in allowedIps, which is not contained by reserved range. But at the
        // same time such peers should be reported, so everything is in-line
        let reserved_v4 = Ipv4Net::from(RESERVED_IPV4_IPS);
        let reserved_v6 = Ipv6Net::from(RESERVED_IPV6_IPS);

        // Internal peers are the ones inside the reserved range, so they must be excluded.
        !peer.allowed_ips.iter().any(|net| match net {
            IpNet::V4(ipv4net) => reserved_v4.contains(ipv4net),
            IpNet::V6(ipv6net) => reserved_v6.contains(ipv6net),
        })
    }
}