diff --git a/.unreleased/LLT-5814 b/.unreleased/LLT-5814 new file mode 100644 index 000000000..11061f703 --- /dev/null +++ b/.unreleased/LLT-5814 @@ -0,0 +1 @@ +Emit 'CONNECTING' node event when starting the PQ handshake diff --git a/crates/telio-pq/src/entity.rs b/crates/telio-pq/src/entity.rs index 254b5c721..d9e07290b 100644 --- a/crates/telio-pq/src/entity.rs +++ b/crates/telio-pq/src/entity.rs @@ -6,6 +6,7 @@ use telio_task::io::chan; use telio_utils::telio_log_debug; struct Peer { + pubkey: telio_crypto::PublicKey, addr: SocketAddr, /// This is a key rotation task guard, its `Drop` implementation aborts the task _rotation_task: super::conn::ConnKeyRotation, @@ -71,20 +72,30 @@ impl Entity { } } } + _ => (), } } - pub fn stop(&mut self) { - self.peer = None; + pub async fn stop(&mut self) { + if let Some(peer) = self.peer.take() { + #[allow(mpsc_blocking_send)] + let _ = self + .chan + .send(super::Event::Disconnected(peer.pubkey)) + .await; + } } - pub fn start( + pub async fn start( &mut self, addr: SocketAddr, wg_secret: telio_crypto::SecretKey, peer: telio_crypto::PublicKey, ) { + self.stop().await; + self.peer = Some(Peer { + pubkey: peer, addr, _rotation_task: super::conn::ConnKeyRotation::run( self.chan.clone(), @@ -96,5 +107,8 @@ impl Entity { ), keys: None, }); + + #[allow(mpsc_blocking_send)] + let _ = self.chan.send(super::Event::Connecting(peer)).await; } } diff --git a/crates/telio-pq/src/lib.rs b/crates/telio-pq/src/lib.rs index 033006a83..d23cbecfc 100644 --- a/crates/telio-pq/src/lib.rs +++ b/crates/telio-pq/src/lib.rs @@ -24,6 +24,8 @@ pub trait PostQuantum { #[derive(Clone)] pub enum Event { + Connecting(telio_crypto::PublicKey), + Disconnected(telio_crypto::PublicKey), Handshake(SocketAddr, Keys), Rekey(Keys), } diff --git a/nat-lab/tests/test_pq.py b/nat-lab/tests/test_pq.py index cf12424ac..525fa4558 100644 --- a/nat-lab/tests/test_pq.py +++ b/nat-lab/tests/test_pq.py @@ -4,7 +4,7 @@ from helpers import SetupParameters, setup_environment from telio import Client from utils import stun -from utils.bindings import TelioAdapterType +from utils.bindings import TelioAdapterType, NodeState, PathType from utils.connection import Connection from utils.connection_util import ( generate_connection_tracker_config, @@ -28,6 +28,7 @@ async def _connect_vpn_pq( int(wg_server["port"]), str(wg_server["public_key"]), pq=True, + timeout=10, ) await ping(client_conn, config.PHOTO_ALBUM_IP) @@ -410,18 +411,44 @@ async def test_pq_vpn_silent_pq_upgrader( client, *_ = env.clients wg_server = config.WG_SERVER # use non PQ server + + ip = str(wg_server["ipv4"]) + pubkey = str(wg_server["public_key"]) + port = int(wg_server["port"]) + + await client.restart_interface() + await client.get_router().create_vpn_route() + client.get_runtime().allowed_pub_keys.add(pubkey) + + await client.get_proxy().connect_to_exit_node_pq( + public_key=pubkey, + allowed_ips=None, + endpoint=f"{ip}:{port}", + ) + + await client.wait_for_state_peer( + pubkey, + [NodeState.CONNECTING], + list(PathType), + is_exit=True, + is_vpn=True, + timeout=1, + ) + try: - await client.connect_to_vpn( - str(wg_server["ipv4"]), - int(wg_server["port"]), - str(wg_server["public_key"]), - pq=True, - timeout=4, + await client.wait_for_state_peer( + pubkey, + [NodeState.CONNECTED], + list(PathType), + is_exit=True, + is_vpn=True, + timeout=3, ) raise Exception("This shouldn't connect succesfully") except TimeoutError: pass + await client.disconnect_from_vpn(pubkey, timeout=4) await client.get_router().delete_vpn_route() # now connect to a good behaving PQ server diff --git a/src/device.rs b/src/device.rs index d3a608467..aced51305 100644 --- a/src/device.rs +++ b/src/device.rs @@ -81,6 +81,7 @@ use telio_utils::{ use telio_model::{ config::{Config, Peer, PeerBase, Server as DerpServer}, + constants::{VPN_EXTERNAL_IPV4, VPN_INTERNAL_IPV4}, event::{Event, Set}, features::{FeaturePersistentKeepalive, Features, PathType}, mesh::{ExitNode, LinkState, Node}, @@ -452,6 +453,11 @@ struct Runtime { /// Some of the events are time based, so just poll the whole state from time to time polling_interval: Interval, + /// Store most recent reported event to the apps per node + /// + /// TODO: This is planned to be refactored into a bit better solution in https://github.com/NordSecurity/libtelio/pull/1021 + last_transmitted_event: HashMap, + #[cfg(test)] /// MockedAdapter (tests) test_env: telio_wg::tests::Env, @@ -1274,6 +1280,7 @@ impl Runtime { derp_events_publisher: derp_events.tx, }, polling_interval, + last_transmitted_event: Default::default(), #[cfg(test)] test_env: wg::tests::Env { analytics: analytics_ch, @@ -1990,7 +1997,7 @@ impl Runtime { if res.is_err() { // Stop PQ task - self.entities.postquantum_wg.stop(); + self.entities.postquantum_wg.stop().await; } res @@ -2009,14 +2016,17 @@ impl Runtime { let exit_node = exit_node.clone(); // Stop post quantum key rotation task if it's running - self.entities.postquantum_wg.stop(); + self.entities.postquantum_wg.stop().await; if postquantum { - self.entities.postquantum_wg.start( - exit_node.endpoint.ok_or(Error::EndpointNotProvided)?, - self.requested_state.device_config.private_key.clone(), - exit_node.public_key, - ); + self.entities + .postquantum_wg + .start( + exit_node.endpoint.ok_or(Error::EndpointNotProvided)?, + self.requested_state.device_config.private_key.clone(), + exit_node.public_key, + ) + .await; } // dns socket for macos should only be bound to tunnel interface when connected to exit, @@ -2125,7 +2135,7 @@ impl Runtime { .await?; } - self.entities.postquantum_wg.stop(); + self.entities.postquantum_wg.stop().await; if let Some(pmtud) = &mut self.entities.pmtu_detection { pmtud.stop().await; @@ -2230,25 +2240,11 @@ impl Runtime { (None, Some(exit_node)) => { // Exit node Some(Node { - identifier: exit_node.identifier.clone(), - public_key: exit_node.public_key, - nickname: None, state: state.unwrap_or_else(|| peer.state()), link_state, - is_exit: true, - is_vpn: exit_node.endpoint.is_some(), - ip_addresses: vec![ - IpAddr::V4(Ipv4Addr::new(10, 5, 0, 1)), - IpAddr::V4(Ipv4Addr::new(100, 64, 0, 1)), - ], allowed_ips: peer.allowed_ips.clone(), - endpoint, - hostname: None, - allow_incoming_connections: false, - allow_peer_send_files: false, path: path_type, - allow_multicast: false, - peer_allows_multicast: false, + ..node_from_exit_node(exit_node) }) } _ => None, @@ -2312,6 +2308,21 @@ impl Runtime { } } } + + fn is_dublicated_event(&mut self, node: &Node) -> bool { + match self.last_transmitted_event.get(&node.public_key) { + None => node.state == PeerState::Disconnected, + Some(last_node) => *node == *last_node, + } + } + + fn remember_last_transmitted_node_event(&mut self, node: Node) { + if node.state == PeerState::Disconnected { + self.last_transmitted_event.remove(&node.public_key); + } else { + self.last_transmitted_event.insert(node.public_key, node); + } + } } #[async_trait] @@ -2374,11 +2385,11 @@ impl TaskRuntime for Runtime { if let Some(node) = node { // Publish WG event to app - let event = Event::builder::().set(node).build(); - if let Some(event) = event { + if !self.is_dublicated_event(&node) { let _ = self.event_publishers.libtelio_event_publisher.send( - Box::new(event) + Box::new(Event::Node {body: node.clone()}) ); + self.remember_last_transmitted_node_event(node); } } @@ -2426,6 +2437,38 @@ impl TaskRuntime for Runtime { Some(pq_event) = self.event_listeners.post_quantum_subscriber.recv() => { telio_log_debug!("WG consolidation triggered by PQ event"); + match (&pq_event, &self.requested_state.exit_node, &self.requested_state.last_exit_node) { + (telio_pq::Event::Connecting(pubkey), Some(exit), _) if exit.public_key == *pubkey => { + let node = Node { + state: PeerState::Connecting, + link_state: if self.features.link_detection.is_some() {Some(LinkState::Down)} else {None}, + ..node_from_exit_node(exit) + }; + + if !self.is_dublicated_event(&node) { + let _ = self.event_publishers.libtelio_event_publisher.send( + Box::new(Event::Node {body: node.clone()}) + ); + self.remember_last_transmitted_node_event(node); + } + }, + (telio_pq::Event::Disconnected(pubkey), _, Some(last_exit)) if last_exit.public_key == *pubkey => { + let node = Node { + state: PeerState::Disconnected, + link_state: if self.features.link_detection.is_some() {Some(LinkState::Down)} else {None}, + ..node_from_exit_node(last_exit) + }; + + if !self.is_dublicated_event(&node) { + let _ = self.event_publishers.libtelio_event_publisher.send( + Box::new(Event::Node {body: node.clone()}) + ); + self.remember_last_transmitted_node_event(node); + } + }, + _ => (), + } + self.entities.postquantum_wg.on_event(pq_event); if let Err(err) = wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features) @@ -2542,6 +2585,26 @@ fn set_tunnel_interface(socket_pool: &Arc, config: &DeviceConfig) { } } +fn node_from_exit_node(exit_node: &ExitNode) -> Node { + Node { + identifier: exit_node.identifier.clone(), + public_key: exit_node.public_key, + nickname: None, + is_exit: true, + is_vpn: exit_node.endpoint.is_some(), + ip_addresses: vec![IpAddr::V4(VPN_EXTERNAL_IPV4), IpAddr::V4(VPN_INTERNAL_IPV4)], + allowed_ips: exit_node.allowed_ips.clone().unwrap_or_default(), + endpoint: exit_node.endpoint, + hostname: None, + allow_incoming_connections: false, + allow_peer_send_files: false, + path: PathType::Direct, + allow_multicast: false, + peer_allows_multicast: false, + ..Default::default() + } +} + #[cfg(test)] mod tests { use super::*;