-
Notifications
You must be signed in to change notification settings - Fork 301
feat(pyth-lazer-agent): add HTTP proxy support for WebSocket connections #3142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
6ffb5fb
7c4415c
90cdadf
6cb8111
00a4442
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,9 @@ authorization_token = "your_token" | |
listen_address = "0.0.0.0:8910" | ||
publish_interval_duration = "25ms" | ||
enable_update_deduplication = false | ||
# Optional proxy configuration | ||
# proxy_url = "http://proxy.example.com:8080" | ||
# proxy_url = "http://username:[email protected]:8080" # With authentication | ||
``` | ||
|
||
- `relayers_urls`: The Lazer team will provide these. | ||
|
@@ -58,3 +61,4 @@ enable_update_deduplication = false | |
- `listen_address`: The local port the agent will be listening on; can be anything you want. | ||
- `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here. | ||
- `enable_update_deduplication`: The agent will deduplicate updates based inside each batch before sending it to Lazer. | ||
- `proxy_url` (optional): HTTP/HTTPS proxy URL for WebSocket connections. Supports Basic authentication via URL credentials (e.g., `http://user:pass@proxy:port`). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
use anyhow::{Result, bail}; | ||
use anyhow::{Context, Result, bail}; | ||
use backoff::ExponentialBackoffBuilder; | ||
use backoff::backoff::Backoff; | ||
use base64::Engine; | ||
use futures_util::stream::{SplitSink, SplitStream}; | ||
use futures_util::{SinkExt, StreamExt}; | ||
use http::HeaderValue; | ||
|
@@ -9,32 +10,148 @@ use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction; | |
use std::sync::Arc; | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
use std::time::{Duration, Instant}; | ||
use tokio::io::{AsyncReadExt, AsyncWriteExt}; | ||
use tokio::net::TcpStream; | ||
use tokio::select; | ||
use tokio::sync::broadcast; | ||
use tokio_tungstenite::tungstenite::client::IntoClientRequest; | ||
use tokio_tungstenite::{ | ||
MaybeTlsStream, WebSocketStream, connect_async_with_config, | ||
MaybeTlsStream, WebSocketStream, client_async, connect_async_with_config, | ||
tungstenite::Message as TungsteniteMessage, | ||
}; | ||
use url::Url; | ||
|
||
type RelayerWsSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>; | ||
type RelayerWsReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>; | ||
|
||
async fn connect_to_relayer(url: Url, token: &str) -> Result<(RelayerWsSender, RelayerWsReceiver)> { | ||
tracing::info!("connecting to the relayer at {}", url); | ||
let mut req = url.clone().into_client_request()?; | ||
async fn connect_through_proxy( | ||
proxy_url: &Url, | ||
target_url: &Url, | ||
token: &str, | ||
) -> Result<(RelayerWsSender, RelayerWsReceiver)> { | ||
tracing::info!( | ||
"connecting to the relayer at {} via proxy {}", | ||
target_url, | ||
proxy_url | ||
); | ||
|
||
let proxy_host = proxy_url.host_str().context("Proxy URL must have a host")?; | ||
let proxy_port = proxy_url | ||
.port() | ||
.unwrap_or(if proxy_url.scheme() == "https" { | ||
443 | ||
} else { | ||
80 | ||
}); | ||
|
||
let proxy_addr = format!("{proxy_host}:{proxy_port}"); | ||
let mut stream = TcpStream::connect(&proxy_addr) | ||
.await | ||
.context(format!("Failed to connect to proxy at {proxy_addr}"))?; | ||
|
||
let target_host = target_url | ||
.host_str() | ||
.context("Target URL must have a host")?; | ||
let target_port = target_url | ||
.port() | ||
.unwrap_or(if target_url.scheme() == "wss" { | ||
443 | ||
} else { | ||
80 | ||
}); | ||
|
||
let mut connect_request = format!( | ||
"CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n" | ||
); | ||
|
||
let username = proxy_url.username(); | ||
if !username.is_empty() { | ||
let password = proxy_url.password().unwrap_or(""); | ||
let credentials = format!("{username}:{password}"); | ||
let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes()); | ||
connect_request = format!("{connect_request}Proxy-Authorization: Basic {encoded}\r\n"); | ||
} | ||
|
||
connect_request = format!("{connect_request}\r\n"); | ||
|
||
stream | ||
.write_all(connect_request.as_bytes()) | ||
.await | ||
.context("Failed to send CONNECT request to proxy")?; | ||
|
||
let mut response = vec![0u8; 1024]; | ||
let n = stream | ||
.read(&mut response) | ||
.await | ||
.context("Failed to read CONNECT response from proxy")?; | ||
|
||
|
||
let response_str = | ||
String::from_utf8_lossy(response.get(..n).context("Invalid response slice range")?); | ||
|
||
if !response_str.starts_with("HTTP/1.1 200") && !response_str.starts_with("HTTP/1.0 200") { | ||
bail!( | ||
"Proxy CONNECT failed: {}", | ||
response_str.lines().next().unwrap_or("Unknown error") | ||
); | ||
} | ||
|
||
tracing::info!("Successfully connected through proxy"); | ||
|
||
|
||
let mut req = target_url.clone().into_client_request()?; | ||
let headers = req.headers_mut(); | ||
headers.insert( | ||
"Authorization", | ||
HeaderValue::from_str(&format!("Bearer {token}"))?, | ||
); | ||
let (ws_stream, _) = connect_async_with_config(req, None, true).await?; | ||
tracing::info!("connected to the relayer at {}", url); | ||
|
||
let maybe_tls_stream = if target_url.scheme() == "wss" { | ||
let tls_connector = tokio_native_tls::native_tls::TlsConnector::builder() | ||
.build() | ||
.context("Failed to build TLS connector")?; | ||
let tokio_connector = tokio_native_tls::TlsConnector::from(tls_connector); | ||
let domain = target_host; | ||
let tls_stream = tokio_connector | ||
.connect(domain, stream) | ||
.await | ||
.context("Failed to establish TLS connection")?; | ||
|
||
MaybeTlsStream::NativeTls(tls_stream) | ||
} else { | ||
MaybeTlsStream::Plain(stream) | ||
}; | ||
|
||
let (ws_stream, _) = client_async(req, maybe_tls_stream) | ||
.await | ||
.context("Failed to complete WebSocket handshake")?; | ||
|
||
tracing::info!( | ||
"WebSocket connection established to relayer at {}", | ||
|
||
target_url | ||
); | ||
Ok(ws_stream.split()) | ||
} | ||
|
||
async fn connect_to_relayer( | ||
url: Url, | ||
token: &str, | ||
proxy_url: Option<&Url>, | ||
) -> Result<(RelayerWsSender, RelayerWsReceiver)> { | ||
if let Some(proxy) = proxy_url { | ||
connect_through_proxy(proxy, &url, token).await | ||
} else { | ||
tracing::info!("connecting to the relayer at {}", url); | ||
let mut req = url.clone().into_client_request()?; | ||
let headers = req.headers_mut(); | ||
headers.insert( | ||
"Authorization", | ||
HeaderValue::from_str(&format!("Bearer {token}"))?, | ||
); | ||
let (ws_stream, _) = connect_async_with_config(req, None, true).await?; | ||
tracing::info!("connected to the relayer at {}", url); | ||
Ok(ws_stream.split()) | ||
} | ||
} | ||
|
||
struct RelayerWsSession { | ||
ws_sender: RelayerWsSender, | ||
} | ||
|
@@ -58,11 +175,11 @@ impl RelayerWsSession { | |
} | ||
|
||
pub struct RelayerSessionTask { | ||
// connection state | ||
pub url: Url, | ||
pub token: String, | ||
pub receiver: broadcast::Receiver<SignedLazerTransaction>, | ||
pub is_ready: Arc<AtomicBool>, | ||
pub proxy_url: Option<Url>, | ||
} | ||
|
||
impl RelayerSessionTask { | ||
|
@@ -108,10 +225,8 @@ impl RelayerSessionTask { | |
} | ||
|
||
pub async fn run_relayer_connection(&mut self) -> Result<()> { | ||
// Establish relayer connection | ||
// Relayer will drop the connection if no data received in 5s | ||
let (relayer_ws_sender, mut relayer_ws_receiver) = | ||
connect_to_relayer(self.url.clone(), &self.token).await?; | ||
connect_to_relayer(self.url.clone(), &self.token, self.proxy_url.as_ref()).await?; | ||
let mut relayer_ws_session = RelayerWsSession { | ||
ws_sender: relayer_ws_sender, | ||
}; | ||
|
@@ -236,11 +351,11 @@ mod tests { | |
let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); | ||
|
||
let mut relayer_session_task = RelayerSessionTask { | ||
// connection state | ||
url: Url::parse("ws://127.0.0.1:12346").unwrap(), | ||
token: "token1".to_string(), | ||
receiver: relayer_receiver, | ||
is_ready: Arc::new(AtomicBool::new(false)), | ||
proxy_url: None, | ||
}; | ||
tokio::spawn(async move { relayer_session_task.run().await }); | ||
tokio::time::sleep(std::time::Duration::from_millis(1000)).await; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you note the proxy url here?