Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/pyth-lazer-agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-lazer-agent"
version = "0.6.1"
version = "0.7.0"
edition = "2024"
description = "Pyth Lazer Agent"
license = "Apache-2.0"
Expand Down Expand Up @@ -32,6 +32,7 @@ serde_json = "1.0.140"
soketto = { version = "0.8.1", features = ["http"] }
solana-keypair = "2.2.1"
tokio = { version = "1.44.1", features = ["full"] }
tokio-native-tls = "0.3.1"
tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }
tokio-util = { version = "0.7.14", features = ["compat"] }
tracing = "0.1.41"
Expand Down
4 changes: 4 additions & 0 deletions apps/pyth-lazer-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ authorization_token = "your_token"
listen_address = "0.0.0.0:8910"
publish_interval_duration = "25ms"
enable_update_deduplication = false
# Optional proxy configuration
# proxy_url = "http://proxy.example.com:8080"
# proxy_url = "http://username:[email protected]:8080" # With authentication
```

- `relayers_urls`: The Lazer team will provide these.
Expand All @@ -58,3 +61,4 @@ enable_update_deduplication = false
- `listen_address`: The local port the agent will be listening on; can be anything you want.
- `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here.
- `enable_update_deduplication`: The agent will deduplicate updates based inside each batch before sending it to Lazer.
- `proxy_url` (optional): HTTP/HTTPS proxy URL for WebSocket connections. Supports Basic authentication via URL credentials (e.g., `http://user:pass@proxy:port`).
1 change: 1 addition & 0 deletions apps/pyth-lazer-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct Config {
pub enable_update_deduplication: bool,
#[serde(with = "humantime_serde", default = "default_update_deduplication_ttl")]
pub update_deduplication_ttl: Duration,
pub proxy_url: Option<Url>,
}

#[derive(Deserialize, Derivative, Clone, PartialEq)]
Expand Down
1 change: 1 addition & 0 deletions apps/pyth-lazer-agent/src/jrpc_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ pub mod tests {
history_service_url: None,
enable_update_deduplication: false,
update_deduplication_ttl: Default::default(),
proxy_url: None,
};

println!("{:?}", get_metadata(config).await.unwrap());
Expand Down
2 changes: 2 additions & 0 deletions apps/pyth-lazer-agent/src/lazer_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl LazerPublisher {
token: authorization_token.clone(),
receiver: relayer_sender.subscribe(),
is_ready: is_ready.clone(),
proxy_url: config.proxy_url.clone(),
};
tokio::spawn(async move { task.run().await });
}
Expand Down Expand Up @@ -301,6 +302,7 @@ mod tests {
history_service_url: None,
enable_update_deduplication: false,
update_deduplication_ttl: Default::default(),
proxy_url: None,
};

let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
Expand Down
139 changes: 127 additions & 12 deletions apps/pyth-lazer-agent/src/relayer_session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{Result, bail};
use anyhow::{Context, Result, bail};
use backoff::ExponentialBackoffBuilder;
use backoff::backoff::Backoff;
use base64::Engine;
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use http::HeaderValue;
Expand All @@ -9,32 +10,148 @@ use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream, connect_async_with_config,
MaybeTlsStream, WebSocketStream, client_async, connect_async_with_config,
tungstenite::Message as TungsteniteMessage,
};
use url::Url;

type RelayerWsSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>;
type RelayerWsReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;

async fn connect_to_relayer(url: Url, token: &str) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
tracing::info!("connecting to the relayer at {}", url);
let mut req = url.clone().into_client_request()?;
async fn connect_through_proxy(
proxy_url: &Url,
target_url: &Url,
token: &str,
) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
tracing::info!(
"connecting to the relayer at {} via proxy {}",
target_url,
proxy_url
);

let proxy_host = proxy_url.host_str().context("Proxy URL must have a host")?;
let proxy_port = proxy_url
.port()
.unwrap_or(if proxy_url.scheme() == "https" {
443
} else {
80
});

let proxy_addr = format!("{proxy_host}:{proxy_port}");
let mut stream = TcpStream::connect(&proxy_addr)
.await
.context(format!("Failed to connect to proxy at {proxy_addr}"))?;

let target_host = target_url
.host_str()
.context("Target URL must have a host")?;
let target_port = target_url
.port()
.unwrap_or(if target_url.scheme() == "wss" {
443
} else {
80
});

let mut connect_request = format!(
"CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n"
);

let username = proxy_url.username();
if !username.is_empty() {
let password = proxy_url.password().unwrap_or("");
let credentials = format!("{username}:{password}");
let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes());
connect_request = format!("{connect_request}Proxy-Authorization: Basic {encoded}\r\n");
}

connect_request = format!("{connect_request}\r\n");

stream
.write_all(connect_request.as_bytes())
.await
.context("Failed to send CONNECT request to proxy")?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you note the proxy url here?


let mut response = vec![0u8; 1024];
let n = stream
.read(&mut response)
.await
.context("Failed to read CONNECT response from proxy")?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you note the proxy url here?


let response_str =
String::from_utf8_lossy(response.get(..n).context("Invalid response slice range")?);

if !response_str.starts_with("HTTP/1.1 200") && !response_str.starts_with("HTTP/1.0 200") {
bail!(
"Proxy CONNECT failed: {}",
response_str.lines().next().unwrap_or("Unknown error")
);
}

tracing::info!("Successfully connected through proxy");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you note the proxy url here?


let mut req = target_url.clone().into_client_request()?;
let headers = req.headers_mut();
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {token}"))?,
);
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
tracing::info!("connected to the relayer at {}", url);

let maybe_tls_stream = if target_url.scheme() == "wss" {
let tls_connector = tokio_native_tls::native_tls::TlsConnector::builder()
.build()
.context("Failed to build TLS connector")?;
let tokio_connector = tokio_native_tls::TlsConnector::from(tls_connector);
let domain = target_host;
let tls_stream = tokio_connector
.connect(domain, stream)
.await
.context("Failed to establish TLS connection")?;

MaybeTlsStream::NativeTls(tls_stream)
} else {
MaybeTlsStream::Plain(stream)
};

let (ws_stream, _) = client_async(req, maybe_tls_stream)
.await
.context("Failed to complete WebSocket handshake")?;

tracing::info!(
"WebSocket connection established to relayer at {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you note that this is through a proxy and the url?

target_url
);
Ok(ws_stream.split())
}

async fn connect_to_relayer(
url: Url,
token: &str,
proxy_url: Option<&Url>,
) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
if let Some(proxy) = proxy_url {
connect_through_proxy(proxy, &url, token).await
} else {
tracing::info!("connecting to the relayer at {}", url);
let mut req = url.clone().into_client_request()?;
let headers = req.headers_mut();
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {token}"))?,
);
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
tracing::info!("connected to the relayer at {}", url);
Ok(ws_stream.split())
}
}

struct RelayerWsSession {
ws_sender: RelayerWsSender,
}
Expand All @@ -58,11 +175,11 @@ impl RelayerWsSession {
}

pub struct RelayerSessionTask {
// connection state
pub url: Url,
pub token: String,
pub receiver: broadcast::Receiver<SignedLazerTransaction>,
pub is_ready: Arc<AtomicBool>,
pub proxy_url: Option<Url>,
}

impl RelayerSessionTask {
Expand Down Expand Up @@ -108,10 +225,8 @@ impl RelayerSessionTask {
}

pub async fn run_relayer_connection(&mut self) -> Result<()> {
// Establish relayer connection
// Relayer will drop the connection if no data received in 5s
let (relayer_ws_sender, mut relayer_ws_receiver) =
connect_to_relayer(self.url.clone(), &self.token).await?;
connect_to_relayer(self.url.clone(), &self.token, self.proxy_url.as_ref()).await?;
let mut relayer_ws_session = RelayerWsSession {
ws_sender: relayer_ws_sender,
};
Expand Down Expand Up @@ -236,11 +351,11 @@ mod tests {
let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);

let mut relayer_session_task = RelayerSessionTask {
// connection state
url: Url::parse("ws://127.0.0.1:12346").unwrap(),
token: "token1".to_string(),
receiver: relayer_receiver,
is_ready: Arc::new(AtomicBool::new(false)),
proxy_url: None,
};
tokio::spawn(async move { relayer_session_task.run().await });
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
Expand Down