From 84b7791941790fa1ec0ceb2fac8616e7d92f9d4a Mon Sep 17 00:00:00 2001 From: Daniel Fetti Date: Sun, 17 Nov 2024 18:54:58 +0200 Subject: [PATCH] Add DERP userspace keepalives Signed-off-by: Daniel Fetti --- crates/telio-model/src/features.rs | 4 + crates/telio-relay/src/derp.rs | 9 +- crates/telio-relay/src/derp/http.rs | 166 +++++++++++++++++++++------ crates/telio-relay/src/derp/proto.rs | 3 + nat-lab/tests/test_derp_connect.py | 107 ++++++++++++++--- src/libtelio.udl | 2 + 6 files changed, 243 insertions(+), 48 deletions(-) diff --git a/crates/telio-model/src/features.rs b/crates/telio-model/src/features.rs index 23c023810..c301792c3 100644 --- a/crates/telio-model/src/features.rs +++ b/crates/telio-model/src/features.rs @@ -326,6 +326,8 @@ pub struct FeatureDerp { pub tcp_keepalive: Option, /// Derp will send empty messages after this many seconds of not sending/receiving any data [default 60s] pub derp_keepalive: Option, + /// Poll Keepalive: Application level keepalives meant to replace the TCP keepalives + pub poll_keepalive: Option, /// Enable polling of remote peer states to reduce derp traffic pub enable_polling: Option, /// Use Mozilla's root certificates instead of OS ones [default false] @@ -532,6 +534,7 @@ mod tests { "derp": { "tcp_keepalive": 13, "derp_keepalive": 14, + "poll_keepalive": 15, "enable_polling": true, "use_built_in_root_certificates": true }, @@ -621,6 +624,7 @@ mod tests { derp: Some(FeatureDerp { tcp_keepalive: Some(13), derp_keepalive: Some(14), + poll_keepalive: Some(15), enable_polling: Some(true), use_built_in_root_certificates: true, }), diff --git a/crates/telio-relay/src/derp.rs b/crates/telio-relay/src/derp.rs index e55dd4467..1c48613f5 100644 --- a/crates/telio-relay/src/derp.rs +++ b/crates/telio-relay/src/derp.rs @@ -127,16 +127,19 @@ struct State { /// Keepalive values that help keeping Derp connection in conntrack alive, /// so server can send traffic after being silent for a while /// *derp_keepalive* is also used as an 
interval for retrieving remote peer states. +/// TODO: Update comments for Poll Keepalives #[derive(Debug, Clone, PartialEq, Eq)] pub struct DerpKeepaliveConfig { tcp_keepalive: u32, derp_keepalive: u32, + poll_keepalive: Option, } impl From<&Option> for DerpKeepaliveConfig { fn from(derp: &Option) -> Self { let mut tcp_keepalive = proto::DERP_TCP_KEEPALIVE_INTERVAL; let mut derp_keepalive = proto::DERP_KEEPALIVE_INTERVAL; + let mut poll_keepalive = None; if let Some(derp) = derp { if let Some(tcp_ka) = derp.tcp_keepalive { tcp_keepalive = tcp_ka; @@ -144,11 +147,13 @@ impl From<&Option> for DerpKeepaliveConfig { if let Some(derp_ka) = derp.derp_keepalive { derp_keepalive = derp_ka; } + poll_keepalive = derp.poll_keepalive; } DerpKeepaliveConfig { tcp_keepalive, derp_keepalive, + poll_keepalive, } } } @@ -156,6 +161,7 @@ impl From<&Option> for DerpKeepaliveConfig { const DEFAULT_SERVER_KEEPALIVE_CONFIG: DerpKeepaliveConfig = DerpKeepaliveConfig { tcp_keepalive: proto::DERP_TCP_KEEPALIVE_INTERVAL, derp_keepalive: proto::DERP_KEEPALIVE_INTERVAL, + poll_keepalive: None, }; /// Derp configuration @@ -725,8 +731,9 @@ impl Runtime for State { }, // On tick send derp poll request to derp stream Some((permit, _)) = wait_for_tx(&c.comms_direct.tx, poll_timer_tick) => { - if config.enable_polling { + if config.enable_polling || config.server_keepalives.poll_keepalive.is_some() { self.derp_poll_session = self.derp_poll_session.wrapping_add(1); + telio_log_debug!("Sending DerpPollRequest with session {}", self.derp_poll_session); Self::handle_outcoming_payload_direct(permit, PacketControl::DerpPollRequest(DerpPollRequestMsg::new( self.derp_poll_session, &config.meshnet_peers ))).await; diff --git a/crates/telio-relay/src/derp/http.rs b/crates/telio-relay/src/derp/http.rs index fc32afc82..6c846b5fa 100644 --- a/crates/telio-relay/src/derp/http.rs +++ b/crates/telio-relay/src/derp/http.rs @@ -14,7 +14,7 @@ use std::{ }; use telio_sockets::{SocketBufSizes, SocketPool, TcpParams}; 
use telio_task::io::Chan; -use telio_utils::interval_at; +use telio_utils::{interval_at, telio_log_debug}; use webpki_roots::TLS_SERVER_ROOTS; use crate::{Config, DerpKeepaliveConfig}; @@ -89,21 +89,67 @@ pub async fn connect_http_and_start( }; let hostport = format!("{}:{}", hostname, port); - let socket = socket_pool.new_external_tcp_v4(Some(TcpParams { - keepalive_enable: Some(true), - keepalive_cnt: Some(TCP_KEEPALIVE_COUNT), - keepalive_idle: Some(TCP_KEEPALIVE_IDLE), - keepalive_intvl: Some(TCP_KEEPALIVE_INTERVAL), - nodelay_enable: Some(true), - user_timeout: Some(TCP_USER_TIMEOUT), - buf_size: SocketBufSizes { - tx_buf_size: Some(SOCK_BUF_SZ), - rx_buf_size: Some(SOCK_BUF_SZ), - }, - }))?; + // First try to use derp v2, with poll keepalives if the feature is enabled + if derp_config.server_keepalives.poll_keepalive.is_some() { + telio_log_debug!("Trying to connect to derp V2"); + let socket = socket_pool.new_external_tcp_v4(Some(build_tcp_parameters(false)))?; + let stream = timeout(derp_config.timeout, socket.connect(ip)).await??; + let addr = PairAddr { + local: stream.local_addr()?, + remote: stream.peer_addr()?, + }; - let stream = timeout(derp_config.timeout, socket.connect(ip)).await??; + let connection = match u.scheme() { + "http" => { + Box::pin(connect_and_start( + stream, + addr, + derp_config.secret_key, + derp_config.server_keepalives.clone(), + &hostport, + true, + )) + .await + } + _ => { + let config = if derp_config.use_built_in_root_certificates { + let root_store: RootCertStore = TLS_SERVER_ROOTS.iter().cloned().collect(); + + let config = ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + + TlsConnector::from(Arc::new(config)) + } else { + TlsConnector::from(Arc::new(rustls_platform_verifier::tls_config())) + }; + + let server_name = + ServerName::try_from(hostname.clone()).map_err(|_| Error::InvalidServerName)?; + + connect_and_start( + config.connect(server_name, stream).await?, + addr, + 
derp_config.secret_key, + derp_config.server_keepalives.clone(), + &hostport, + true, + ) + .await + } + }; + + if connection.is_ok() { + // We managed to successfully connect to derp v2 + telio_log_debug!("Successfully connected to derp v2"); + return connection; + } + } + // If we didn't want or didn't manage to connect to derp v2, we will try derp v1 + telio_log_debug!("Trying to connect to derp v1"); + let socket = socket_pool.new_external_tcp_v4(Some(build_tcp_parameters(true)))?; + let stream = timeout(derp_config.timeout, socket.connect(ip)).await??; let addr = PairAddr { local: stream.local_addr()?, remote: stream.peer_addr()?, @@ -117,6 +163,7 @@ pub async fn connect_http_and_start( derp_config.secret_key, derp_config.server_keepalives, &hostport, + false, )) .await } @@ -142,6 +189,7 @@ pub async fn connect_http_and_start( derp_config.secret_key, derp_config.server_keepalives, &hostport, + false, ) .await } @@ -154,6 +202,7 @@ async fn connect_and_start( secret_key: SecretKey, server_keepalives: DerpKeepaliveConfig, host: &str, + derp_v2: bool, ) -> Result { let (mut reader, mut writer) = split(stream); @@ -162,6 +211,7 @@ async fn connect_and_start( &mut writer, &server_keepalives, host, + derp_v2, )) .await?; @@ -181,6 +231,18 @@ async fn connect_and_start( let sender_direct = conn_side_direct.tx; let receiver_direct = conn_side_direct.rx; + // TODO: When we switch to batched Poll Keepalives, we have to move this into the session keeper + // The logic is here for now for testing purposes + let poll_interval = match (derp_v2, server_keepalives.poll_keepalive) { + // We connected to derp V2, so we will use poll keepalives + (true, Some(poll_keepalive)) => Duration::from_secs(poll_keepalive as u64), + // We connected to derp V1, so we will use the old derp_keepalive config + (false, _) => Duration::from_secs(server_keepalives.derp_keepalive as u64), + // Any other situation is an error + _ => return Err(Error::InternalError), + }; + telio_log_debug!("Derp 
poll interval {:?}", poll_interval); + Ok(DerpConnection { comms_relayed: comm_side_relayed, comms_direct: comm_side_direct, @@ -190,10 +252,7 @@ async fn connect_and_start( join_receiver: tokio::spawn(async move { start_write(writer, receiver_relayed, receiver_direct, addr).await }), - poll_timer: { - let poll_interval = Duration::from_secs(server_keepalives.derp_keepalive as u64); - interval_at(tokio::time::Instant::now() + poll_interval, poll_interval) - }, + poll_timer: { interval_at(tokio::time::Instant::now() + poll_interval, poll_interval) }, }) } @@ -204,24 +263,39 @@ async fn connect_http( writer: &mut W, server_keepalives: &DerpKeepaliveConfig, host: &str, + derp_v2: bool, ) -> Result, Error> { - writer - .write_all( - format!( - "GET /derp HTTP/1.1\r\n\ + let request = match (derp_v2, server_keepalives.poll_keepalive) { + // We try to connect to derp V2 + (true, Some(poll_keepalive)) => format!( + "GET /derpv2 HTTP/1.1\r\n\ Host: {host}\r\n\ Connection: Upgrade\r\n\ Upgrade: WebSocket\r\n\ User-Agent: telio/{} {}\r\n\ - Keep-Alive: tcp={}, derp={}\r\n\r\n", - telio_utils::version_tag(), - std::env::consts::OS, - server_keepalives.tcp_keepalive, - server_keepalives.derp_keepalive, - ) - .as_bytes(), - ) - .await?; + Poll-Keepalive: {}\r\n\r\n", + telio_utils::version_tag(), + std::env::consts::OS, + poll_keepalive, + ), + // We try to connect to derp V1 + (false, _) => format!( + "GET /derp HTTP/1.1\r\n\ + Host: {host}\r\n\ + Connection: Upgrade\r\n\ + Upgrade: WebSocket\r\n\ + User-Agent: telio/{} {}\r\n\ + Keep-Alive: tcp={}, derp={}\r\n\r\n", + telio_utils::version_tag(), + std::env::consts::OS, + server_keepalives.tcp_keepalive, + server_keepalives.derp_keepalive, + ), + _ => return Err(Error::InternalError), + }; + telio_log_debug!("DERP connect request: {}", &request); + + writer.write_all(request.as_bytes()).await?; let mut data = [0_u8; MAX_TCP_PACKET_SIZE]; let data_len = reader.read(&mut data).await?; @@ -240,6 +314,33 @@ async fn connect_http( 
.to_vec()) } +fn build_tcp_parameters(use_tcp_keepalives: bool) -> TcpParams { + if use_tcp_keepalives { + TcpParams { + keepalive_enable: Some(true), + keepalive_cnt: Some(TCP_KEEPALIVE_COUNT), + keepalive_idle: Some(TCP_KEEPALIVE_IDLE), + keepalive_intvl: Some(TCP_KEEPALIVE_INTERVAL), + nodelay_enable: Some(true), + user_timeout: Some(TCP_USER_TIMEOUT), + buf_size: SocketBufSizes { + tx_buf_size: Some(SOCK_BUF_SZ), + rx_buf_size: Some(SOCK_BUF_SZ), + }, + } + } else { + TcpParams { + nodelay_enable: Some(true), + user_timeout: Some(TCP_USER_TIMEOUT), + buf_size: SocketBufSizes { + tx_buf_size: Some(SOCK_BUF_SZ), + rx_buf_size: Some(SOCK_BUF_SZ), + }, + ..Default::default() + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -297,10 +398,11 @@ mod tests { let derp_config = DerpKeepaliveConfig { tcp_keepalive: 1, derp_keepalive: 2, + poll_keepalive: Some(3), }; assert_eq!( RESPONSE_BODY.as_bytes(), - connect_http(&mut r, &mut w, &derp_config, HOST) + connect_http(&mut r, &mut w, &derp_config, HOST, false) .await .unwrap() .as_slice() diff --git a/crates/telio-relay/src/derp/proto.rs b/crates/telio-relay/src/derp/proto.rs index 4daf69ad8..c2268ba48 100644 --- a/crates/telio-relay/src/derp/proto.rs +++ b/crates/telio-relay/src/derp/proto.rs @@ -156,6 +156,9 @@ pub enum Error { /// Url parse error #[error("Url parse error: {0}")] UrlParseError(#[from] url::ParseError), + /// Internal error + #[error("Internal error")] + InternalError, } impl From for RelayConnectionChangeReason { diff --git a/nat-lab/tests/test_derp_connect.py b/nat-lab/tests/test_derp_connect.py index e1603e518..73bddfb3d 100644 --- a/nat-lab/tests/test_derp_connect.py +++ b/nat-lab/tests/test_derp_connect.py @@ -1,12 +1,13 @@ import asyncio import os import pytest +import time from config import DERP_PRIMARY, DERP_SECONDARY, DERP_TERTIARY, DERP_SERVERS from contextlib import AsyncExitStack from copy import deepcopy from helpers import SetupParameters, setup_mesh_nodes from typing import List 
-from utils.bindings import RelayState +from utils.bindings import RelayState, FeatureDerp, default_features from utils.connection_util import ConnectionTag from utils.ping import ping @@ -15,14 +16,40 @@ DERP3_IP = str(DERP_TERTIARY.ipv4) +def _build_parameters( + connection_tag: List[ConnectionTag], enable_poll: bool +) -> List[SetupParameters]: + features = default_features() + if enable_poll: + assert features + features.derp = FeatureDerp( + tcp_keepalive=120, + derp_keepalive=120, + poll_keepalive=15, + enable_polling=False, + use_built_in_root_certificates=False, + ) + return [ + SetupParameters( + connection_tag=connection_tag, + features=features, + ) + for connection_tag in connection_tag + ] + + @pytest.mark.asyncio @pytest.mark.parametrize( "setup_params", [ - [ - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1), - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2), - ], + _build_parameters( + [ConnectionTag.DOCKER_CONE_CLIENT_1, ConnectionTag.DOCKER_CONE_CLIENT_2], + False, + ), + _build_parameters( + [ConnectionTag.DOCKER_CONE_CLIENT_1, ConnectionTag.DOCKER_CONE_CLIENT_2], + True, + ), ], ) async def test_derp_reconnect_2clients(setup_params: List[SetupParameters]) -> None: @@ -82,11 +109,22 @@ async def test_derp_reconnect_2clients(setup_params: List[SetupParameters]) -> N @pytest.mark.parametrize( "setup_params", [ - [ - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1), - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2), - SetupParameters(connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1), - ], + _build_parameters( + [ + ConnectionTag.DOCKER_CONE_CLIENT_1, + ConnectionTag.DOCKER_CONE_CLIENT_2, + ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, + ], + False, + ), + _build_parameters( + [ + ConnectionTag.DOCKER_CONE_CLIENT_1, + ConnectionTag.DOCKER_CONE_CLIENT_2, + ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, + ], + True, + ), ], ) async def test_derp_reconnect_3clients(setup_params: 
List[SetupParameters]) -> None: @@ -223,11 +261,22 @@ async def test_derp_reconnect_3clients(setup_params: List[SetupParameters]) -> N @pytest.mark.parametrize( "setup_params", [ - [ - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1), - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2), - SetupParameters(connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1), - ], + _build_parameters( + [ + ConnectionTag.DOCKER_CONE_CLIENT_1, + ConnectionTag.DOCKER_CONE_CLIENT_2, + ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, + ], + False, + ), + _build_parameters( + [ + ConnectionTag.DOCKER_CONE_CLIENT_1, + ConnectionTag.DOCKER_CONE_CLIENT_2, + ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, + ], + True, + ), ], ) async def test_derp_restart(setup_params: List[SetupParameters]) -> None: @@ -440,3 +489,31 @@ async def test_derp_server_list_exhaustion(setup_params: List[SetupParameters]) # Ping peer to check if connection truly works await ping(alpha_connection, beta.ip_addresses[0]) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "setup_params", + [ + _build_parameters([ConnectionTag.DOCKER_CONE_CLIENT_1], False), + _build_parameters([ConnectionTag.DOCKER_CONE_CLIENT_1], True), + ], +) +async def test_derp_reconnect_1client(setup_params: List[SetupParameters]) -> None: + async with AsyncExitStack() as exit_stack: + env = await setup_mesh_nodes(exit_stack, setup_params) + alpha_client = env.clients[0] + assert alpha_client + + # an iptables rule is placed in order to reject connections and + # send a TCP reset to the client ALPHA + await exit_stack.enter_async_context( + alpha_client.get_router().break_tcp_conn_to_host(DERP1_IP) + ) + + # Wait till connection is broken + await alpha_client.wait_for_state_derp( + DERP1_IP, [RelayState.DISCONNECTED, RelayState.CONNECTING] + ) + + await alpha_client.wait_for_state_derp(DERP2_IP, [RelayState.CONNECTED]) diff --git a/src/libtelio.udl b/src/libtelio.udl index b908dbc98..ec1008c4e 100644 --- 
a/src/libtelio.udl +++ b/src/libtelio.udl @@ -678,6 +678,8 @@ dictionary FeatureDerp { u32? tcp_keepalive; /// Derp will send empty messages after this many seconds of not sending/receiving any data [default 60s] u32? derp_keepalive; + /// Poll Keepalive: Application level keepalives meant to replace the TCP keepalives + u32? poll_keepalive; /// Enable polling of remote peer states to reduce derp traffic boolean? enable_polling; /// Use Mozilla's root certificates instead of OS ones [default false]