Skip to content

Commit

Permalink
Add HTTP headers feature for PubSub Client
Browse files Browse the repository at this point in the history
Signed-off-by: Ozan Selte <[email protected]>
  • Loading branch information
ozanselte committed Dec 4, 2024
1 parent 3f76d0a commit dcd2346
Showing 1 changed file with 54 additions and 14 deletions.
68 changes: 54 additions & 14 deletions pubsub-client/src/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub use crate::nonblocking::pubsub_client::PubsubClientError;
use {
crossbeam_channel::{unbounded, Receiver, Sender},
log::*,
reqwest::header,
serde::de::DeserializeOwned,
serde_json::{
json,
Expand Down Expand Up @@ -121,7 +122,10 @@ use {
thread::{sleep, JoinHandle},
time::Duration,
},
tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
tungstenite::{
client::IntoClientRequest, connect, handshake::client::Request, stream::MaybeTlsStream,
Message, WebSocket,
},
url::Url,
};

Expand Down Expand Up @@ -298,12 +302,33 @@ pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
/// See the [module documentation][self].
pub struct PubsubClient {}

fn create_request_with_headers(
url: &Url,
headers: &Option<header::HeaderMap>,
) -> Result<Request, tungstenite::Error> {
let mut result = url.into_client_request();
if let Ok(ref mut request) = result {
if let Some(header_map) = headers {
let headers_reference = request.headers_mut();
for (key, value) in header_map.iter() {
headers_reference.insert(key, value.clone());
}
}
}
result
}

fn connect_with_retry(
url: Url,
headers: Option<header::HeaderMap>,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
let mut connection_retries = 5;
loop {
let result = connect(url.clone()).map(|(socket, _)| socket);
let request = create_request_with_headers(&url, &headers);
if let Err(err) = request {
return Err(err);
}
let result = connect(request?).map(|(socket, _)| socket);
if let Err(tungstenite::Error::Http(response)) = &result {
if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
{
Expand Down Expand Up @@ -346,9 +371,10 @@ impl PubsubClient {
url: &str,
pubkey: &Pubkey,
config: Option<RpcAccountInfoConfig>,
headers: Option<header::HeaderMap>,
) -> Result<AccountSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -399,9 +425,10 @@ impl PubsubClient {
url: &str,
filter: RpcBlockSubscribeFilter,
config: Option<RpcBlockSubscribeConfig>,
headers: Option<header::HeaderMap>,
) -> Result<BlockSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -447,9 +474,10 @@ impl PubsubClient {
url: &str,
filter: RpcTransactionLogsFilter,
config: RpcTransactionLogsConfig,
headers: Option<header::HeaderMap>,
) -> Result<LogsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -496,9 +524,10 @@ impl PubsubClient {
url: &str,
pubkey: &Pubkey,
config: Option<RpcProgramAccountsConfig>,
headers: Option<header::HeaderMap>,
) -> Result<ProgramSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -547,9 +576,12 @@ impl PubsubClient {
/// This method corresponds directly to the [`voteSubscribe`] RPC method.
///
/// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket/votesubscribe
pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
pub fn vote_subscribe(
url: &str,
headers: Option<header::HeaderMap>,
) -> Result<VoteSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -592,9 +624,12 @@ impl PubsubClient {
/// This method corresponds directly to the [`rootSubscribe`] RPC method.
///
/// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket/rootsubscribe
pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
pub fn root_subscribe(
url: &str,
headers: Option<header::HeaderMap>,
) -> Result<RootSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -642,9 +677,10 @@ impl PubsubClient {
url: &str,
signature: &Signature,
config: Option<RpcSignatureSubscribeConfig>,
headers: Option<header::HeaderMap>,
) -> Result<SignatureSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -689,9 +725,12 @@ impl PubsubClient {
/// This method corresponds directly to the [`slotSubscribe`] RPC method.
///
/// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket/slotsubscribe
pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
pub fn slot_subscribe(
url: &str,
headers: Option<header::HeaderMap>,
) -> Result<SlotsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;
let (sender, receiver) = unbounded::<SlotInfo>();

let socket = Arc::new(RwLock::new(socket));
Expand Down Expand Up @@ -740,9 +779,10 @@ impl PubsubClient {
pub fn slot_updates_subscribe(
url: &str,
handler: impl Fn(SlotUpdate) + Send + 'static,
headers: Option<header::HeaderMap>,
) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let socket = connect_with_retry(url, headers)?;

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down

0 comments on commit dcd2346

Please sign in to comment.