Skip to content

Commit

Permalink
Add DERP userspace keepalives
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Fetti <[email protected]>
  • Loading branch information
dfetti committed Nov 26, 2024
1 parent fd7f073 commit f1ff82b
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 33 deletions.
4 changes: 4 additions & 0 deletions crates/telio-model/src/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ pub struct FeatureDerp {
pub tcp_keepalive: Option<u32>,
/// Derp will send empty messages after this many seconds of not sending/receiving any data [default 60s]
pub derp_keepalive: Option<u32>,
/// Poll Keepalive: Application level keepalives meant to replace the TCP keepalives
pub poll_keepalive: Option<u32>,
/// Enable polling of remote peer states to reduce derp traffic
pub enable_polling: Option<bool>,
/// Use Mozilla's root certificates instead of OS ones [default false]
Expand Down Expand Up @@ -523,6 +525,7 @@ mod tests {
"derp": {
"tcp_keepalive": 13,
"derp_keepalive": 14,
"poll_keepalive": 15,
"enable_polling": true,
"use_built_in_root_certificates": true
},
Expand Down Expand Up @@ -610,6 +613,7 @@ mod tests {
derp: Some(FeatureDerp {
tcp_keepalive: Some(13),
derp_keepalive: Some(14),
poll_keepalive: Some(15),
enable_polling: Some(true),
use_built_in_root_certificates: true,
}),
Expand Down
9 changes: 8 additions & 1 deletion crates/telio-relay/src/derp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,35 +127,41 @@ struct State {
/// Keepalive values that help keeping Derp connection in conntrack alive,
/// so server can send traffic after being silent for a while
/// *derp_keepalive* is also used as an interval for retrieving remote peer states.
/// TODO: Update comments for Poll Keepalives
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DerpKeepaliveConfig {
tcp_keepalive: u32,
derp_keepalive: u32,
poll_keepalive: Option<u32>,
}

impl From<&Option<FeatureDerp>> for DerpKeepaliveConfig {
fn from(derp: &Option<FeatureDerp>) -> Self {
let mut tcp_keepalive = proto::DERP_TCP_KEEPALIVE_INTERVAL;
let mut derp_keepalive = proto::DERP_KEEPALIVE_INTERVAL;
let mut poll_keepalive = None;
if let Some(derp) = derp {
if let Some(tcp_ka) = derp.tcp_keepalive {
tcp_keepalive = tcp_ka;
}
if let Some(derp_ka) = derp.derp_keepalive {
derp_keepalive = derp_ka;
}
poll_keepalive = derp.poll_keepalive;
}

DerpKeepaliveConfig {
tcp_keepalive,
derp_keepalive,
poll_keepalive,
}
}
}

const DEFAULT_SERVER_KEEPALIVE_CONFIG: DerpKeepaliveConfig = DerpKeepaliveConfig {
tcp_keepalive: proto::DERP_TCP_KEEPALIVE_INTERVAL,
derp_keepalive: proto::DERP_KEEPALIVE_INTERVAL,
poll_keepalive: None,
};

/// Derp configuration
Expand Down Expand Up @@ -725,8 +731,9 @@ impl Runtime for State {
},
// On tick send derp poll request to derp stream
Some((permit, _)) = wait_for_tx(&c.comms_direct.tx, poll_timer_tick) => {
if config.enable_polling {
if config.enable_polling || config.server_keepalives.poll_keepalive.is_some() {
self.derp_poll_session = self.derp_poll_session.wrapping_add(1);
telio_log_debug!("Sending DerpPollRequest with session {}", self.derp_poll_session);
Self::handle_outcoming_payload_direct(permit, PacketControl::DerpPollRequest(DerpPollRequestMsg::new(
self.derp_poll_session, &config.meshnet_peers
))).await;
Expand Down
166 changes: 134 additions & 32 deletions crates/telio-relay/src/derp/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};
use telio_sockets::{SocketBufSizes, SocketPool, TcpParams};
use telio_task::io::Chan;
use telio_utils::interval_at;
use telio_utils::{interval_at, telio_log_debug};
use webpki_roots::TLS_SERVER_ROOTS;

use crate::{Config, DerpKeepaliveConfig};
Expand Down Expand Up @@ -89,21 +89,67 @@ pub async fn connect_http_and_start(
};
let hostport = format!("{}:{}", hostname, port);

let socket = socket_pool.new_external_tcp_v4(Some(TcpParams {
keepalive_enable: Some(true),
keepalive_cnt: Some(TCP_KEEPALIVE_COUNT),
keepalive_idle: Some(TCP_KEEPALIVE_IDLE),
keepalive_intvl: Some(TCP_KEEPALIVE_INTERVAL),
nodelay_enable: Some(true),
user_timeout: Some(TCP_USER_TIMEOUT),
buf_size: SocketBufSizes {
tx_buf_size: Some(SOCK_BUF_SZ),
rx_buf_size: Some(SOCK_BUF_SZ),
},
}))?;
// First try to use derp v2, with poll keepalives it the feature is enabled
if derp_config.server_keepalives.poll_keepalive.is_some() {
telio_log_debug!("Trying to connect to derp V2");
let socket = socket_pool.new_external_tcp_v4(Some(build_tcp_parameters(false)))?;
let stream = timeout(derp_config.timeout, socket.connect(ip)).await??;
let addr = PairAddr {
local: stream.local_addr()?,
remote: stream.peer_addr()?,
};

let stream = timeout(derp_config.timeout, socket.connect(ip)).await??;
let connection = match u.scheme() {
"http" => {
Box::pin(connect_and_start(
stream,
addr,
derp_config.secret_key,
derp_config.server_keepalives.clone(),
&hostport,
true,
))
.await
}
_ => {
let config = if derp_config.use_built_in_root_certificates {
let root_store: RootCertStore = TLS_SERVER_ROOTS.iter().cloned().collect();

let config = ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();

TlsConnector::from(Arc::new(config))
} else {
TlsConnector::from(Arc::new(rustls_platform_verifier::tls_config()))
};

let server_name =
ServerName::try_from(hostname.clone()).map_err(|_| Error::InvalidServerName)?;

connect_and_start(
config.connect(server_name, stream).await?,
addr,
derp_config.secret_key,
derp_config.server_keepalives.clone(),
&hostport,
true,
)
.await
}
};

if connection.is_ok() {
// We managed to successfully connect to derp v2
telio_log_debug!("Successfully connected to derp v2");
return connection;
}
}

// If we didn't want or didn't manage to connect to depr v2, we will try derp v1
telio_log_debug!("Trying to connect to derp v1");
let socket = socket_pool.new_external_tcp_v4(Some(build_tcp_parameters(true)))?;
let stream = timeout(derp_config.timeout, socket.connect(ip)).await??;
let addr = PairAddr {
local: stream.local_addr()?,
remote: stream.peer_addr()?,
Expand All @@ -117,6 +163,7 @@ pub async fn connect_http_and_start(
derp_config.secret_key,
derp_config.server_keepalives,
&hostport,
false,
))
.await
}
Expand All @@ -142,6 +189,7 @@ pub async fn connect_http_and_start(
derp_config.secret_key,
derp_config.server_keepalives,
&hostport,
false,
)
.await
}
Expand All @@ -154,6 +202,7 @@ async fn connect_and_start<RW: AsyncRead + AsyncWrite + Send + 'static>(
secret_key: SecretKey,
server_keepalives: DerpKeepaliveConfig,
host: &str,
derp_v2: bool,
) -> Result<DerpConnection, Error> {
let (mut reader, mut writer) = split(stream);

Expand All @@ -162,6 +211,7 @@ async fn connect_and_start<RW: AsyncRead + AsyncWrite + Send + 'static>(
&mut writer,
&server_keepalives,
host,
derp_v2,
))
.await?;

Expand All @@ -181,6 +231,18 @@ async fn connect_and_start<RW: AsyncRead + AsyncWrite + Send + 'static>(
let sender_direct = conn_side_direct.tx;
let receiver_direct = conn_side_direct.rx;

// TODO: When we switch to batched Poll Keepalives, we have to move this into the session keeper
// The logic is here for now for testing purposes
let poll_interval = match (derp_v2, server_keepalives.poll_keepalive) {
// We connected to derp V2, so we will use poll keepalives
(true, Some(poll_keepalive)) => Duration::from_secs(poll_keepalive as u64),
// We connected to derp V1, so we will use the old derp_keepalive config
(false, _) => Duration::from_secs(server_keepalives.derp_keepalive as u64),
// Any other situation is an error
_ => return Err(Error::InternalError),
};
telio_log_debug!("Derp poll interval {:?}", poll_interval);

Ok(DerpConnection {
comms_relayed: comm_side_relayed,
comms_direct: comm_side_direct,
Expand All @@ -190,10 +252,7 @@ async fn connect_and_start<RW: AsyncRead + AsyncWrite + Send + 'static>(
join_receiver: tokio::spawn(async move {
start_write(writer, receiver_relayed, receiver_direct, addr).await
}),
poll_timer: {
let poll_interval = Duration::from_secs(server_keepalives.derp_keepalive as u64);
interval_at(tokio::time::Instant::now() + poll_interval, poll_interval)
},
poll_timer: { interval_at(tokio::time::Instant::now() + poll_interval, poll_interval) },
})
}

Expand All @@ -204,24 +263,39 @@ async fn connect_http<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
writer: &mut W,
server_keepalives: &DerpKeepaliveConfig,
host: &str,
derp_v2: bool,
) -> Result<Vec<u8>, Error> {
writer
.write_all(
format!(
"GET /derp HTTP/1.1\r\n\
let request = match (derp_v2, server_keepalives.poll_keepalive) {
// We try to connect to derp V2
(true, Some(poll_keepalive)) => format!(
"GET /derpv2 HTTP/1.1\r\n\
Host: {host}\r\n\
Connection: Upgrade\r\n\
Upgrade: WebSocket\r\n\
User-Agent: telio/{} {}\r\n\
Keep-Alive: tcp={}, derp={}\r\n\r\n",
telio_utils::version_tag(),
std::env::consts::OS,
server_keepalives.tcp_keepalive,
server_keepalives.derp_keepalive,
)
.as_bytes(),
)
.await?;
Poll-Keepalive: {}\r\n\r\n",
telio_utils::version_tag(),
std::env::consts::OS,
poll_keepalive,
),
// We try to connect to derp V1
(false, _) => format!(
"GET /derp HTTP/1.1\r\n\
Host: {host}\r\n\
Connection: Upgrade\r\n\
Upgrade: WebSocket\r\n\
User-Agent: telio/{} {}\r\n\
Keep-Alive: tcp={}, derp={}\r\n\r\n",
telio_utils::version_tag(),
std::env::consts::OS,
server_keepalives.tcp_keepalive,
server_keepalives.derp_keepalive,
),
_ => return Err(Error::InternalError),
};
telio_log_debug!("DERP connect request: {}", &request);

writer.write_all(request.as_bytes()).await?;

let mut data = [0_u8; MAX_TCP_PACKET_SIZE];
let data_len = reader.read(&mut data).await?;
Expand All @@ -240,6 +314,33 @@ async fn connect_http<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
.to_vec())
}

fn build_tcp_parameters(use_tcp_keepalives: bool) -> TcpParams {
if use_tcp_keepalives {
TcpParams {
keepalive_enable: Some(true),
keepalive_cnt: Some(TCP_KEEPALIVE_COUNT),
keepalive_idle: Some(TCP_KEEPALIVE_IDLE),
keepalive_intvl: Some(TCP_KEEPALIVE_INTERVAL),
nodelay_enable: Some(true),
user_timeout: Some(TCP_USER_TIMEOUT),
buf_size: SocketBufSizes {
tx_buf_size: Some(SOCK_BUF_SZ),
rx_buf_size: Some(SOCK_BUF_SZ),
},
}
} else {
TcpParams {
nodelay_enable: Some(true),
user_timeout: Some(TCP_USER_TIMEOUT),
buf_size: SocketBufSizes {
tx_buf_size: Some(SOCK_BUF_SZ),
rx_buf_size: Some(SOCK_BUF_SZ),
},
..Default::default()
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -297,10 +398,11 @@ mod tests {
let derp_config = DerpKeepaliveConfig {
tcp_keepalive: 1,
derp_keepalive: 2,
poll_keepalive: Some(3),
};
assert_eq!(
RESPONSE_BODY.as_bytes(),
connect_http(&mut r, &mut w, &derp_config, HOST)
connect_http(&mut r, &mut w, &derp_config, HOST, false)
.await
.unwrap()
.as_slice()
Expand Down
3 changes: 3 additions & 0 deletions crates/telio-relay/src/derp/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ pub enum Error {
/// Url parse error
#[error("Url parse error: {0}")]
UrlParseError(#[from] url::ParseError),
/// Internal error
#[error("Internal error")]
InternalError,
}

impl From<Error> for RelayConnectionChangeReason {
Expand Down
2 changes: 2 additions & 0 deletions src/libtelio.udl
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,8 @@ dictionary FeatureDerp {
u32? tcp_keepalive;
/// Derp will send empty messages after this many seconds of not sending/receiving any data [default 60s]
u32? derp_keepalive;
/// Poll Keepalive: Application level keepalives meant to replace the TCP keepalives
u32? poll_keepalive;
/// Enable polling of remote peer states to reduce derp traffic
boolean? enable_polling;
/// Use Mozilla's root certificates instead of OS ones [default false]
Expand Down

0 comments on commit f1ff82b

Please sign in to comment.