Skip to content

Commit

Permalink
Add DERP userspace keepalives
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Fetti <[email protected]>
  • Loading branch information
dfetti committed Nov 21, 2024
1 parent 5b33aea commit d5be027
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 30 deletions.
4 changes: 4 additions & 0 deletions crates/telio-model/src/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ pub struct FeatureDerp {
pub tcp_keepalive: Option<u32>,
/// Derp will send empty messages after this many seconds of not sending/receiving any data [default 60s]
pub derp_keepalive: Option<u32>,
/// Poll Keepalive: Application level keepalives meant to replace the TCP keepalives
pub poll_keepalive: Option<u32>,
/// Enable polling of remote peer states to reduce derp traffic
pub enable_polling: Option<bool>,
/// Use Mozilla's root certificates instead of OS ones [default false]
Expand Down Expand Up @@ -520,6 +522,7 @@ mod tests {
"derp": {
"tcp_keepalive": 13,
"derp_keepalive": 14,
"poll_keepalive": 15,
"enable_polling": true,
"use_built_in_root_certificates": true
},
Expand Down Expand Up @@ -606,6 +609,7 @@ mod tests {
derp: Some(FeatureDerp {
tcp_keepalive: Some(13),
derp_keepalive: Some(14),
poll_keepalive: Some(15),
enable_polling: Some(true),
use_built_in_root_certificates: true,
}),
Expand Down
9 changes: 8 additions & 1 deletion crates/telio-relay/src/derp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,35 +127,41 @@ struct State {
/// Keepalive values that help keeping Derp connection in conntrack alive,
/// so server can send traffic after being silent for a while
/// *derp_keepalive* is also used as an interval for retrieving remote peer states.
/// TODO: Update comments for Poll Keepalives
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DerpKeepaliveConfig {
tcp_keepalive: u32,
derp_keepalive: u32,
poll_keepalive: Option<u32>,
}

impl From<&Option<FeatureDerp>> for DerpKeepaliveConfig {
fn from(derp: &Option<FeatureDerp>) -> Self {
let mut tcp_keepalive = proto::DERP_TCP_KEEPALIVE_INTERVAL;
let mut derp_keepalive = proto::DERP_KEEPALIVE_INTERVAL;
let mut poll_keepalive = None;
if let Some(derp) = derp {
if let Some(tcp_ka) = derp.tcp_keepalive {
tcp_keepalive = tcp_ka;
}
if let Some(derp_ka) = derp.derp_keepalive {
derp_keepalive = derp_ka;
}
poll_keepalive = derp.poll_keepalive;
}

DerpKeepaliveConfig {
tcp_keepalive,
derp_keepalive,
poll_keepalive,
}
}
}

const DEFAULT_SERVER_KEEPALIVE_CONFIG: DerpKeepaliveConfig = DerpKeepaliveConfig {
tcp_keepalive: proto::DERP_TCP_KEEPALIVE_INTERVAL,
derp_keepalive: proto::DERP_KEEPALIVE_INTERVAL,
poll_keepalive: None,
};

/// Derp configuration
Expand Down Expand Up @@ -725,8 +731,9 @@ impl Runtime for State {
},
// On tick send derp poll request to derp stream
Some((permit, _)) = wait_for_tx(&c.comms_direct.tx, poll_timer_tick) => {
if config.enable_polling {
if config.enable_polling || config.server_keepalives.poll_keepalive.is_some() {
self.derp_poll_session = self.derp_poll_session.wrapping_add(1);
telio_log_debug!("Sending DerpPollRequest with session {}", self.derp_poll_session);
Self::handle_outcoming_payload_direct(permit, PacketControl::DerpPollRequest(DerpPollRequestMsg::new(
self.derp_poll_session, &config.meshnet_peers
))).await;
Expand Down
100 changes: 71 additions & 29 deletions crates/telio-relay/src/derp/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};
use telio_sockets::{SocketBufSizes, SocketPool, TcpParams};
use telio_task::io::Chan;
use telio_utils::interval_at;
use telio_utils::{interval_at, telio_log_debug};
use webpki_roots::TLS_SERVER_ROOTS;

use crate::{Config, DerpKeepaliveConfig};
Expand Down Expand Up @@ -89,18 +89,34 @@ pub async fn connect_http_and_start(
};
let hostport = format!("{}:{}", hostname, port);

let socket = socket_pool.new_external_tcp_v4(Some(TcpParams {
keepalive_enable: Some(true),
keepalive_cnt: Some(TCP_KEEPALIVE_COUNT),
keepalive_idle: Some(TCP_KEEPALIVE_IDLE),
keepalive_intvl: Some(TCP_KEEPALIVE_INTERVAL),
nodelay_enable: Some(true),
user_timeout: Some(TCP_USER_TIMEOUT),
buf_size: SocketBufSizes {
tx_buf_size: Some(SOCK_BUF_SZ),
rx_buf_size: Some(SOCK_BUF_SZ),
},
}))?;
let tcp_params = if derp_config.server_keepalives.poll_keepalive.is_some() {
// If Poll Keepalives are enabled we must disable TCP Keepalives
TcpParams {
nodelay_enable: Some(true),
user_timeout: Some(TCP_USER_TIMEOUT),
buf_size: SocketBufSizes {
tx_buf_size: Some(SOCK_BUF_SZ),
rx_buf_size: Some(SOCK_BUF_SZ),
},
..Default::default()
}
} else {
// If Poll Keepalives are disabled, use the TCP Keepalives
TcpParams {
keepalive_enable: Some(true),
keepalive_cnt: Some(TCP_KEEPALIVE_COUNT),
keepalive_idle: Some(TCP_KEEPALIVE_IDLE),
keepalive_intvl: Some(TCP_KEEPALIVE_INTERVAL),
nodelay_enable: Some(true),
user_timeout: Some(TCP_USER_TIMEOUT),
buf_size: SocketBufSizes {
tx_buf_size: Some(SOCK_BUF_SZ),
rx_buf_size: Some(SOCK_BUF_SZ),
},
}
};

let socket = socket_pool.new_external_tcp_v4(Some(tcp_params))?;

let stream = timeout(derp_config.timeout, socket.connect(ip)).await??;

Expand Down Expand Up @@ -181,6 +197,15 @@ async fn connect_and_start<RW: AsyncRead + AsyncWrite + Send + 'static>(
let sender_direct = conn_side_direct.tx;
let receiver_direct = conn_side_direct.rx;

// TODO: When we switch to batched Poll Keepalives, we have to move this into the session keeper
// The logic is here for now for testing purposes
let poll_interval = if let Some(poll_keepalive) = server_keepalives.poll_keepalive {
Duration::from_secs(poll_keepalive as u64)
} else {
Duration::from_secs(server_keepalives.derp_keepalive as u64)
};
telio_log_debug!("Derp poll interval {:?}", poll_interval);

Ok(DerpConnection {
comms_relayed: comm_side_relayed,
comms_direct: comm_side_direct,
Expand All @@ -190,10 +215,7 @@ async fn connect_and_start<RW: AsyncRead + AsyncWrite + Send + 'static>(
join_receiver: tokio::spawn(async move {
start_write(writer, receiver_relayed, receiver_direct, addr).await
}),
poll_timer: {
let poll_interval = Duration::from_secs(server_keepalives.derp_keepalive as u64);
interval_at(tokio::time::Instant::now() + poll_interval, poll_interval)
},
poll_timer: { interval_at(tokio::time::Instant::now() + poll_interval, poll_interval) },
})
}

Expand All @@ -205,23 +227,38 @@ async fn connect_http<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
server_keepalives: &DerpKeepaliveConfig,
host: &str,
) -> Result<Vec<u8>, Error> {
writer
.write_all(
format!(
"GET /derp HTTP/1.1\r\n\
let request = if let Some(poll_keepalives) = server_keepalives.poll_keepalive {
format!(
"GET /derp HTTP/1.1\r\n\
Host: {host}\r\n\
Connection: Upgrade\r\n\
Upgrade: WebSocket\r\n\
User-Agent: telio/{} {}\r\n\
Keep-Alive: tcp={}, derp={}\r\n\r\n",
telio_utils::version_tag(),
std::env::consts::OS,
server_keepalives.tcp_keepalive,
server_keepalives.derp_keepalive,
)
.as_bytes(),
Keep-Alive: tcp={}, derp={}\r\n\
Poll-Keepalive: {}\r\n\r\n",
telio_utils::version_tag(),
std::env::consts::OS,
server_keepalives.tcp_keepalive,
server_keepalives.derp_keepalive,
poll_keepalives,
)
.await?;
} else {
format!(
"GET /derp HTTP/1.1\r\n\
Host: {host}\r\n\
Connection: Upgrade\r\n\
Upgrade: WebSocket\r\n\
User-Agent: telio/{} {}\r\n\
Keep-Alive: tcp={}, derp={}\r\n\r\n",
telio_utils::version_tag(),
std::env::consts::OS,
server_keepalives.tcp_keepalive,
server_keepalives.derp_keepalive,
)
};
telio_log_debug!("DERP connect request: {}", &request);

writer.write_all(request.as_bytes()).await?;

let mut data = [0_u8; MAX_TCP_PACKET_SIZE];
let data_len = reader.read(&mut data).await?;
Expand Down Expand Up @@ -271,6 +308,10 @@ mod tests {
request.headers().get("keep-alive"),
Some(&HeaderValue::from_static("tcp=1, derp=2"))
);
assert_eq!(
request.headers().get("Poll-Keepalive"),
Some(&HeaderValue::from_static("3"))
);
Ok(Response::builder()
.body(hyper::Body::from(RESPONSE_BODY))
.unwrap())
Expand All @@ -297,6 +338,7 @@ mod tests {
let derp_config = DerpKeepaliveConfig {
tcp_keepalive: 1,
derp_keepalive: 2,
poll_keepalive: Some(3),
};
assert_eq!(
RESPONSE_BODY.as_bytes(),
Expand Down
2 changes: 2 additions & 0 deletions src/libtelio.udl
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,8 @@ dictionary FeatureDerp {
u32? tcp_keepalive;
/// Derp will send empty messages after this many seconds of not sending/receiving any data [default 60s]
u32? derp_keepalive;
/// Poll Keepalive: Application level keepalives meant to replace the TCP keepalives
u32? poll_keepalive;
/// Enable polling of remote peer states to reduce derp traffic
boolean? enable_polling;
/// Use Mozilla's root certificates instead of OS ones [default false]
Expand Down

0 comments on commit d5be027

Please sign in to comment.