Skip to content

Commit

Permalink
Emit 'connecting' event from PQ handshake
Browse files Browse the repository at this point in the history
Signed-off-by: Mateusz Szczygieł <[email protected]>
  • Loading branch information
matszczygiel committed Dec 3, 2024
1 parent 95db1de commit a7d82ca
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 33 deletions.
1 change: 1 addition & 0 deletions .unreleased/LLT-5814
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Emit 'CONNECTING' node event when starting the PQ handshake
18 changes: 15 additions & 3 deletions crates/telio-pq/src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use telio_task::io::chan;
use telio_utils::telio_log_debug;

struct Peer {
pubkey: telio_crypto::PublicKey,
addr: SocketAddr,
/// This is a key rotation task guard, its `Drop` implementation aborts the task
_rotation_task: super::conn::ConnKeyRotation,
Expand Down Expand Up @@ -71,20 +72,29 @@ impl Entity {
}
}
}
_ => (),
}
}

pub fn stop(&mut self) {
self.peer = None;
pub async fn stop(&mut self) {
if let Some(peer) = self.peer.take() {
let _ = self
.chan
.send(super::Event::Disconnected(peer.pubkey))
.await;
}
}

pub fn start(
pub async fn start(
&mut self,
addr: SocketAddr,
wg_secret: telio_crypto::SecretKey,
peer: telio_crypto::PublicKey,
) {
self.stop().await;

self.peer = Some(Peer {
pubkey: peer,
addr,
_rotation_task: super::conn::ConnKeyRotation::run(
self.chan.clone(),
Expand All @@ -96,5 +106,7 @@ impl Entity {
),
keys: None,
});

let _ = self.chan.send(super::Event::Connecting(peer)).await;
}
}
2 changes: 2 additions & 0 deletions crates/telio-pq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub trait PostQuantum {

#[derive(Copy, Clone)]
pub enum Event {
Connecting(telio_crypto::PublicKey),
Disconnected(telio_crypto::PublicKey),
Handshake(SocketAddr, Keys),
Rekey(Keys),
}
Expand Down
42 changes: 35 additions & 7 deletions nat-lab/tests/test_pq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from helpers import SetupParameters, setup_environment
from telio import Client
from utils import stun
from utils.bindings import TelioAdapterType
from utils.bindings import TelioAdapterType, NodeState, PathType
from utils.connection import Connection
from utils.connection_util import (
generate_connection_tracker_config,
Expand All @@ -13,6 +13,7 @@
)
from utils.dns import query_dns
from utils.ping import ping
from utils import asyncio_util

EMPTY_PRESHARED_KEY_SLOT = "(none)"

Expand All @@ -28,6 +29,7 @@ async def _connect_vpn_pq(
int(wg_server["port"]),
str(wg_server["public_key"]),
pq=True,
timeout=10,
)

await ping(client_conn, config.PHOTO_ALBUM_IP)
Expand Down Expand Up @@ -410,18 +412,44 @@ async def test_pq_vpn_silent_pq_upgrader(
client, *_ = env.clients

wg_server = config.WG_SERVER # use non PQ server

ip = str(wg_server["ipv4"])
pubkey = str(wg_server["public_key"])
port = int(wg_server["port"])

await client.restart_interface()
await client.get_router().create_vpn_route()
client.get_runtime().allowed_pub_keys.add(pubkey)

await client.get_proxy().connect_to_exit_node_pq(
public_key=pubkey,
allowed_ips=None,
endpoint=f"{ip}:{port}",
)

await client.wait_for_state_peer(
pubkey,
[NodeState.CONNECTING],
list(PathType),
is_exit=True,
is_vpn=True,
timeout=1,
)

try:
await client.connect_to_vpn(
str(wg_server["ipv4"]),
int(wg_server["port"]),
str(wg_server["public_key"]),
pq=True,
timeout=4,
await client.wait_for_state_peer(
pubkey,
[NodeState.CONNECTED],
list(PathType),
is_exit=True,
is_vpn=True,
timeout=3,
)
raise Exception("This shouldn't connect succesfully")
except TimeoutError:
pass

await client.disconnect_from_vpn(pubkey, timeout=4)
await client.get_router().delete_vpn_route()

# now connect to a good behaving PQ server
Expand Down
82 changes: 59 additions & 23 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use telio_utils::{

use telio_model::{
config::{Config, Peer, PeerBase, Server as DerpServer},
constants::{VPN_EXTERNAL_IPV4, VPN_INTERNAL_IPV4},
event::{Event, Set},
features::{FeaturePersistentKeepalive, Features, PathType},
mesh::{ExitNode, LinkState, Node},
Expand Down Expand Up @@ -2011,7 +2012,7 @@ impl Runtime {

if res.is_err() {
// Stop PQ task
self.entities.postquantum_wg.stop();
self.entities.postquantum_wg.stop().await;
}

res
Expand All @@ -2030,14 +2031,17 @@ impl Runtime {
let exit_node = exit_node.clone();

// Stop post quantum key rotation task if it's running
self.entities.postquantum_wg.stop();
self.entities.postquantum_wg.stop().await;

if postquantum {
self.entities.postquantum_wg.start(
exit_node.endpoint.ok_or(Error::EndpointNotProvided)?,
self.requested_state.device_config.private_key,
exit_node.public_key,
);
self.entities
.postquantum_wg
.start(
exit_node.endpoint.ok_or(Error::EndpointNotProvided)?,
self.requested_state.device_config.private_key,
exit_node.public_key,
)
.await;
}

// dns socket for macos should only be bound to tunnel interface when connected to exit,
Expand Down Expand Up @@ -2146,7 +2150,7 @@ impl Runtime {
.await?;
}

self.entities.postquantum_wg.stop();
self.entities.postquantum_wg.stop().await;

if let Some(pmtud) = &mut self.entities.pmtu_detection {
pmtud.stop().await;
Expand Down Expand Up @@ -2251,25 +2255,11 @@ impl Runtime {
(None, Some(exit_node)) => {
// Exit node
Some(Node {
identifier: exit_node.identifier.clone(),
public_key: exit_node.public_key,
nickname: None,
state: state.unwrap_or_else(|| peer.state()),
link_state,
is_exit: true,
is_vpn: exit_node.endpoint.is_some(),
ip_addresses: vec![
IpAddr::V4(Ipv4Addr::new(10, 5, 0, 1)),
IpAddr::V4(Ipv4Addr::new(100, 64, 0, 1)),
],
allowed_ips: peer.allowed_ips.clone(),
endpoint,
hostname: None,
allow_incoming_connections: false,
allow_peer_send_files: false,
path: path_type,
allow_multicast: false,
peer_allows_multicast: false,
..node_from_exit_node(exit_node)
})
}
_ => None,
Expand Down Expand Up @@ -2447,6 +2437,32 @@ impl TaskRuntime for Runtime {
Some(pq_event) = self.event_listeners.post_quantum_subscriber.recv() => {
telio_log_debug!("WG consolidation triggered by PQ event");

match (&pq_event, &self.requested_state.exit_node, &self.requested_state.last_exit_node) {
(telio_pq::Event::Connecting(pubkey), Some(exit), _) if exit.public_key == *pubkey => {
let body = Node {
state: PeerState::Connecting,
link_state: Some(LinkState::Down),
..node_from_exit_node(exit)
};

let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(Event::Node { body })
);
},
(telio_pq::Event::Disconnected(pubkey), _, Some(last_exit)) if last_exit.public_key == *pubkey => {
let body = Node {
state: PeerState::Disconnected,
link_state: Some(LinkState::Down),
..node_from_exit_node(last_exit)
};

let _ = self.event_publishers.libtelio_event_publisher.send(
Box::new(Event::Node { body })
);
},
_ => (),
}

self.entities.postquantum_wg.on_event(pq_event);

if let Err(err) = wg_controller::consolidate_wg_state(&self.requested_state, &self.entities, &self.features)
Expand Down Expand Up @@ -2563,6 +2579,26 @@ fn set_tunnel_interface(socket_pool: &Arc<SocketPool>, config: &DeviceConfig) {
}
}

fn node_from_exit_node(exit_node: &ExitNode) -> Node {
Node {
identifier: exit_node.identifier.clone(),
public_key: exit_node.public_key,
nickname: None,
is_exit: true,
is_vpn: exit_node.endpoint.is_some(),
ip_addresses: vec![IpAddr::V4(VPN_EXTERNAL_IPV4), IpAddr::V4(VPN_INTERNAL_IPV4)],
allowed_ips: exit_node.allowed_ips.clone().unwrap_or_default(),
endpoint: exit_node.endpoint,
hostname: None,
allow_incoming_connections: false,
allow_peer_send_files: false,
path: PathType::Direct,
allow_multicast: false,
peer_allows_multicast: false,
..Default::default()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit a7d82ca

Please sign in to comment.