Skip to content

Commit

Permalink
[Refactor] Add helper routine to establish TCP connection (#17)
Browse files Browse the repository at this point in the history
This adds auxiliary routine to "net" module in order to establish TCP
connection with passed options.
  • Loading branch information
boris-sinyapkin committed Apr 27, 2024
1 parent a0e9c8e commit 6091fe0
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 34 deletions.
59 changes: 58 additions & 1 deletion src/common/net.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::common::error::LurkError;
use anyhow::{anyhow, Result};
use bytes::BufMut;
use log::{debug, trace};
use socket2::{SockRef, TcpKeepalive};
use std::{
fmt::Display,
net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
};
use tokio::{io::AsyncReadExt, net::lookup_host};
use tokio::{
io::AsyncReadExt,
net::{lookup_host, TcpStream},
};

macro_rules! ipv4_socket_address {
($ipv4:expr, $port:expr) => {
Expand Down Expand Up @@ -86,6 +91,58 @@ impl Address {
}
}

/// Different TCP connection options.
///
/// **Fields**:
/// * ```keep_alive``` - setting for TCP keepalive procedure
///
///
pub struct TcpConnectionOptions {
keep_alive: Option<TcpKeepalive>,
}

impl TcpConnectionOptions {
pub fn new() -> TcpConnectionOptions {
TcpConnectionOptions { keep_alive: None }
}

pub fn set_keepalive(&mut self, keep_alive: TcpKeepalive) -> &mut TcpConnectionOptions {
debug_assert!(self.keep_alive.is_none(), "should be unset");
self.keep_alive = Some(keep_alive);
self
}

pub fn apply_to(&self, tcp_stream: &mut TcpStream) -> Result<()> {
let tcp_sock_ref = SockRef::from(&tcp_stream);

if let Some(keep_alive) = &self.keep_alive {
tcp_sock_ref.set_tcp_keepalive(keep_alive)?;
}

Ok(())
}
}

/// Establish TCP connection with passed ```endpoint```.
///
/// Input ```tcp_opts``` are applied to created TCP socket right after stream creation.
pub async fn establish_tcp_connection_with_opts(endpoint: &Address, tcp_opts: &TcpConnectionOptions) -> Result<TcpStream> {
// Resolve endpoint address.
trace!("Endpoint address {} resolution: ... ", endpoint);
let resolved = endpoint.to_socket_addr().await?;
trace!("Endpoint address {} resolution: SUCCESS with {}", endpoint, resolved);

// Establish TCP connection with the endpoint.
debug!("TCP connection establishment with the endpoint {}: ... ", endpoint);
let mut tcp_stream = TcpStream::connect(resolved).await.map_err(anyhow::Error::from)?;
debug!("TCP connection establishment with the endpoint {}: SUCCESS", endpoint);

// Apply passed options to created TCP stream.
tcp_opts.apply_to(&mut tcp_stream)?;

Ok(tcp_stream)
}

impl Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
8 changes: 3 additions & 5 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ impl LurkServer {
let tcp_listener = self.bind().await?;
loop {
match tcp_listener.accept().await {
Ok((stream, addr)) => {
log_opened_tcp_conn!(addr);
self.on_new_peer_connected(stream, addr).await
}
Ok((stream, addr)) => self.on_tcp_connection_established(stream, addr).await,
Err(err) => warn!("Error while accepting the TCP connection: {}", err),
}
}
Expand All @@ -40,7 +37,8 @@ impl LurkServer {
Ok(tcp_listener)
}

async fn on_new_peer_connected(&self, stream: TcpStream, addr: SocketAddr) {
async fn on_tcp_connection_established(&self, stream: TcpStream, addr: SocketAddr) {
log_opened_tcp_conn!(addr);
// Identify peer type.
let peer_type = match LurkPeerType::from_tcp_stream(&stream).await {
Ok(t) => {
Expand Down
41 changes: 13 additions & 28 deletions src/server/peer/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
common::{
error::LurkError,
logging::{log_request_handling_error, log_tunnel_closed, log_tunnel_closed_with_error, log_tunnel_created},
net::Address,
net::{self, Address},
},
io::{tunnel::LurkTunnel, LurkRequestRead, LurkResponseWrite},
proto::socks5::{
Expand All @@ -14,17 +14,14 @@ use crate::{
};
use anyhow::{bail, Result};
use human_bytes::human_bytes;
use log::{debug, error, info, trace};
use socket2::{SockRef, TcpKeepalive};
use log::{debug, error, info};
use socket2::TcpKeepalive;
use std::{
net::SocketAddr,
ops::{Deref, DerefMut},
time::Duration,
};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
};
use tokio::io::{AsyncRead, AsyncWrite};

pub struct LurkSocks5PeerHandler<S>
where
Expand Down Expand Up @@ -129,29 +126,17 @@ where
debug!("Handling SOCKS5 CONNECT from {}", self.peer);
let peer_address = self.peer.to_string();

// Resolve endpoint address.
trace!("Endpoint address {} resolution: ... ", endpoint_address);
let resolved_address = endpoint_address.to_socket_addr().await?;
trace!(
"Endpoint address {} resolution: SUCCESS with {}",
endpoint_address,
resolved_address
// Create TCP options.
let mut tcp_opts = net::TcpConnectionOptions::new();
tcp_opts.set_keepalive(
TcpKeepalive::new()
.with_time(Duration::from_secs(300)) // 5 min
.with_interval(Duration::from_secs(60)) // 1 min
.with_retries(5),
);

// Establish TCP connection with the endpoint.
debug!("TCP connection establishment with the endpoint {}: ... ", endpoint_address);
let mut r2l = TcpStream::connect(resolved_address).await.map_err(anyhow::Error::from)?;
debug!("TCP connection establishment with the endpoint {}: SUCCESS", endpoint_address);

// Configure keep-alive probe.
let r2l_sock_ref = SockRef::from(&r2l);
let keepalive = TcpKeepalive::new()
.with_time(Duration::from_secs(300)) // 5 min
.with_interval(Duration::from_secs(60)) // 1 min
.with_retries(5);

// Configure keep-alive probe on R2L TCP socket.
r2l_sock_ref.set_tcp_keepalive(&keepalive)?;
// Establish TCP connection with the target endpoint.
let mut r2l = net::establish_tcp_connection_with_opts(endpoint_address, &tcp_opts).await?;

// Respond to relay request with success.
let response = RelayResponse::builder()
Expand Down

0 comments on commit 6091fe0

Please sign in to comment.