From cba2468dfba10991479697bdf4794d6ac28a9032 Mon Sep 17 00:00:00 2001 From: Boris S Date: Mon, 12 Aug 2024 12:50:12 +0300 Subject: [PATCH] [Feature] Introduce handling of HTTP(S) proxy This adds new category of traffic for proxy, currently unstable and untested. Would be improved further. --- Cargo.toml | 1 + src/common/logging.rs | 2 +- src/io/mod.rs | 14 --- src/io/stream.rs | 72 ------------- src/net/tcp.rs | 42 +++++--- src/server/handlers.rs | 232 +++++++++++++++++++++++++++++------------ src/server/mod.rs | 18 ++-- tests/integration.rs | 6 +- 8 files changed, 208 insertions(+), 179 deletions(-) delete mode 100644 src/io/stream.rs diff --git a/Cargo.toml b/Cargo.toml index d01f153..9cf6884 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ rand = { version = "0.8.5" } [dependencies] anyhow = { version = "1.0.81" } async-listen = { version = "0.2.1" } +async-trait = { version = "*" } bytes = { version = "1.6.0" } clap = { version = "4.5.3", features = ["derive"] } cfg-if = { version = "1.0" } diff --git a/src/common/logging.rs b/src/common/logging.rs index a451b1b..b895f85 100644 --- a/src/common/logging.rs +++ b/src/common/logging.rs @@ -87,7 +87,7 @@ macro_rules! log_tcp_closed_conn { macro_rules! log_tcp_established_conn { ($conn_addr:expr, $conn_label:expr) => { info!( - "\n\n\tTCP {} connection has been OPENED: \ + "\n\n\tTCP connection with {} label has been OPENED: \ \n\t\tpeer: '{}' \ \n", $conn_label, $conn_addr, diff --git a/src/io/mod.rs b/src/io/mod.rs index 4d92aac..9077580 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,8 +1,6 @@ use anyhow::Result; -use std::fmt::Debug; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -pub mod stream; pub mod tunnel; pub trait LurkRequest { @@ -14,15 +12,3 @@ pub trait LurkRequest { pub trait LurkResponse { async fn write_to(&self, stream: &mut T) -> Result<()>; } - -pub trait LurkResponseWrite { - async fn write_response(&mut self, response: Response) -> Result<()> - where - Response: LurkResponse + Debug + 'static; -} - -pub trait LurkRequestRead { - async fn read_request(&mut self) -> Result - where - Request: LurkRequest + Debug + 'static; -} diff --git a/src/io/stream.rs b/src/io/stream.rs deleted file mode 100644 index e93b909..0000000 --- a/src/io/stream.rs +++ /dev/null @@ -1,72 +0,0 @@ -use super::{LurkRequest, LurkRequestRead, LurkResponse, LurkResponseWrite}; -use anyhow::Result; -use log::trace; -use std::{ - fmt::Debug, - ops::{Deref, DerefMut}, -}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, -}; - -/// Alias for stream wrapper over `TcpStream` -pub type LurkTcpStream = LurkStream; - -/// Stream wrapper implementation - -pub struct LurkStream { - stream: T, -} - -impl LurkStream -where - T: AsyncReadExt + AsyncWriteExt + Unpin, -{ - pub fn new(stream: T) -> LurkStream { - LurkStream { stream } - } -} - -impl LurkRequestRead for LurkStream -where - T: AsyncReadExt + AsyncWriteExt + Unpin, -{ - async fn read_request(&mut self) -> Result - where - Request: LurkRequest + Debug, - { - let request = Request::read_from(&mut self.stream).await?; - trace!("Read {:?}", request); - - Ok(request) - } -} - -impl LurkResponseWrite for LurkStream -where - T: AsyncReadExt + AsyncWriteExt + Unpin, -{ - async fn write_response(&mut self, response: Response) -> Result<()> - where - Response: LurkResponse + Debug, - { - Response::write_to(&response, &mut self.stream).await?; - trace!("Write {:?}", response); - - Ok(()) - } -} - -impl Deref for LurkStream { - type Target = T; - fn deref(&self) -> &Self::Target { - &self.stream - } -} - -impl DerefMut for LurkStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.stream - } -} diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 81dcb72..df61a99 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -175,7 +175,7 @@ pub mod listener { .expect("Expect accepted connection before expired timeout") .expect("Expect accepted TCP connection"); - assert_eq!(LurkTcpConnectionLabel::SOCKS5, conn.label()); + assert_eq!(LurkTcpConnectionLabel::Socks5, conn.label()); assert!( listener.factory.get_active_tokens() <= conn_limit, "Number of opened connections must not exceed the limit" @@ -195,8 +195,8 @@ pub mod listener { pub mod connection { - use crate::io::stream::{LurkStream, LurkTcpStream}; use anyhow::{bail, Result}; + use async_trait::async_trait; use std::{fmt::Display, io, net::SocketAddr}; use tokio::net::TcpStream; @@ -209,7 +209,13 @@ pub mod connection { #[repr(u8)] pub enum LurkTcpConnectionLabel { /// Traffic of TCP connection belongs to proxy SOCKS5 protocol - SOCKS5 = 0x05, + Socks5, + + /// Traffic of TCP connection belongs to HTTP protocol + Http, + + /// Traffic of TCP connection belongs to HTTPS protocol + HttpSecure, /// Unknown traffic Unknown(u8), @@ -227,7 +233,9 @@ pub mod connection { if peeked_bytes == 1 { let label = match buff[0] { - 0x05 => LurkTcpConnectionLabel::SOCKS5, + 0x47 => LurkTcpConnectionLabel::Http, + 0x43 => LurkTcpConnectionLabel::HttpSecure, + 0x05 => LurkTcpConnectionLabel::Socks5, v => LurkTcpConnectionLabel::Unknown(v), }; @@ -241,8 +249,10 @@ pub mod connection { impl Display for LurkTcpConnectionLabel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - LurkTcpConnectionLabel::SOCKS5 => write!(f, "SOCKS5"), - LurkTcpConnectionLabel::Unknown(l) => write!(f, "Unknown TCP label {l:#04x}"), + LurkTcpConnectionLabel::Http => write!(f, "HTTP"), + LurkTcpConnectionLabel::HttpSecure => write!(f, "HTTPS"), + LurkTcpConnectionLabel::Socks5 => write!(f, "SOCKS5"), + LurkTcpConnectionLabel::Unknown(l) => write!(f, "unknown {l:#04x}"), } } } @@ -273,8 +283,7 @@ pub mod connection { } pub struct LurkTcpConnection { - /// Lurk wrapper of TcpStream - stream: LurkTcpStream, + stream: TcpStream, /// Label describing traffic in this TCP connection label: LurkTcpConnectionLabel, /// Remote address that this connection is connected to @@ -284,11 +293,11 @@ pub mod connection { } impl LurkTcpConnection { - fn new(tcp_stream: TcpStream, label: LurkTcpConnectionLabel) -> Result { + fn new(stream: TcpStream, label: LurkTcpConnectionLabel) -> Result { Ok(LurkTcpConnection { - peer_addr: tcp_stream.peer_addr()?, - local_addr: tcp_stream.local_addr()?, - stream: LurkStream::new(tcp_stream), + peer_addr: stream.peer_addr()?, + local_addr: stream.local_addr()?, + stream, label, }) } @@ -305,11 +314,16 @@ pub mod connection { self.label } - pub fn stream_mut(&mut self) -> &mut LurkTcpStream { + pub fn stream_mut(&mut self) -> &mut TcpStream { &mut self.stream } } + #[async_trait] + pub trait LurkTcpConnectionHandler: Send { + async fn handle(&mut self, mut conn: LurkTcpConnection) -> Result<()>; + } + #[cfg(test)] mod tests { @@ -338,7 +352,7 @@ pub mod connection { .accept() .and_then(|(s, _)| async move { let label = LurkTcpConnectionLabel::from_tcp_stream(&s).await.unwrap(); - assert_eq!(LurkTcpConnectionLabel::SOCKS5, label); + assert_eq!(LurkTcpConnectionLabel::Socks5, label); Ok(()) }) .await diff --git a/src/server/handlers.rs b/src/server/handlers.rs index 273246e..d96ab25 100644 --- a/src/server/handlers.rs +++ b/src/server/handlers.rs @@ -1,10 +1,10 @@ use crate::{ auth::LurkAuthenticator, common::{error::LurkError, logging}, - io::{tunnel::LurkTunnel, LurkRequestRead, LurkResponseWrite}, + io::{tunnel::LurkTunnel, LurkRequest, LurkResponse}, net::{ tcp::{ - connection::{LurkTcpConnection, LurkTcpConnectionLabel}, + connection::{LurkTcpConnection, LurkTcpConnectionHandler, LurkTcpConnectionLabel}, establish_tcp_connection_with_opts, TcpConnectionOptions, }, Address, @@ -15,36 +15,30 @@ use crate::{ Command, }, }; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use human_bytes::human_bytes; -use log::{debug, error, info}; +use hyper::{ + client, + server::{self}, + service::service_fn, + Method, Request, Response, StatusCode, +}; +use hyper_util::rt::TokioIo; +use log::{debug, error, info, log_enabled, trace}; use socket2::TcpKeepalive; use std::time::Duration; use tokio::net::TcpStream; -pub struct LurkSocks5Handler { - conn: LurkTcpConnection, -} +pub struct LurkSocks5Handler {} impl LurkSocks5Handler { - pub fn new(conn: LurkTcpConnection) -> LurkSocks5Handler { - LurkSocks5Handler { conn } - } - - pub async fn handle(&mut self) -> Result<()> { - debug_assert_eq!(LurkTcpConnectionLabel::SOCKS5, self.conn.label(), "expected SOCKS5 label"); - // Complete handshake process and authenticate the client on success. - self.process_handshake().await?; - // Proceed with SOCKS5 relay handling. - // This will receive and process relay request, handle SOCKS5 command - // and establish the tunnel "client <-- lurk proxy --> target". - self.process_relay_request().await - } - /// Handshaking with SOCKS5 client. /// Afterwards, authenticator should contain negotiated method. - async fn process_handshake(&mut self) -> Result<()> { - let request = self.conn.stream_mut().read_request::().await?; + async fn process_handshake(conn: &mut LurkTcpConnection) -> Result<()> { + let request = HandshakeRequest::read_from(conn.stream_mut()).await?; // Authenticator will select method among all stored in request // and authenticate the connection on success. @@ -54,58 +48,58 @@ impl LurkSocks5Handler { match authenticator.select_auth_method(request.auth_methods()) { Some(method) => { - debug!("Selected authentication method {:?} for {}", method, self.conn.peer_addr()); + debug!("Selected authentication method {:?} for {}", method, conn.peer_addr()); // Respond to the client with selected method. response_builder.with_auth_method(method); - self.conn.stream_mut().write_response(response_builder.build()).await?; + response_builder.build().write_to(conn.stream_mut()).await?; // Authenticate the client by using selected method. // Note: Currently, only None method (disabled auth) is supported, // so just a sanity check here. - authenticator.authenticate_connection(&self.conn) + authenticator.authenticate_connection(conn) } None => { - debug!("No acceptable methods identified for {}", self.conn.peer_addr()); + debug!("No acceptable methods identified for {}", conn.peer_addr()); response_builder.with_no_acceptable_method(); - self.conn.stream_mut().write_response(response_builder.build()).await?; + response_builder.build().write_to(conn.stream_mut()).await?; bail!(LurkError::NoAcceptableAuthenticationMethod) } } } /// Handling SOCKS5 command which comes in relay request from client. - async fn process_relay_request(&mut self) -> Result<()> { - let request = self.conn.stream_mut().read_request::().await?; + async fn process_relay_request(conn: &mut LurkTcpConnection) -> Result<()> { + let peer_addr = conn.peer_addr(); + let local_addr = conn.local_addr(); + let inbound_stream = conn.stream_mut(); + let request = RelayRequest::read_from(inbound_stream).await?; let command = request.command(); let address = request.endpoint_address(); // Bail out and notify client if command isn't supported if command != Command::TCPConnect { let err = anyhow::anyhow!(LurkError::UnsupportedSocksCommand(command)); - return self.on_relay_request_handling_error(err, &request).await; + return LurkSocks5Handler::on_relay_request_handling_error(err, &request, conn).await; } - let (conn_peer_addr, conn_bound_addr) = (self.conn.peer_addr(), self.conn.local_addr()); + let (conn_peer_addr, conn_bound_addr) = (peer_addr, local_addr); debug!("Handling SOCKS5 CONNECT from {}", conn_peer_addr); // Create TCP stream with the endpoint - let mut r2l = match self.establish_tcp_connection(address).await { - Ok(stream) => { + let mut outbound_stream = match LurkSocks5Handler::establish_tcp_connection(address).await { + Ok(outbound_stream) => { // On success, respond to relay request with success let response = RelayResponse::builder().with_success().with_bound_address(conn_bound_addr).build(); - self.conn.stream_mut().write_response(response).await?; + response.write_to(inbound_stream).await?; - stream + outbound_stream } - Err(err) => return self.on_relay_request_handling_error(err, &request).await, + Err(err) => return LurkSocks5Handler::on_relay_request_handling_error(err, &request, conn).await, }; - // Acquire mutable reference to inner object of stream wrapper. - let mut l2r = &mut **self.conn.stream_mut(); - // Create proxy tunnel which operates with the following TCP streams: // - L2R: client <--> proxy // - R2L: endpoint <--> proxy - let mut tunnel = LurkTunnel::new(&mut l2r, &mut r2l); + let mut tunnel = LurkTunnel::new(inbound_stream, &mut outbound_stream); logging::log_tunnel_created!(conn_peer_addr, conn_bound_addr, address); @@ -122,18 +116,15 @@ impl LurkSocks5Handler { Ok(()) } - async fn on_relay_request_handling_error(&mut self, err: anyhow::Error, request: &RelayRequest) -> Result<()> { + async fn on_relay_request_handling_error(err: anyhow::Error, request: &RelayRequest, conn: &mut LurkTcpConnection) -> Result<()> { let err_msg = err.to_string(); - let response = RelayResponse::builder() - .with_err(err) - .with_bound_address(self.conn.local_addr()) - .build(); + let response = RelayResponse::builder().with_err(err).with_bound_address(conn.local_addr()).build(); - logging::log_request_handling_error!(self.conn, err_msg, request, response); - self.conn.stream_mut().write_response(response).await + logging::log_request_handling_error!(conn, err_msg, request, response); + response.write_to(conn.stream_mut()).await } - async fn establish_tcp_connection(&mut self, endpoint_address: &Address) -> Result { + async fn establish_tcp_connection(endpoint_address: &Address) -> Result { // Create TCP options. let mut tcp_opts = TcpConnectionOptions::new(); tcp_opts.set_keepalive( @@ -148,6 +139,125 @@ impl LurkSocks5Handler { } } +#[async_trait] +impl LurkTcpConnectionHandler for LurkSocks5Handler { + async fn handle(&mut self, mut conn: LurkTcpConnection) -> Result<()> { + debug_assert_eq!(LurkTcpConnectionLabel::Socks5, conn.label(), "expected SOCKS5 label"); + // Complete handshake process and authenticate the client on success. + LurkSocks5Handler::process_handshake(&mut conn).await?; + // Proceed with SOCKS5 relay handling. + // This will receive and process relay request, handle SOCKS5 command + // and establish the tunnel "client <-- lurk proxy --> target". + LurkSocks5Handler::process_relay_request(&mut conn).await + } +} + +pub struct LurkHttpHandler {} + +impl LurkHttpHandler { + async fn serve_request(request: Request) -> Result>> { + // Dump full request data if trace is enabled + if log_enabled!(log::Level::Trace) { + trace!("{:?}", request); + } else { + info!("{:?} {} '{}'", request.version(), request.method(), request.uri()); + } + + if request.method() == Method::CONNECT { + let addr_str = match request.uri().authority() { + Some(str) => str.to_string(), + None => { + let err_msg = format!("CONNECT host is not socket addr: {:?}", request.uri()); + let mut response = Response::new(Self::full(err_msg)); + *response.status_mut() = StatusCode::BAD_REQUEST; + return Ok(response); + } + }; + + tokio::spawn(async move { + // Upgrage HTTP connection. + let upgraded = match hyper::upgrade::on(request).await { + Ok(upgraded) => upgraded, + Err(err) => { + error!("HTTP upgrade error: {}", err); + return; + } + }; + + // On successful upgrade, establish remote TCP connection + // and start data relaying. + match TcpStream::connect(addr_str).await { + Ok(mut outbound) => { + let mut inbdound = TokioIo::new(upgraded); + let mut tunnel = LurkTunnel::new(&mut inbdound, &mut outbound); + + // Start tunnel. + if let Err(err) = tunnel.run().await { + error!("Error occurred while tunnel was running: {}", err); + } + } + Err(err) => { + error!("Failed to establish TCP connection: {}", err); + } + } + }); + + Ok(Response::new(Self::empty())) + } else { + let host = request.uri().host().ok_or(anyhow!("HTTP request has no host"))?; + let port = request.uri().port_u16().unwrap_or(80); + + let stream = TcpStream::connect((host, port)).await?; + let io = TokioIo::new(stream); + + let (mut sender, conn) = client::conn::http1::Builder::new().handshake(io).await?; + + // Spawn a task to poll the connection and drive the HTTP state. + tokio::spawn(async move { + if let Err(err) = conn.await { + error!("Connection failed: {:?}", err); + } + }); + + // Send request on associated connection. + let response = sender.send_request(request).await?; + + Ok(response.map(|r| r.boxed())) + } + } + + // + // Routines taken from example of proxy implementation based on hyper: + // https://github.com/hyperium/hyper/blob/master/examples/http_proxy.rs + // + fn empty() -> BoxBody { + Empty::::new().map_err(|never| match never {}).boxed() + } + + fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()).map_err(|never| match never {}).boxed() + } +} + +#[async_trait] +impl LurkTcpConnectionHandler for LurkHttpHandler { + async fn handle(&mut self, mut conn: LurkTcpConnection) -> Result<()> { + let io = TokioIo::new(conn.stream_mut()); + server::conn::http1::Builder::new() + .serve_connection(io, service_fn(LurkHttpHandler::serve_request)) + .await + .map_err(anyhow::Error::from) + } +} + +pub fn create_tcp_connection_handler(label: &LurkTcpConnectionLabel) -> Result> { + match label { + LurkTcpConnectionLabel::Http | LurkTcpConnectionLabel::HttpSecure => Ok(Box::new(LurkHttpHandler {})), + LurkTcpConnectionLabel::Socks5 => Ok(Box::new(LurkSocks5Handler {})), + LurkTcpConnectionLabel::Unknown(_) => bail!("Unknown TCP connection"), + } +} + #[cfg(test)] mod tests { @@ -164,9 +274,7 @@ mod tests { #[tokio::test] async fn socks5_handshake_with_auth_method() { - let mut listener = LurkTcpListener::bind(TEST_BIND_IPV4) - .await - .expect("Expect binded listener"); + let mut listener = LurkTcpListener::bind(TEST_BIND_IPV4).await.expect("Expect binded listener"); let listener_addr = listener.local_addr(); let client_handle = tokio::spawn(async move { @@ -194,20 +302,16 @@ mod tests { tokio::task::yield_now().await; - let conn = listener.accept().await.expect("Expect created connection"); - assert_eq!(LurkTcpConnectionLabel::SOCKS5, conn.label()); - - let mut handler = LurkSocks5Handler::new(conn); - assert_ok!(handler.process_handshake().await); + let mut conn = listener.accept().await.expect("Expect created connection"); + assert_eq!(LurkTcpConnectionLabel::Socks5, conn.label()); + assert_ok!(LurkSocks5Handler::process_handshake(&mut conn).await); assert_ok!(client_handle.into_future().await); } #[tokio::test] async fn socks5_handshake_with_non_accepatable_method() { - let mut listener = LurkTcpListener::bind(TEST_BIND_IPV4) - .await - .expect("Expect binded listener"); + let mut listener = LurkTcpListener::bind(TEST_BIND_IPV4).await.expect("Expect binded listener"); let listener_addr = listener.local_addr(); let client_handle = tokio::spawn(async move { @@ -231,13 +335,11 @@ mod tests { tokio::task::yield_now().await; - let conn = listener.accept().await.expect("Expect created connection"); - assert_eq!(LurkTcpConnectionLabel::SOCKS5, conn.label()); - - let mut handler = LurkSocks5Handler::new(conn); + let mut conn = listener.accept().await.expect("Expect created connection"); + assert_eq!(LurkTcpConnectionLabel::Socks5, conn.label()); assert_lurk_err!( LurkError::NoAcceptableAuthenticationMethod, - handler.process_handshake().await.expect_err("Expect error") + LurkSocks5Handler::process_handshake(&mut conn).await.expect_err("Expect error") ); assert_ok!(client_handle.into_future().await); diff --git a/src/server/mod.rs b/src/server/mod.rs index 2089778..2bafe87 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,13 +1,10 @@ -use self::handlers::LurkSocks5Handler; use crate::{ common::logging::{self}, - net::tcp::{ - connection::{LurkTcpConnection, LurkTcpConnectionLabel}, - listener::LurkTcpListener, - }, + net::tcp::{connection::LurkTcpConnection, listener::LurkTcpListener}, }; use anyhow::Result; use async_listen::is_transient_error; +use handlers::create_tcp_connection_handler; use log::{error, info, warn}; use stats::LurkServerStats; use std::{net::SocketAddr, sync::Arc, time::Duration}; @@ -64,16 +61,17 @@ impl LurkServer { logging::log_tcp_established_conn!(conn_peer_addr, conn_label); // Create connection handler and supply handling of particular traffic label in a separate thread. - let mut connection_handler = match conn.label() { - LurkTcpConnectionLabel::SOCKS5 => LurkSocks5Handler::new(conn), - unknown_label => { - logging::log_tcp_closed_conn_with_error!(conn_peer_addr, conn_label, unknown_label); + let mut connection_handler = match create_tcp_connection_handler(&conn.label()) { + Ok(handler) => handler, + Err(err) => { + logging::log_tcp_closed_conn_with_error!(conn_peer_addr, conn_label, err); return; } }; + // Submit execution in a separate task. tokio::spawn(async move { - if let Err(err) = connection_handler.handle().await { + if let Err(err) = connection_handler.handle(conn).await { logging::log_tcp_closed_conn_with_error!(conn_peer_addr, conn_label, err); } else { logging::log_tcp_closed_conn!(conn_peer_addr, conn_label); diff --git a/tests/integration.rs b/tests/integration.rs index ead73f5..4205258 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -10,7 +10,7 @@ use std::{net::SocketAddr, thread::sleep, time::Duration}; mod common; #[tokio::test] -async fn http_server_single_client() { +async fn socks5_proxy_single_client() { common::init_logging(); let lurk_server_addr = "127.0.0.1:32001".parse::().unwrap(); @@ -48,7 +48,7 @@ async fn http_server_single_client() { } #[tokio::test] -async fn echo_server_multiple_clients() { +async fn socks5_proxy_multiple_clients() { common::init_logging(); let num_clients = 100; @@ -83,7 +83,7 @@ async fn echo_server_multiple_clients() { } #[tokio::test] -async fn http_healthcheck() { +async fn http_endpoint_healthcheck() { common::init_logging(); let http_endpoint_addr = "127.0.0.1:32005".parse::().unwrap();