From e967761078bdfebb96ec71df9a3e7b347789b4da Mon Sep 17 00:00:00 2001 From: Daniel Fetti Date: Sun, 17 Nov 2024 18:54:58 +0200 Subject: [PATCH] Add DERP userspace keepalives More info on that in RFC-LLT-0070 Signed-off-by: Daniel Fetti --- .unreleased/LLT-5639 | 1 + crates/telio-model/src/features.rs | 6 + crates/telio-relay/src/derp.rs | 11 +- crates/telio-relay/src/derp/http.rs | 305 ++++++++++++++++++++++------ nat-lab/tests/test_derp_connect.py | 106 ++++++++-- src/libtelio.udl | 3 + 6 files changed, 356 insertions(+), 76 deletions(-) create mode 100644 .unreleased/LLT-5639 diff --git a/.unreleased/LLT-5639 b/.unreleased/LLT-5639 new file mode 100644 index 000000000..79f296022 --- /dev/null +++ b/.unreleased/LLT-5639 @@ -0,0 +1 @@ +Add DERP userspace keepalives \ No newline at end of file diff --git a/crates/telio-model/src/features.rs b/crates/telio-model/src/features.rs index 683262a91..707444a08 100644 --- a/crates/telio-model/src/features.rs +++ b/crates/telio-model/src/features.rs @@ -327,6 +327,10 @@ pub struct FeatureDerp { pub tcp_keepalive: Option, /// Derp will send empty messages after this many seconds of not sending/receiving any data [default 60s] pub derp_keepalive: Option, + /// Poll Keepalive: Application level keepalives meant to replace the TCP keepalives + /// They will use derp_keepalive as interval + #[serde(default)] + pub poll_keepalive: bool, /// Enable polling of remote peer states to reduce derp traffic pub enable_polling: Option, /// Use Mozilla's root certificates instead of OS ones [default false] @@ -536,6 +540,7 @@ mod tests { "derp": { "tcp_keepalive": 13, "derp_keepalive": 14, + "poll_keepalive": true, "enable_polling": true, "use_built_in_root_certificates": true }, @@ -626,6 +631,7 @@ mod tests { derp: Some(FeatureDerp { tcp_keepalive: Some(13), derp_keepalive: Some(14), + poll_keepalive: true, enable_polling: Some(true), use_built_in_root_certificates: true, }), diff --git a/crates/telio-relay/src/derp.rs 
b/crates/telio-relay/src/derp.rs index e7e2dd0cc..4affeed57 100644 --- a/crates/telio-relay/src/derp.rs +++ b/crates/telio-relay/src/derp.rs @@ -127,16 +127,21 @@ struct State { /// Keepalive values that help keeping Derp connection in conntrack alive, /// so server can send traffic after being silent for a while /// *derp_keepalive* is also used as an interval for retrieving remote peer states. +/// PollKeepalive is a feature that is meant to replace TCP keepalives with +/// application level keepalives (DerpPollRequests). +/// More info on PollKeepalive can be found in RFC-LLT-0070 #[derive(Debug, Clone, PartialEq, Eq)] pub struct DerpKeepaliveConfig { tcp_keepalive: u32, derp_keepalive: u32, + poll_keepalive: bool, } impl From<&Option> for DerpKeepaliveConfig { fn from(derp: &Option) -> Self { let mut tcp_keepalive = proto::DERP_TCP_KEEPALIVE_INTERVAL; let mut derp_keepalive = proto::DERP_KEEPALIVE_INTERVAL; + let mut poll_keepalive = false; if let Some(derp) = derp { if let Some(tcp_ka) = derp.tcp_keepalive { tcp_keepalive = tcp_ka; @@ -144,11 +149,13 @@ impl From<&Option> for DerpKeepaliveConfig { if let Some(derp_ka) = derp.derp_keepalive { derp_keepalive = derp_ka; } + poll_keepalive = derp.poll_keepalive; } DerpKeepaliveConfig { tcp_keepalive, derp_keepalive, + poll_keepalive, } } } @@ -156,6 +163,7 @@ impl From<&Option> for DerpKeepaliveConfig { const DEFAULT_SERVER_KEEPALIVE_CONFIG: DerpKeepaliveConfig = DerpKeepaliveConfig { tcp_keepalive: proto::DERP_TCP_KEEPALIVE_INTERVAL, derp_keepalive: proto::DERP_KEEPALIVE_INTERVAL, + poll_keepalive: false, }; /// Derp configuration @@ -726,8 +734,9 @@ impl Runtime for State { }, // On tick send derp poll request to derp stream Some((permit, _)) = wait_for_tx(&c.comms_direct.tx, poll_timer_tick) => { - if config.enable_polling { + if config.enable_polling || config.server_keepalives.poll_keepalive { self.derp_poll_session = self.derp_poll_session.wrapping_add(1); + telio_log_debug!("Sending DerpPollRequest with 
session {}", self.derp_poll_session); Self::handle_outcoming_payload_direct(permit, PacketControl::DerpPollRequest(DerpPollRequestMsg::new( self.derp_poll_session, &config.meshnet_peers ))).await; diff --git a/crates/telio-relay/src/derp/http.rs b/crates/telio-relay/src/derp/http.rs index fc32afc82..725de9024 100644 --- a/crates/telio-relay/src/derp/http.rs +++ b/crates/telio-relay/src/derp/http.rs @@ -14,7 +14,7 @@ use std::{ }; use telio_sockets::{SocketBufSizes, SocketPool, TcpParams}; use telio_task::io::Chan; -use telio_utils::interval_at; +use telio_utils::{interval_at, telio_log_debug}; use webpki_roots::TLS_SERVER_ROOTS; use crate::{Config, DerpKeepaliveConfig}; @@ -37,6 +37,11 @@ const SOCK_BUF_SZ: usize = 212992; /// Max TCP packet size is 65535 const MAX_TCP_PACKET_SIZE: usize = u16::MAX as usize; +enum DerpVersion { + DerpV1, + DerpV2, +} + /// Class used to manage connection and it's receive/send threads pub struct DerpConnection { /// Communication channel for Node <-> Node communication @@ -89,21 +94,67 @@ pub async fn connect_http_and_start( }; let hostport = format!("{}:{}", hostname, port); - let socket = socket_pool.new_external_tcp_v4(Some(TcpParams { - keepalive_enable: Some(true), - keepalive_cnt: Some(TCP_KEEPALIVE_COUNT), - keepalive_idle: Some(TCP_KEEPALIVE_IDLE), - keepalive_intvl: Some(TCP_KEEPALIVE_INTERVAL), - nodelay_enable: Some(true), - user_timeout: Some(TCP_USER_TIMEOUT), - buf_size: SocketBufSizes { - tx_buf_size: Some(SOCK_BUF_SZ), - rx_buf_size: Some(SOCK_BUF_SZ), - }, - }))?; + // First try to use derp v2, with poll keepalives if the feature is enabled + if derp_config.server_keepalives.poll_keepalive { + telio_log_debug!("Trying to connect to derp V2"); + let socket = socket_pool.new_external_tcp_v4(Some(build_tcp_parameters(false)))?; + let stream = timeout(derp_config.timeout, socket.connect(ip)).await??; + let addr = PairAddr { + local: stream.local_addr()?, + remote: stream.peer_addr()?, + }; - let stream = 
timeout(derp_config.timeout, socket.connect(ip)).await??; + let connection = match u.scheme() { + "http" => { + Box::pin(connect_and_start( + stream, + addr, + derp_config.secret_key.clone(), + derp_config.server_keepalives.clone(), + &hostport, + DerpVersion::DerpV2, + )) + .await + } + _ => { + let config = if derp_config.use_built_in_root_certificates { + let root_store: RootCertStore = TLS_SERVER_ROOTS.iter().cloned().collect(); + + let config = ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + + TlsConnector::from(Arc::new(config)) + } else { + TlsConnector::from(Arc::new(rustls_platform_verifier::tls_config())) + }; + + let server_name = + ServerName::try_from(hostname.clone()).map_err(|_| Error::InvalidServerName)?; + + connect_and_start( + config.connect(server_name, stream).await?, + addr, + derp_config.secret_key.clone(), + derp_config.server_keepalives.clone(), + &hostport, + DerpVersion::DerpV2, + ) + .await + } + }; + + if connection.is_ok() { + // We managed to successfully connect to derp v2 + telio_log_debug!("Successfully connected to derp v2"); + return connection; + } + } + // If we didn't want or didn't manage to connect to derp v2, we will try derp v1 + telio_log_debug!("Trying to connect to derp v1"); + let socket = socket_pool.new_external_tcp_v4(Some(build_tcp_parameters(true)))?; + let stream = timeout(derp_config.timeout, socket.connect(ip)).await??; let addr = PairAddr { local: stream.local_addr()?, remote: stream.peer_addr()?, @@ -114,9 +165,10 @@ pub async fn connect_http_and_start( Box::pin(connect_and_start( stream, addr, - derp_config.secret_key, + derp_config.secret_key.clone(), derp_config.server_keepalives, &hostport, + DerpVersion::DerpV1, )) .await } @@ -142,6 +194,7 @@ pub async fn connect_http_and_start( derp_config.secret_key, derp_config.server_keepalives, &hostport, + DerpVersion::DerpV1, ) .await } @@ -154,6 +207,7 @@ async fn connect_and_start( secret_key: SecretKey, 
server_keepalives: DerpKeepaliveConfig, host: &str, + derp_version: DerpVersion, ) -> Result { let (mut reader, mut writer) = split(stream); @@ -162,6 +216,7 @@ async fn connect_and_start( &mut writer, &server_keepalives, host, + derp_version, )) .await?; @@ -181,6 +236,10 @@ async fn connect_and_start( let sender_direct = conn_side_direct.tx; let receiver_direct = conn_side_direct.rx; + // TODO (LLT-5871): When we switch to batched Poll Keepalives, we have to move this into the session keeper + let poll_interval = Duration::from_secs(server_keepalives.derp_keepalive as u64); + telio_log_debug!("Derp poll interval {:?}", poll_interval); + Ok(DerpConnection { comms_relayed: comm_side_relayed, comms_direct: comm_side_direct, @@ -190,10 +249,7 @@ async fn connect_and_start( join_receiver: tokio::spawn(async move { start_write(writer, receiver_relayed, receiver_direct, addr).await }), - poll_timer: { - let poll_interval = Duration::from_secs(server_keepalives.derp_keepalive as u64); - interval_at(tokio::time::Instant::now() + poll_interval, poll_interval) - }, + poll_timer: { interval_at(tokio::time::Instant::now() + poll_interval, poll_interval) }, }) } @@ -204,24 +260,38 @@ async fn connect_http( writer: &mut W, server_keepalives: &DerpKeepaliveConfig, host: &str, + derp_version: DerpVersion, ) -> Result, Error> { - writer - .write_all( - format!( - "GET /derp HTTP/1.1\r\n\ + let request = match derp_version { + // We try to connect to derp V2 + DerpVersion::DerpV2 => format!( + "GET /v2/derp HTTP/1.1\r\n\ Host: {host}\r\n\ Connection: Upgrade\r\n\ Upgrade: WebSocket\r\n\ User-Agent: telio/{} {}\r\n\ - Keep-Alive: tcp={}, derp={}\r\n\r\n", - telio_utils::version_tag(), - std::env::consts::OS, - server_keepalives.tcp_keepalive, - server_keepalives.derp_keepalive, - ) - .as_bytes(), - ) - .await?; + Poll-Keepalive: {}\r\n\r\n", + telio_utils::version_tag(), + std::env::consts::OS, + server_keepalives.derp_keepalive, + ), + // We try to connect to derp V1 + 
DerpVersion::DerpV1 => format!( + "GET /derp HTTP/1.1\r\n\ + Host: {host}\r\n\ + Connection: Upgrade\r\n\ + Upgrade: WebSocket\r\n\ + User-Agent: telio/{} {}\r\n\ + Keep-Alive: tcp={}, derp={}\r\n\r\n", + telio_utils::version_tag(), + std::env::consts::OS, + server_keepalives.tcp_keepalive, + server_keepalives.derp_keepalive, + ), + }; + telio_log_debug!("DERP connect request: {}", &request); + + writer.write_all(request.as_bytes()).await?; let mut data = [0_u8; MAX_TCP_PACKET_SIZE]; let data_len = reader.read(&mut data).await?; @@ -240,51 +310,158 @@ async fn connect_http( .to_vec()) } +fn build_tcp_parameters(use_tcp_keepalives: bool) -> TcpParams { + if use_tcp_keepalives { + TcpParams { + keepalive_enable: Some(true), + keepalive_cnt: Some(TCP_KEEPALIVE_COUNT), + keepalive_idle: Some(TCP_KEEPALIVE_IDLE), + keepalive_intvl: Some(TCP_KEEPALIVE_INTERVAL), + nodelay_enable: Some(true), + user_timeout: Some(TCP_USER_TIMEOUT), + buf_size: SocketBufSizes { + tx_buf_size: Some(SOCK_BUF_SZ), + rx_buf_size: Some(SOCK_BUF_SZ), + }, + } + } else { + TcpParams { + nodelay_enable: Some(true), + user_timeout: Some(TCP_USER_TIMEOUT), + buf_size: SocketBufSizes { + tx_buf_size: Some(SOCK_BUF_SZ), + rx_buf_size: Some(SOCK_BUF_SZ), + }, + ..Default::default() + } + } +} + #[cfg(test)] mod tests { use super::*; use hyper::http::HeaderValue; use hyper::server::conn::http1; use hyper::service::service_fn; - use hyper::{Request, Response}; + use hyper::{Body, Request, Response}; use tokio::net::{TcpListener, TcpStream}; - #[tokio::test] - async fn http_derp_connection_initiation() { - const RESPONSE_BODY: &str = "test body"; - const HOST: &str = "hostname"; - - async fn hello(request: Request) -> hyper::Result> { - assert_eq!( - request.headers().get(hyper::header::HOST), - Some(&HeaderValue::from_static(HOST)) - ); - assert_eq!( - request.headers().get(hyper::header::CONNECTION), - Some(&HeaderValue::from_static("Upgrade")) - ); - assert_eq!( - 
request.headers().get(hyper::header::UPGRADE), - Some(&HeaderValue::from_static("WebSocket")) - ); - assert_eq!( - request.headers().get("keep-alive"), - Some(&HeaderValue::from_static("tcp=1, derp=2")) - ); - Ok(Response::builder() - .body(hyper::Body::from(RESPONSE_BODY)) - .unwrap()) + const RESPONSE_BODY: &str = "test body"; + const HOST: &str = "hostname"; + + fn default_asserts(request: &Request) -> () { + assert_eq!( + request.headers().get(hyper::header::HOST), + Some(&HeaderValue::from_static(HOST)) + ); + assert_eq!( + request.headers().get(hyper::header::CONNECTION), + Some(&HeaderValue::from_static("Upgrade")) + ); + assert_eq!( + request.headers().get(hyper::header::UPGRADE), + Some(&HeaderValue::from_static("WebSocket")) + ); + } + + async fn derpv1_handler(request: Request) -> hyper::Result> { + default_asserts(&request); + match request.uri().path() { + "/derp" => { + assert_eq!( + request.headers().get("Keep-alive"), + Some(&HeaderValue::from_static("tcp=1, derp=2")) + ); + assert_eq!(request.headers().get("Poll-keepalive"), None); + Ok(Response::builder().body(Body::from(RESPONSE_BODY)).unwrap()) + } + _ => Ok(Response::builder() + .status(404) + .body(Body::from("Not found")) + .unwrap()), + } + } + + async fn derpv2_handler(request: Request) -> hyper::Result> { + default_asserts(&request); + match request.uri().path() { + "/derp" => { + assert_eq!( + request.headers().get("Keep-alive"), + Some(&HeaderValue::from_static("tcp=1, derp=2")) + ); + assert_eq!(request.headers().get("Poll-keepalive"), None); + Ok(Response::builder().body(Body::from(RESPONSE_BODY)).unwrap()) + } + "/v2/derp" => { + assert_eq!(request.headers().get("Keep-alive"), None); + assert_eq!( + request.headers().get("Poll-keepalive"), + Some(&HeaderValue::from_static("2")) + ); + Ok(Response::builder().body(Body::from(RESPONSE_BODY)).unwrap()) + } + _ => Ok(Response::builder() + .status(404) + .body(Body::from("Not found")) + .unwrap()), } + } + + #[tokio::test] + async fn 
test_v1_derp_connection() { let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0)); let listener = TcpListener::bind(addr).await.unwrap(); + let connect_addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + loop { + let (stream, _) = listener.accept().await.unwrap(); + tokio::task::spawn(async move { + http1::Builder::new() + .serve_connection(stream, service_fn(derpv1_handler)) + .await + .unwrap(); + }); + } + }); + + let mut stream = TcpStream::connect(connect_addr).await.unwrap(); + let (mut r, mut w) = stream.split(); + let derp_config = DerpKeepaliveConfig { + tcp_keepalive: 1, + derp_keepalive: 2, + poll_keepalive: true, + }; + assert_eq!( + "Not found".as_bytes(), + connect_http(&mut r, &mut w, &derp_config, HOST, DerpVersion::DerpV2) + .await + .unwrap() + .as_slice() + ); + assert_eq!( + RESPONSE_BODY.as_bytes(), + connect_http(&mut r, &mut w, &derp_config, HOST, DerpVersion::DerpV1) + .await + .unwrap() + .as_slice() + ); + } + + #[tokio::test] + async fn test_v2_derp_connection() { + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0)); + let listener = TcpListener::bind(addr).await.unwrap(); let connect_addr = listener.local_addr().unwrap(); + tokio::spawn(async move { loop { let (stream, _) = listener.accept().await.unwrap(); tokio::task::spawn(async move { http1::Builder::new() - .serve_connection(stream, service_fn(hello)) + .serve_connection(stream, service_fn(derpv2_handler)) .await .unwrap(); }); @@ -297,10 +474,18 @@ mod tests { let derp_config = DerpKeepaliveConfig { tcp_keepalive: 1, derp_keepalive: 2, + poll_keepalive: true, }; assert_eq!( RESPONSE_BODY.as_bytes(), - connect_http(&mut r, &mut w, &derp_config, HOST) + connect_http(&mut r, &mut w, &derp_config, HOST, DerpVersion::DerpV2) + .await + .unwrap() + .as_slice() + ); + assert_eq!( + RESPONSE_BODY.as_bytes(), + connect_http(&mut r, &mut w, &derp_config, HOST, DerpVersion::DerpV1) .await .unwrap() .as_slice() diff --git a/nat-lab/tests/test_derp_connect.py 
b/nat-lab/tests/test_derp_connect.py index e1603e518..b4b99269d 100644 --- a/nat-lab/tests/test_derp_connect.py +++ b/nat-lab/tests/test_derp_connect.py @@ -6,7 +6,7 @@ from copy import deepcopy from helpers import SetupParameters, setup_mesh_nodes from typing import List -from utils.bindings import RelayState +from utils.bindings import RelayState, FeatureDerp, default_features from utils.connection_util import ConnectionTag from utils.ping import ping @@ -15,14 +15,40 @@ DERP3_IP = str(DERP_TERTIARY.ipv4) +def _build_parameters( + connection_tag: List[ConnectionTag], enable_poll: bool +) -> List[SetupParameters]: + features = default_features() + if enable_poll: + assert features + features.derp = FeatureDerp( + tcp_keepalive=120, + derp_keepalive=15, + poll_keepalive=True, + enable_polling=False, + use_built_in_root_certificates=False, + ) + return [ + SetupParameters( + connection_tag=connection_tag, + features=features, + ) + for connection_tag in connection_tag + ] + + @pytest.mark.asyncio @pytest.mark.parametrize( "setup_params", [ - [ - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1), - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2), - ], + _build_parameters( + [ConnectionTag.DOCKER_CONE_CLIENT_1, ConnectionTag.DOCKER_CONE_CLIENT_2], + False, + ), + _build_parameters( + [ConnectionTag.DOCKER_CONE_CLIENT_1, ConnectionTag.DOCKER_CONE_CLIENT_2], + True, + ), ], ) async def test_derp_reconnect_2clients(setup_params: List[SetupParameters]) -> None: @@ -82,11 +108,22 @@ async def test_derp_reconnect_2clients(setup_params: List[SetupParameters]) -> N @pytest.mark.parametrize( "setup_params", [ - [ - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1), - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2), - SetupParameters(connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1), - ], + _build_parameters( + [ + ConnectionTag.DOCKER_CONE_CLIENT_1, + ConnectionTag.DOCKER_CONE_CLIENT_2, + 
ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, + ], + False, + ), + _build_parameters( + [ + ConnectionTag.DOCKER_CONE_CLIENT_1, + ConnectionTag.DOCKER_CONE_CLIENT_2, + ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, + ], + True, + ), ], ) async def test_derp_reconnect_3clients(setup_params: List[SetupParameters]) -> None: @@ -223,11 +260,22 @@ async def test_derp_reconnect_3clients(setup_params: List[SetupParameters]) -> N @pytest.mark.parametrize( "setup_params", [ - [ - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1), - SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2), - SetupParameters(connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1), - ], + _build_parameters( + [ + ConnectionTag.DOCKER_CONE_CLIENT_1, + ConnectionTag.DOCKER_CONE_CLIENT_2, + ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, + ], + False, + ), + _build_parameters( + [ + ConnectionTag.DOCKER_CONE_CLIENT_1, + ConnectionTag.DOCKER_CONE_CLIENT_2, + ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, + ], + True, + ), ], ) async def test_derp_restart(setup_params: List[SetupParameters]) -> None: @@ -440,3 +488,31 @@ async def test_derp_server_list_exhaustion(setup_params: List[SetupParameters]) # Ping peer to check if connection truly works await ping(alpha_connection, beta.ip_addresses[0]) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "setup_params", + [ + _build_parameters([ConnectionTag.DOCKER_CONE_CLIENT_1], False), + _build_parameters([ConnectionTag.DOCKER_CONE_CLIENT_1], True), + ], +) +async def test_derp_reconnect_1client(setup_params: List[SetupParameters]) -> None: + async with AsyncExitStack() as exit_stack: + env = await setup_mesh_nodes(exit_stack, setup_params) + alpha_client = env.clients[0] + assert alpha_client + + # an iptables rule is placed in order to reject connections and + # send a TCP reset to the client ALPHA + await exit_stack.enter_async_context( + alpha_client.get_router().break_tcp_conn_to_host(DERP1_IP) + ) + + # Wait till connection is broken + await 
alpha_client.wait_for_state_derp( + DERP1_IP, [RelayState.DISCONNECTED, RelayState.CONNECTING] + ) + + await alpha_client.wait_for_state_derp(DERP2_IP, [RelayState.CONNECTED]) diff --git a/src/libtelio.udl b/src/libtelio.udl index 7dcf5a3b9..c5c12dd00 100644 --- a/src/libtelio.udl +++ b/src/libtelio.udl @@ -681,6 +681,9 @@ dictionary FeatureDerp { u32? tcp_keepalive; /// Derp will send empty messages after this many seconds of not sending/receiving any data [default 60s] u32? derp_keepalive; + /// Poll Keepalive: Application level keepalives meant to replace the TCP keepalives + /// They will reuse the derp_keepalive interval + boolean poll_keepalive; /// Enable polling of remote peer states to reduce derp traffic boolean? enable_polling; /// Use Mozilla's root certificates instead of OS ones [default false]