[LLT-5814] refactor events reporting from libtelio #1021

Draft · wants to merge 3 commits into base: main
crates/telio-nurse/src/nurse.rs (10 changes: 5 additions & 5 deletions)

@@ -34,9 +34,9 @@ use crate::qos::Io as QoSIo;
 use crate::qos::OutputData as QoSData;

 /// Input/output channels for Nurse
-pub struct NurseIo<'a> {
+pub struct NurseIo {
     /// Event channel to gather wg events
-    pub wg_event_channel: &'a mc_chan::Tx<Box<Event>>,
+    pub wg_event_channel: mc_chan::Rx<Box<Event>>,
     /// Event channel to gather wg data
     pub wg_analytics_channel: Option<mc_chan::Tx<Box<AnalyticsEvent>>>,
     /// Event channel to gather meshnet config update
@@ -57,7 +57,7 @@ impl Nurse {
     pub async fn start_with(
         public_key: PublicKey,
         config: Config,
-        io: NurseIo<'_>,
+        io: NurseIo,
         aggregator: Arc<ConnectivityDataAggregator>,
         ipv6_enabled: bool,
     ) -> Self {
@@ -127,7 +127,7 @@ impl State {
     pub async fn new(
         public_key: PublicKey,
         config: Config,
-        io: NurseIo<'_>,
+        io: NurseIo,
         aggregator: Arc<ConnectivityDataAggregator>,
         ipv6_enabled: bool,
     ) -> Self {
@@ -152,7 +152,7 @@ impl State {
         let heartbeat_io = HeartbeatIo {
             chan: None,
             derp_event_channel: None,
-            wg_event_channel: io.wg_event_channel.subscribe(),
+            wg_event_channel: io.wg_event_channel,
             config_update_channel: config_update_channel.subscribe(),
             analytics_channel: analytics_channel.tx.clone(),
             collection_trigger_channel: collection_trigger_channel.subscribe(),
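
The NurseIo change above replaces a borrowed &'a mc_chan::Tx<Box<Event>>, which Nurse then subscribed to internally, with an owned mc_chan::Rx<Box<Event>> that the caller produces via subscribe(); that is what lets the 'a lifetime parameter disappear. A minimal sketch of the same ownership pattern, assuming mc_chan follows tokio broadcast-channel semantics (suggested by the Tx/Rx/subscribe names, but an assumption here):

use tokio::sync::broadcast;

struct Consumer {
    // Owning the receiver removes any lifetime tied to the sender's scope.
    events: broadcast::Receiver<u32>,
}

#[tokio::main]
async fn main() {
    let (tx, _rx) = broadcast::channel(16);
    // The caller subscribes and hands over an owned Rx, mirroring
    // `wg_event_channel: libtelio_wide_event_publisher.subscribe()`.
    let mut consumer = Consumer { events: tx.subscribe() };
    tx.send(7).unwrap();
    assert_eq!(consumer.events.recv().await.unwrap(), 7);
}

Under that assumption, subscribing at construction time (as device.rs now does) also means events published between construction and the first recv are buffered for Nurse rather than missed.
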
src/device.rs (41 changes: 27 additions & 14 deletions)

@@ -1,3 +1,4 @@
+mod event_reporter;
 mod wg_controller;

 use async_trait::async_trait;
@@ -50,7 +51,7 @@ use telio_wg as wg;
 use thiserror::Error as TError;
 use tokio::{
     runtime::{Builder, Runtime as AsyncRuntime},
-    sync::{broadcast::error::RecvError, Mutex},
+    sync::{broadcast::error::RecvError, broadcast::error::SendError, Mutex},
     time::Interval,
 };
@@ -82,7 +83,7 @@ use telio_utils::{
 use telio_model::{
     config::{Config, Peer, PeerBase, Server as DerpServer},
     event::{Event, Set},
-    features::{FeaturePersistentKeepalive, Features, PathType},
+    features::{Features, PathType},
     mesh::{ExitNode, LinkState, Node},
     validation::validate_nickname,
     EndpointMap,
@@ -183,6 +184,12 @@ pub enum Error {
     TransportError(#[from] telio_starcast::transport::Error),
     #[error("Events processing thread failed to start: {0}")]
     EventsProcessingThreadStartError(std::io::Error),
+    #[error("Failed to build event structure")]
+    EventBuildingError,
+    #[error("Failed to publish libtelio event: {0}")]
+    EventPublishingError(#[from] SendError<Box<Event>>),
+    #[error("Failed to report libtelio events: {0}")]
+    EventReportingError(String),
 }

 pub type Result<T = ()> = std::result::Result<T, Error>;
@@ -232,9 +239,6 @@ pub struct RequestedState {

     // Wireguard stun server that should be currently used
     pub wg_stun_server: Option<StunServer>,
-
-    // Requested keepalive periods
-    pub(crate) keepalive_periods: FeaturePersistentKeepalive,
 }

 pub struct MeshnetEntities {
@@ -452,6 +456,11 @@ struct Runtime {
     /// Some of the events are time based, so just poll the whole state from time to time
     polling_interval: Interval,

+    /// Event reporter.
+    ///
+    /// This instance is responsible for all libtelio events reported to the integrators
+    event_reporter: event_reporter::EventReporter,
+
     #[cfg(test)]
     /// MockedAdapter (tests)
     test_env: telio_wg::tests::Env,
@@ -1037,6 +1046,9 @@ impl Runtime {
         features: Features,
         protect: Option<Arc<dyn Protector>>,
     ) -> Result<Self> {
+        let event_reporter =
+            event_reporter::EventReporter::new(libtelio_wide_event_publisher.clone());
+
         let neptun_reset_conns =
             features.firewall.neptun_reset_conns || features.firewall.boringtun_reset_conns;

@@ -1166,7 +1178,7 @@
         let nurse = if telio_lana::is_lana_initialized() {
             if let Some(nurse_features) = &features.nurse {
                 let nurse_io = NurseIo {
-                    wg_event_channel: &libtelio_wide_event_publisher,
+                    wg_event_channel: libtelio_wide_event_publisher.subscribe(),
                     wg_analytics_channel: analytics_ch.clone(),
                     config_update_channel: config_update_ch.clone(),
                     collection_trigger_channel: collection_trigger_ch.clone(),
@@ -1207,7 +1219,6 @@

         let requested_state = RequestedState {
             device_config: config.clone(),
-            keepalive_periods: features.wireguard.persistent_keepalive.clone(),
             ..Default::default()
         };

@@ -1274,6 +1285,7 @@
                 derp_events_publisher: derp_events.tx,
             },
             polling_interval,
+            event_reporter,
             #[cfg(test)]
             test_env: wg::tests::Env {
                 analytics: analytics_ch,
@@ -2370,6 +2382,7 @@
                     }
                 }

+                // TODO: Events: state change event occurred. Need to notify libtelio event handler
                 let node = self.peer_to_node(&mesh_event.peer, Some(mesh_event.state), mesh_event.link_state).await;

                 if let Some(node) = node {
@@ -2385,13 +2398,11 @@
                 Ok(())
             },

-            Ok(derp_event) = self.event_listeners.derp_event_subscriber.recv() => {
-                let event = Event::builder::<DerpServer>().set(*derp_event).build();
-                if let Some(event) = event {
-                    let _ = self.event_publishers.libtelio_event_publisher.send(
-                        Box::new(event)
-                    );
-                }
+            Ok(_) = self.event_listeners.derp_event_subscriber.recv() => {
+                let res = self.event_reporter.report_events(&self.requested_state, &self.entities).await;
+                if res.is_err() {
+                    telio_log_error!("Failed to report events to library integrators {:?}", res);
+                }
                 Ok(())
             },

@@ -2425,6 +2436,7 @@

             Some(pq_event) = self.event_listeners.post_quantum_subscriber.recv() => {
                 telio_log_debug!("WG consolidation triggered by PQ event");
+                // TODO: Events: potentially PQ state has changed

                 self.entities.postquantum_wg.on_event(pq_event);

@@ -2439,6 +2451,7 @@

             _ = self.polling_interval.tick() => {
                 telio_log_debug!("WG consolidation triggered by tick event, total logs dropped: {}", logs_dropped_until_now());
+                // TODO: Events: just in case - trigger telio event handling code
                 let dropped = logs_dropped_since_last_checked();
                 if dropped > 0 {
                     telio_log_warn!("New logs dropped: {dropped}");
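
The #[from] SendError<Box<Event>> variant added above is what lets the new event reporter propagate a failed broadcast send with the ? operator. A compact sketch of that thiserror pattern, using a stand-in payload type rather than libtelio's Event:

use thiserror::Error;
use tokio::sync::broadcast;

#[derive(Debug, Error)]
enum Error {
    #[error("Failed to publish libtelio event: {0}")]
    EventPublishingError(#[from] broadcast::error::SendError<Box<u32>>),
}

fn publish(tx: &broadcast::Sender<Box<u32>>) -> Result<(), Error> {
    // `?` converts SendError<Box<u32>> into Error::EventPublishingError
    // via the From impl that thiserror derives for #[from] fields.
    tx.send(Box::new(1))?;
    Ok(())
}

EventBuildingError, by contrast, carries no source, because the event builder signals failure through Option and the reporter maps None onto it with ok_or.
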
src/device/event_reporter.rs (220 changes: 220 additions & 0 deletions)

@@ -0,0 +1,220 @@
use super::{Entities, Error, RequestedState, Result};
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use telio_model::config::{Peer, RelayState, Server as RelayEvent};
use telio_model::constants::{VPN_EXTERNAL_IPV4, VPN_INTERNAL_IPV4};
use telio_model::event::Set;
use telio_model::features::PathType;
use telio_model::mesh::{ExitNode, LinkState};
use telio_model::{event::Event, mesh::Node, EndpointMap, PublicKey};
use telio_pq::Entity as PqEntity;
use telio_proxy::Proxy;
use telio_relay::DerpRelay;
use telio_task::io::mc_chan::Tx;
use telio_wg::uapi::{Peer as WGPeer, PeerState};
use telio_wg::{DynamicWg, WireGuard};

pub struct EventReporter {
    event_publisher: Tx<Box<Event>>,
    last_reported_derp_state: Option<RelayEvent>,
    last_reported_node_state: HashMap<PublicKey, Node>,
}

impl EventReporter {
    pub fn new(event_publisher: Tx<Box<Event>>) -> Self {
        Self {
            event_publisher,
            last_reported_derp_state: Default::default(),
            last_reported_node_state: Default::default(),
        }
    }

    /// Report any state changes to the libtelio library users
    ///
    /// This function will mainly take care of the following use cases:
    /// * Report an event for any state change of any VPN/Meshnet Node. VPN nodes may retrieve
    ///   their statuses from telio-pq as well as telio-wg as sources of truth for the current
    ///   state.
    /// * Report DERP connection status changes.
    /// * Notify apps about non-recoverable issues, for example in case WireGuard starts spewing
    ///   errors over UAPI.
    ///
    /// The list above may get extended in the future.
    ///
    /// This module depends on external monitoring of state changes; it will not monitor for
    /// them itself. Hence this function should get called whenever any state change occurs.
    ///
    /// Note that panic reporting is *not* handled here.
    pub async fn report_events(
        &mut self,
        requested_state: &RequestedState,
        entities: &Entities,
    ) -> Result {
        // Report any changes to DERP state
        self.report_derp_events(entities.meshnet.left().map(|e| &e.derp))
            .await?;

        // Report any changes to Node states, but first acquire the information required to
        // do the reporting
        let proxy_endpoints = match entities.meshnet.left() {
            None => Default::default(),
            Some(mesh) => mesh.proxy.get_endpoint_map().await?,
        };
        let requested_nodes: HashMap<PublicKey, Peer> = requested_state
            .meshnet_config
            .as_ref()
            .and_then(|mesh| mesh.peers.clone())
            .unwrap_or_default()
            .into_iter()
            .map(|peer| (peer.public_key.clone(), peer))
            .collect();

        self.report_node_events(
            &entities.wireguard_interface,
            &entities.postquantum_wg,
            &requested_state.exit_node,
            &proxy_endpoints,
            &requested_nodes,
        )
        .await?;
        Ok(())
    }

    pub async fn report_derp_events(&mut self, relay: Option<&Arc<DerpRelay>>) -> Result {
        // Meshnet may be disabled, in which case relay will be None.
        // Otherwise we may have no server selected,
        // or we may be attempting to connect to some server,
        // or we may be connected to some server.
        let current_relay_state = match relay {
            Some(r) => r.get_connected_server().await,
            _ => None,
        };

        // Evaluate whether anything has changed and build an event if necessary
        let event = match (
            self.last_reported_derp_state.clone(),
            current_relay_state.clone(),
        ) {
            // Nothing changed
            (None, None) => return Ok(()),
            (Some(a), Some(b)) if a == b => return Ok(()),

            // Either freshly connected or connection info has changed
            (None, Some(current_state)) | (Some(_), Some(current_state)) => {
                Event::builder::<RelayEvent>()
                    .set(current_state)
                    .build()
                    .ok_or(Error::EventBuildingError)?
            }

            // Disconnected
            (Some(last_reported_derp_state), None) => {
                let relay_event = RelayEvent {
                    conn_state: RelayState::Disconnected,
                    ..last_reported_derp_state
                };
                Event::builder::<RelayEvent>()
                    .set(relay_event)
                    .build()
                    .ok_or(Error::EventBuildingError)?
            }
        };

        // Fire the event
        self.event_publisher.send(Box::new(event))?;

        // Remember what has been reported, so future iterations can compare against it
        self.last_reported_derp_state = current_relay_state;
        Ok(())
    }

    pub async fn report_node_events(
        &mut self,
        wireguard: &Arc<DynamicWg>,
        pq: &PqEntity,
        exit_node: &Option<ExitNode>,
        proxy_endpoint_map: &EndpointMap,
        requested_nodes: &HashMap<PublicKey, Peer>,
    ) -> Result {
        // Retrieve the relevant WireGuard peers
        let wg_peers = wireguard.get_interface().await?.peers;

        let nodes = wg_peers.values().map(|wg_peer| {
            (
                wg_peer,
                requested_nodes.get(&wg_peer.public_key),
                exit_node.clone().filter(|e| e.public_key == wg_peer.public_key),
            )
        });

        // TODO: convert wg_peers to Nodes

        // TODO: if pq.is_rotating_keys() is true, but the Node is not present in the Nodes map,
        // insert one

        // TODO: find all disconnected nodes (present in last_reported_node_state, but not in the
        // new nodes map), issue a disconnect event for each of them, and delete them from the
        // hashmap

        // TODO: for the rest of the nodes, issue events if the node structs are unequal

        Ok(())
    }

    fn merge_wg_peer_and_requested_node_info(
        peer: &WGPeer,
        requested_peer: &Option<Peer>,
        state: PeerState,
        link_state: Option<LinkState>,
        exit_node: &Option<ExitNode>,
        proxy_endpoint_map: &EndpointMap,
    ) -> Result<Node> {
        // Make sure that, whenever provided, requested_peer and/or exit_node refer to the same
        // peer (identified by public key) as the one the WG information is coming from
        if requested_peer.as_ref().map_or(false, |rn| rn.public_key != peer.public_key)
            || exit_node.as_ref().map_or(false, |en| en.public_key != peer.public_key)
        {
            return Err(Error::EventReportingError(String::from(
                "Event information merging attempted on mismatched peers/nodes/exit_nodes",
            )));
        }

        // Make sure that we are merging information for peers which have been requested by
        // integrators. This means that either requested_peer or exit_node must be Some(_).
        let cfg_id = match (&requested_peer, &exit_node) {
            (Some(requested_peer), _) => requested_peer.base.identifier.clone(),
            (_, Some(exit_node)) => exit_node.identifier.clone(),
            (None, None) => {
                return Err(Error::EventReportingError(String::from(
                    "Event information merging attempted on internal peer",
                )));
            }
        };

        // Determine path type according to the proxy endpoint map
        let path = proxy_endpoint_map
            .get(&peer.public_key)
            .and_then(|proxy| peer.endpoint.filter(|endpoint| proxy.contains(endpoint)))
            .map_or(PathType::Direct, |_| PathType::Relay);

        // Determine the IP addresses assigned to the peer
        let ip_addresses = match (requested_peer, exit_node) {
            (Some(requested_node), _) => requested_node.ip_addresses.clone().unwrap_or_default(),
            (_, Some(_)) => vec![IpAddr::V4(VPN_EXTERNAL_IPV4), IpAddr::V4(VPN_INTERNAL_IPV4)],
            (None, None) => vec![],
        };

        // Build the node struct out of all the information we have
        Ok(Node {
            identifier: cfg_id,
            public_key: peer.public_key,
            nickname: requested_peer
                .as_ref()
                .and_then(|p| p.base.nickname.clone())
                .map(|n| n.0),
            state,
            link_state,
            is_exit: exit_node.is_some(),
            is_vpn: exit_node.is_some() && requested_peer.is_none(),
            ip_addresses,
            allowed_ips: peer.allowed_ips.clone(),
            endpoint: peer.endpoint,
            hostname: requested_peer.as_ref().map(|p| p.base.hostname.0.clone()),
            allow_incoming_connections: requested_peer
                .as_ref()
                .map(|n| n.allow_incoming_connections)
                .unwrap_or_default(),
            allow_peer_send_files: requested_peer
                .as_ref()
                .map(|n| n.allow_peer_send_files)
                .unwrap_or_default(),
            path,
            allow_multicast: requested_peer
                .as_ref()
                .map(|n| n.allow_multicast)
                .unwrap_or_default(),
            peer_allows_multicast: requested_peer
                .as_ref()
                .map(|n| n.peer_allows_multicast)
                .unwrap_or_default(),
        })
    }
}
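
The TODOs in report_node_events describe a diff of last_reported_node_state against a freshly built node map. As a sketch of the disconnect-detection step only, under that reading (a generic helper with hypothetical names, not part of the PR):

use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical helper: keys present in the previously reported map but
/// absent from the current one; per the TODOs, each of these would get a
/// disconnect event and then be removed from the cache.
fn disconnected_keys<K, V>(last: &HashMap<K, V>, current: &HashMap<K, V>) -> Vec<K>
where
    K: Eq + Hash + Clone,
{
    last.keys()
        .filter(|k| !current.contains_key(*k))
        .cloned()
        .collect()
}

The final TODO would then reduce to a plain != comparison between the cached and current Node for keys present in both maps.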