Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ protoc-bin-vendored = "3.0.0"
default = []
# Arrow Flight is experimental/unsupported
arrow-flight = ["dep:arrow-flight", "dep:arrow-array", "dep:arrow-schema", "dep:arrow-ipc", "dep:futures"]
testing = []
testing = ["dep:futures"]
63 changes: 30 additions & 33 deletions rust/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ mod errors;
mod headers_provider;
mod landing_zone;
mod offset_generator;
#[cfg(feature = "testing")]
mod multiplexed_stream;
mod proxy;
mod record_types;
mod stream_configuration;
Expand Down Expand Up @@ -92,6 +94,9 @@ pub use record_types::{
ProtoBytes, ProtoEncodedRecord, ProtoMessage,
};
pub use stream_configuration::StreamConfigurationOptions;
#[cfg(feature = "testing")]
pub use multiplexed_stream::{MessageId, MultiplexedStream};

#[cfg(feature = "testing")]
pub use tls_config::NoTlsConfig;
pub use tls_config::{SecureTlsConfig, TlsConfig};
Expand Down Expand Up @@ -266,7 +271,6 @@ pub struct ZerobusSdk {
)]
pub use_tls: bool,
pub unity_catalog_url: String,
shared_channel: tokio::sync::Mutex<Option<ZerobusClient<Channel>>>,
workspace_id: String,
tls_config: Arc<dyn TlsConfig>,
}
Expand Down Expand Up @@ -347,7 +351,6 @@ impl ZerobusSdk {
use_tls: true,
unity_catalog_url,
workspace_id,
shared_channel: tokio::sync::Mutex::new(None),
tls_config: Arc::new(SecureTlsConfig::new()),
})
}
Expand All @@ -367,7 +370,6 @@ impl ZerobusSdk {
use_tls: true,
unity_catalog_url,
workspace_id,
shared_channel: tokio::sync::Mutex::new(None),
tls_config,
}
}
Expand Down Expand Up @@ -829,44 +831,34 @@ impl ZerobusSdk {
Ok(new_stream)
}

/// Gets or creates the shared Channel for all streams.
/// The first call creates the Channel, subsequent calls clone it.
/// All clones share the same underlying TCP connection via HTTP/2 multiplexing.
/// Creates a new Channel and TCP connection for each stream.
///
/// Each stream gets its own dedicated connection to avoid serialization
/// through tonic's tower Buffer worker, which processes requests one at
/// a time per channel.
async fn get_or_create_channel_zerobus_client(&self) -> ZerobusResult<ZerobusClient<Channel>> {
let mut guard = self.shared_channel.lock().await;
let endpoint = Endpoint::from_shared(self.zerobus_endpoint.clone())
.map_err(|err| ZerobusError::ChannelCreationError(err.to_string()))?;

if guard.is_none() {
// Create the channel for the first time.
let endpoint = Endpoint::from_shared(self.zerobus_endpoint.clone())
.map_err(|err| ZerobusError::ChannelCreationError(err.to_string()))?;
let endpoint = self.tls_config.configure_endpoint(endpoint)?;

let endpoint = self.tls_config.configure_endpoint(endpoint)?;
let host = endpoint.uri().host().unwrap_or_default().to_string();

// Check for HTTP proxy env vars (https_proxy, HTTPS_PROXY, etc.)
// and use a proxy connector if one is configured.
let host = endpoint.uri().host().unwrap_or_default().to_string();

let channel = if !proxy::is_no_proxy(&host) {
if let Some(proxy_connector) = proxy::create_proxy_connector() {
endpoint.connect_with_connector_lazy(proxy_connector)
} else {
endpoint.connect_lazy()
}
let channel = if !proxy::is_no_proxy(&host) {
if let Some(proxy_connector) = proxy::create_proxy_connector() {
endpoint.connect_with_connector_lazy(proxy_connector)
} else {
endpoint.connect_lazy()
};
}
} else {
endpoint.connect_lazy()
};

let client = ZerobusClient::new(channel)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
let client = ZerobusClient::new(channel)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);

*guard = Some(client);
}

Ok(guard
.as_ref()
.expect("Channel was just initialized")
.clone())
Ok(client)
}
}

Expand Down Expand Up @@ -2234,6 +2226,11 @@ impl ZerobusStream {
.to_string(),
))
}

#[cfg(feature = "testing")]
pub(crate) fn has_capacity(&self) -> bool {
self.landing_zone.len() < self.options.max_inflight_requests
}
}

impl Drop for ZerobusStream {
Expand Down
Loading
Loading