Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit 'connecting' event from PQ handshake #1006

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/LLT-5814
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Emit 'CONNECTING' node event when starting the PQ handshake
20 changes: 17 additions & 3 deletions crates/telio-pq/src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use telio_task::io::chan;
use telio_utils::telio_log_debug;

struct Peer {
pubkey: telio_crypto::PublicKey,
addr: SocketAddr,
/// This is a key rotation task guard, its `Drop` implementation aborts the task
_rotation_task: super::conn::ConnKeyRotation,
Expand Down Expand Up @@ -71,20 +72,30 @@ impl Entity {
}
}
}
_ => (),
}
}

pub fn stop(&mut self) {
self.peer = None;
pub async fn stop(&mut self) {
if let Some(peer) = self.peer.take() {
#[allow(mpsc_blocking_send)]
let _ = self
.chan
.send(super::Event::Disconnected(peer.pubkey))
.await;
}
}

pub fn start(
pub async fn start(
&mut self,
addr: SocketAddr,
wg_secret: telio_crypto::SecretKey,
peer: telio_crypto::PublicKey,
) {
self.stop().await;

self.peer = Some(Peer {
pubkey: peer,
addr,
_rotation_task: super::conn::ConnKeyRotation::run(
self.chan.clone(),
Expand All @@ -96,5 +107,8 @@ impl Entity {
),
keys: None,
});

#[allow(mpsc_blocking_send)]
let _ = self.chan.send(super::Event::Connecting(peer)).await;
}
}
2 changes: 2 additions & 0 deletions crates/telio-pq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub trait PostQuantum {

/// Events emitted by the post-quantum entity towards the device runtime.
#[derive(Clone)]
pub enum Event {
    /// The PQ handshake towards the peer with this public key has been
    /// started (sent from `Entity::start`).
    Connecting(telio_crypto::PublicKey),
    /// The PQ entity for the peer with this public key has been stopped
    /// (sent from `Entity::stop`).
    Disconnected(telio_crypto::PublicKey),
    /// Handshake result carrying the peer endpoint and keys.
    /// NOTE(review): emit site not visible in this chunk — presumably the
    /// key-rotation task; confirm in `conn` module.
    Handshake(SocketAddr, Keys),
    /// Fresh keys produced after the initial handshake.
    /// NOTE(review): emit site not visible in this chunk — presumably the
    /// key-rotation task; confirm in `conn` module.
    Rekey(Keys),
}
Expand Down
41 changes: 34 additions & 7 deletions nat-lab/tests/test_pq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from helpers import SetupParameters, setup_environment
from telio import Client
from utils import stun
from utils.bindings import TelioAdapterType
from utils.bindings import TelioAdapterType, NodeState, PathType
from utils.connection import Connection
from utils.connection_util import (
generate_connection_tracker_config,
Expand All @@ -28,6 +28,7 @@ async def _connect_vpn_pq(
int(wg_server["port"]),
str(wg_server["public_key"]),
pq=True,
timeout=10,
)

await ping(client_conn, config.PHOTO_ALBUM_IP)
Expand Down Expand Up @@ -410,18 +411,44 @@ async def test_pq_vpn_silent_pq_upgrader(
client, *_ = env.clients

wg_server = config.WG_SERVER # use non PQ server

mathiaspeters marked this conversation as resolved.
Show resolved Hide resolved
ip = str(wg_server["ipv4"])
pubkey = str(wg_server["public_key"])
port = int(wg_server["port"])

await client.restart_interface()
await client.get_router().create_vpn_route()
client.get_runtime().allowed_pub_keys.add(pubkey)

await client.get_proxy().connect_to_exit_node_pq(
public_key=pubkey,
allowed_ips=None,
endpoint=f"{ip}:{port}",
)

await client.wait_for_state_peer(
pubkey,
[NodeState.CONNECTING],
list(PathType),
is_exit=True,
is_vpn=True,
timeout=1,
)

try:
await client.connect_to_vpn(
str(wg_server["ipv4"]),
int(wg_server["port"]),
str(wg_server["public_key"]),
pq=True,
timeout=4,
await client.wait_for_state_peer(
pubkey,
[NodeState.CONNECTED],
list(PathType),
is_exit=True,
is_vpn=True,
timeout=3,
)
raise Exception("This shouldn't connect succesfully")
except TimeoutError:
pass

await client.disconnect_from_vpn(pubkey, timeout=4)
await client.get_router().delete_vpn_route()

# now connect to a good behaving PQ server
Expand Down
115 changes: 89 additions & 26 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use telio_utils::{

use telio_model::{
config::{Config, Peer, PeerBase, Server as DerpServer},
constants::{VPN_EXTERNAL_IPV4, VPN_INTERNAL_IPV4},
event::{Event, Set},
features::{FeaturePersistentKeepalive, Features, PathType},
mesh::{ExitNode, LinkState, Node},
Expand Down Expand Up @@ -452,6 +453,11 @@ struct Runtime {
/// Some of the events are time based, so just poll the whole state from time to time
polling_interval: Interval,

/// Store most recent reported event to the apps per node
///
/// TODO: This is planned to be refactored into a bit better solution in https://github.com/NordSecurity/libtelio/pull/1021
last_transmitted_event: HashMap<PublicKey, Node>,

#[cfg(test)]
/// MockedAdapter (tests)
test_env: telio_wg::tests::Env,
Expand Down Expand Up @@ -1274,6 +1280,7 @@ impl Runtime {
derp_events_publisher: derp_events.tx,
},
polling_interval,
last_transmitted_event: Default::default(),
#[cfg(test)]
test_env: wg::tests::Env {
analytics: analytics_ch,
Expand Down Expand Up @@ -1990,7 +1997,7 @@ impl Runtime {

if res.is_err() {
// Stop PQ task
self.entities.postquantum_wg.stop();
self.entities.postquantum_wg.stop().await;
}

res
Expand All @@ -2009,14 +2016,17 @@ impl Runtime {
let exit_node = exit_node.clone();

// Stop post quantum key rotation task if it's running
self.entities.postquantum_wg.stop();
self.entities.postquantum_wg.stop().await;

if postquantum {
self.entities.postquantum_wg.start(
exit_node.endpoint.ok_or(Error::EndpointNotProvided)?,
self.requested_state.device_config.private_key.clone(),
exit_node.public_key,
);
self.entities
.postquantum_wg
.start(
exit_node.endpoint.ok_or(Error::EndpointNotProvided)?,
self.requested_state.device_config.private_key.clone(),
exit_node.public_key,
)
.await;
}

// dns socket for macos should only be bound to tunnel interface when connected to exit,
Expand Down Expand Up @@ -2125,7 +2135,7 @@ impl Runtime {
.await?;
}

self.entities.postquantum_wg.stop();
self.entities.postquantum_wg.stop().await;

if let Some(pmtud) = &mut self.entities.pmtu_detection {
pmtud.stop().await;
Expand Down Expand Up @@ -2230,25 +2240,11 @@ impl Runtime {
(None, Some(exit_node)) => {
// Exit node
Some(Node {
identifier: exit_node.identifier.clone(),
public_key: exit_node.public_key,
nickname: None,
state: state.unwrap_or_else(|| peer.state()),
link_state,
is_exit: true,
is_vpn: exit_node.endpoint.is_some(),
ip_addresses: vec![
IpAddr::V4(Ipv4Addr::new(10, 5, 0, 1)),
IpAddr::V4(Ipv4Addr::new(100, 64, 0, 1)),
],
allowed_ips: peer.allowed_ips.clone(),
endpoint,
hostname: None,
allow_incoming_connections: false,
allow_peer_send_files: false,
path: path_type,
allow_multicast: false,
peer_allows_multicast: false,
..node_from_exit_node(exit_node)
})
}
_ => None,
Expand Down Expand Up @@ -2312,6 +2308,21 @@ impl Runtime {
}
}
}

fn is_dublicated_event(&mut self, node: &Node) -> bool {
LukasPukenis marked this conversation as resolved.
Show resolved Hide resolved
match self.last_transmitted_event.get(&node.public_key) {
None => node.state == PeerState::Disconnected,
Some(last_node) => *node == *last_node,
}
}

/// Records `node` as the most recent event transmitted to the app for its
/// public key. A `Disconnected` node clears the record instead, so the
/// next non-disconnected event for that key is always delivered.
fn remember_last_transmitted_node_event(&mut self, node: Node) {
    match node.state {
        PeerState::Disconnected => {
            self.last_transmitted_event.remove(&node.public_key);
        }
        _ => {
            self.last_transmitted_event.insert(node.public_key, node);
        }
    }
}
}

#[async_trait]
Expand Down Expand Up @@ -2374,11 +2385,11 @@ impl TaskRuntime for Runtime {

if let Some(node) = node {
// Publish WG event to app
let event = Event::builder::<Node>().set(node).build();
if let Some(event) = event {
if !self.is_dublicated_event(&node) {
let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(event)
Box::new(Event::Node {body: node.clone()})
);
self.remember_last_transmitted_node_event(node);
}
}

Expand Down Expand Up @@ -2426,6 +2437,38 @@ impl TaskRuntime for Runtime {
Some(pq_event) = self.event_listeners.post_quantum_subscriber.recv() => {
telio_log_debug!("WG consolidation triggered by PQ event");

match (&pq_event, &self.requested_state.exit_node, &self.requested_state.last_exit_node) {
(telio_pq::Event::Connecting(pubkey), Some(exit), _) if exit.public_key == *pubkey => {
let node = Node {
state: PeerState::Connecting,
link_state: if self.features.link_detection.is_some() {Some(LinkState::Down)} else {None},
Jauler marked this conversation as resolved.
Show resolved Hide resolved
..node_from_exit_node(exit)
};

if !self.is_dublicated_event(&node) {
let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(Event::Node {body: node.clone()})
);
self.remember_last_transmitted_node_event(node);
}
},
(telio_pq::Event::Disconnected(pubkey), _, Some(last_exit)) if last_exit.public_key == *pubkey => {
let node = Node {
state: PeerState::Disconnected,
link_state: if self.features.link_detection.is_some() {Some(LinkState::Down)} else {None},
Jauler marked this conversation as resolved.
Show resolved Hide resolved
..node_from_exit_node(last_exit)
};

if !self.is_dublicated_event(&node) {
let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(Event::Node {body: node.clone()})
);
self.remember_last_transmitted_node_event(node);
}
},
_ => (),
}

self.entities.postquantum_wg.on_event(pq_event);

if let Err(err) = wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features)
Expand Down Expand Up @@ -2542,6 +2585,26 @@ fn set_tunnel_interface(socket_pool: &Arc<SocketPool>, config: &DeviceConfig) {
}
}

/// Builds the app-facing `Node` representation of an exit node.
///
/// Any field not listed here (e.g. `state`, `link_state`) is left at its
/// `Default` value via struct-update syntax, so callers typically override
/// those on top of the returned value.
fn node_from_exit_node(exit_node: &ExitNode) -> Node {
    // Exit nodes always expose the well-known VPN addresses to the app.
    let ip_addresses = vec![IpAddr::V4(VPN_EXTERNAL_IPV4), IpAddr::V4(VPN_INTERNAL_IPV4)];

    Node {
        identifier: exit_node.identifier.clone(),
        public_key: exit_node.public_key,
        endpoint: exit_node.endpoint,
        allowed_ips: exit_node.allowed_ips.clone().unwrap_or_default(),
        ip_addresses,
        is_exit: true,
        // `is_vpn` mirrors whether a concrete server endpoint is known.
        is_vpn: exit_node.endpoint.is_some(),
        nickname: None,
        hostname: None,
        path: PathType::Direct,
        allow_incoming_connections: false,
        allow_peer_send_files: false,
        allow_multicast: false,
        peer_allows_multicast: false,
        ..Default::default()
    }
}

matszczygiel marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading