diff --git a/Cargo.lock b/Cargo.lock index 48d4df2651..f10e7b87e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5656,7 +5656,7 @@ dependencies = [ [[package]] name = "pyth-lazer-agent" -version = "0.7.1" +version = "0.7.2" dependencies = [ "anyhow", "backoff", @@ -5683,6 +5683,7 @@ dependencies = [ "solana-keypair", "tempfile", "tokio", + "tokio-native-tls", "tokio-tungstenite 0.26.2", "tokio-util", "tracing", diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index e00357a4fa..550db37e44 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-agent" -version = "0.7.1" +version = "0.7.2" edition = "2024" description = "Pyth Lazer Agent" license = "Apache-2.0" @@ -32,6 +32,7 @@ serde_json = "1.0.140" soketto = { version = "0.8.1", features = ["http"] } solana-keypair = "2.2.1" tokio = { version = "1.44.1", features = ["full"] } +tokio-native-tls = "0.3.1" tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] } tokio-util = { version = "0.7.14", features = ["compat"] } tracing = "0.1.41" diff --git a/apps/pyth-lazer-agent/README.md b/apps/pyth-lazer-agent/README.md index b067cf8112..dac27a3401 100644 --- a/apps/pyth-lazer-agent/README.md +++ b/apps/pyth-lazer-agent/README.md @@ -50,6 +50,9 @@ authorization_token = "your_token" listen_address = "0.0.0.0:8910" publish_interval_duration = "25ms" enable_update_deduplication = false +# Optional proxy configuration +# proxy_url = "http://proxy.example.com:8080" +# proxy_url = "http://username:password@proxy.example.com:8080" # With authentication ``` - `relayers_urls`: The Lazer team will provide these. @@ -58,3 +61,4 @@ enable_update_deduplication = false - `listen_address`: The local port the agent will be listening on; can be anything you want. - `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here. - `enable_update_deduplication`: The agent will deduplicate updates based inside each batch before sending it to Lazer. +- `proxy_url` (optional): HTTP/HTTPS proxy URL for WebSocket connections. Supports Basic authentication via URL credentials (e.g., `http://user:pass@proxy:port`). diff --git a/apps/pyth-lazer-agent/src/config.rs b/apps/pyth-lazer-agent/src/config.rs index 5012a45647..f9929e16da 100644 --- a/apps/pyth-lazer-agent/src/config.rs +++ b/apps/pyth-lazer-agent/src/config.rs @@ -23,6 +23,7 @@ pub struct Config { pub enable_update_deduplication: bool, #[serde(with = "humantime_serde", default = "default_update_deduplication_ttl")] pub update_deduplication_ttl: Duration, + pub proxy_url: Option, } #[derive(Deserialize, Derivative, Clone, PartialEq)] diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index 3f493d93da..3cba8048af 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -313,6 +313,7 @@ pub mod tests { history_service_url: None, enable_update_deduplication: false, update_deduplication_ttl: Default::default(), + proxy_url: None, }; println!("{:?}", get_metadata(config).await.unwrap()); diff --git a/apps/pyth-lazer-agent/src/lazer_publisher.rs b/apps/pyth-lazer-agent/src/lazer_publisher.rs index 58d15e00b1..53ffb2a22c 100644 --- a/apps/pyth-lazer-agent/src/lazer_publisher.rs +++ b/apps/pyth-lazer-agent/src/lazer_publisher.rs @@ -80,6 +80,7 @@ impl LazerPublisher { token: authorization_token.clone(), receiver: relayer_sender.subscribe(), is_ready: is_ready.clone(), + proxy_url: config.proxy_url.clone(), }; tokio::spawn(async move { task.run().await }); } @@ -301,6 +302,7 @@ mod tests { history_service_url: None, enable_update_deduplication: false, update_deduplication_ttl: Default::default(), + proxy_url: None, }; let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY); diff --git a/apps/pyth-lazer-agent/src/relayer_session.rs b/apps/pyth-lazer-agent/src/relayer_session.rs index 8a26e13fb2..97bef61504 100644 --- a/apps/pyth-lazer-agent/src/relayer_session.rs +++ b/apps/pyth-lazer-agent/src/relayer_session.rs @@ -1,6 +1,7 @@ -use anyhow::{Result, bail}; +use anyhow::{Context, Result, bail}; use backoff::ExponentialBackoffBuilder; use backoff::backoff::Backoff; +use base64::Engine; use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{SinkExt, StreamExt}; use http::HeaderValue; @@ -9,12 +10,13 @@ use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::select; use tokio::sync::broadcast; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::{ - MaybeTlsStream, WebSocketStream, connect_async_with_config, + MaybeTlsStream, WebSocketStream, client_async, connect_async_with_config, tungstenite::Message as TungsteniteMessage, }; use url::Url; @@ -22,19 +24,136 @@ use url::Url; type RelayerWsSender = SplitSink>, TungsteniteMessage>; type RelayerWsReceiver = SplitStream>>; -async fn connect_to_relayer(url: Url, token: &str) -> Result<(RelayerWsSender, RelayerWsReceiver)> { - tracing::info!("connecting to the relayer at {}", url); - let mut req = url.clone().into_client_request()?; +async fn connect_through_proxy( + proxy_url: &Url, + target_url: &Url, + token: &str, +) -> Result<(RelayerWsSender, RelayerWsReceiver)> { + tracing::info!( + "connecting to the relayer at {} via proxy {}", + target_url, + proxy_url + ); + + let proxy_host = proxy_url.host_str().context("Proxy URL must have a host")?; + let proxy_port = proxy_url + .port() + .unwrap_or(if proxy_url.scheme() == "https" { + 443 + } else { + 80 + }); + + let proxy_addr = format!("{proxy_host}:{proxy_port}"); + let mut stream = TcpStream::connect(&proxy_addr) + .await + .context(format!("Failed to connect to proxy at {proxy_addr}"))?; + + let target_host = target_url + .host_str() + .context("Target URL must have a host")?; + let target_port = target_url + .port() + .unwrap_or(if target_url.scheme() == "wss" { + 443 + } else { + 80 + }); + + let mut connect_request = format!( + "CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n" + ); + + let username = proxy_url.username(); + if !username.is_empty() { + let password = proxy_url.password().unwrap_or(""); + let credentials = format!("{username}:{password}"); + let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes()); + connect_request = format!("{connect_request}Proxy-Authorization: Basic {encoded}\r\n"); + } + + connect_request = format!("{connect_request}\r\n"); + + stream + .write_all(connect_request.as_bytes()) + .await + .context(format!( + "Failed to send CONNECT request to proxy at {proxy_url}" + ))?; + + let mut response = vec![0u8; 1024]; + let n = stream.read(&mut response).await.context(format!( + "Failed to read CONNECT response from proxy at {proxy_url}" + ))?; + + let response_str = + String::from_utf8_lossy(response.get(..n).context("Invalid response slice range")?); + + if !response_str.starts_with("HTTP/1.1 200") && !response_str.starts_with("HTTP/1.0 200") { + bail!( + "Proxy CONNECT failed: {}", + response_str.lines().next().unwrap_or("Unknown error") + ); + } + + tracing::info!("Successfully connected through proxy at {}", proxy_url); + + let mut req = target_url.clone().into_client_request()?; let headers = req.headers_mut(); headers.insert( "Authorization", HeaderValue::from_str(&format!("Bearer {token}"))?, ); - let (ws_stream, _) = connect_async_with_config(req, None, true).await?; - tracing::info!("connected to the relayer at {}", url); + + let maybe_tls_stream = if target_url.scheme() == "wss" { + let tls_connector = tokio_native_tls::native_tls::TlsConnector::builder() + .build() + .context("Failed to build TLS connector")?; + let tokio_connector = tokio_native_tls::TlsConnector::from(tls_connector); + let domain = target_host; + let tls_stream = tokio_connector + .connect(domain, stream) + .await + .context("Failed to establish TLS connection")?; + + MaybeTlsStream::NativeTls(tls_stream) + } else { + MaybeTlsStream::Plain(stream) + }; + + let (ws_stream, _) = client_async(req, maybe_tls_stream) + .await + .context("Failed to complete WebSocket handshake")?; + + tracing::info!( + "WebSocket connection established to relayer at {} via proxy {}", + target_url, + proxy_url + ); Ok(ws_stream.split()) } +async fn connect_to_relayer( + url: Url, + token: &str, + proxy_url: Option<&Url>, +) -> Result<(RelayerWsSender, RelayerWsReceiver)> { + if let Some(proxy) = proxy_url { + connect_through_proxy(proxy, &url, token).await + } else { + tracing::info!("connecting to the relayer at {}", url); + let mut req = url.clone().into_client_request()?; + let headers = req.headers_mut(); + headers.insert( + "Authorization", + HeaderValue::from_str(&format!("Bearer {token}"))?, + ); + let (ws_stream, _) = connect_async_with_config(req, None, true).await?; + tracing::info!("connected to the relayer at {}", url); + Ok(ws_stream.split()) + } +} + struct RelayerWsSession { ws_sender: RelayerWsSender, } @@ -58,11 +177,11 @@ impl RelayerWsSession { } pub struct RelayerSessionTask { - // connection state pub url: Url, pub token: String, pub receiver: broadcast::Receiver, pub is_ready: Arc, + pub proxy_url: Option, } impl RelayerSessionTask { @@ -108,10 +227,8 @@ impl RelayerSessionTask { } pub async fn run_relayer_connection(&mut self) -> Result<()> { - // Establish relayer connection - // Relayer will drop the connection if no data received in 5s let (relayer_ws_sender, mut relayer_ws_receiver) = - connect_to_relayer(self.url.clone(), &self.token).await?; + connect_to_relayer(self.url.clone(), &self.token, self.proxy_url.as_ref()).await?; let mut relayer_ws_session = RelayerWsSession { ws_sender: relayer_ws_sender, }; @@ -236,11 +353,11 @@ mod tests { let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); let mut relayer_session_task = RelayerSessionTask { - // connection state url: Url::parse("ws://127.0.0.1:12346").unwrap(), token: "token1".to_string(), receiver: relayer_receiver, is_ready: Arc::new(AtomicBool::new(false)), + proxy_url: None, }; tokio::spawn(async move { relayer_session_task.run().await }); tokio::time::sleep(std::time::Duration::from_millis(1000)).await;