diff --git a/rust/CHANGELOG.md b/rust/CHANGELOG.md index aaa8c3a..7f1f0f0 100644 --- a/rust/CHANGELOG.md +++ b/rust/CHANGELOG.md @@ -1,27 +1,44 @@ # Version changelog -## Release v1.0.1 - -### Major Changes +## Release v1.1.0 ### New Features and Improvements -- **[Experimental Arrow Flight] Zero-copy IPC ingestion via `ingest_ipc_batch`**: Added `ZerobusArrowStream::ingest_ipc_batch(Bytes)` for FFI callers (Go, Python, Java, TypeScript) that already hold Arrow IPC stream bytes. Raw bytes are forwarded directly to the Flight wire format without deserialising to a `RecordBatch` and re-serialising, eliminating one IPC round-trip per batch compared to `ingest_batch`. The existing `ingest_batch` API is unchanged. +- **Typestate `StreamBuilder` API**: New fluent builder for creating ingestion streams with compile-time safety. The builder enforces a strict configuration order — auth → format → config → build — so invalid combinations (e.g., building without auth or format) are caught at compile time. Format-specific setters (e.g., `max_inflight_requests` for gRPC, `max_inflight_batches` for Arrow) are only available when the matching format is selected. -### Bug Fixes + ```rust + let stream = sdk + .stream_builder("catalog.schema.table") + .oauth("client-id", "client-secret") + .json() + .max_inflight_requests(500_000) + .build() + .await?; + ``` -- Fixed proto generation tool to skip reserved field numbers 19000-19999 for tables with more than 19000 columns +- **`NoOpHeadersProvider`**: Added a no-op headers provider for local development, testing, or sidecar proxy scenarios. Accessible via `stream_builder(...).no_auth()`. -### Documentation +- **[Experimental Arrow Flight] Zero-copy IPC ingestion via `ingest_ipc_batch`**: Added `ZerobusArrowStream::ingest_ipc_batch(Bytes)` for FFI callers (Go, Python, Java, TypeScript) that already hold Arrow IPC stream bytes. 
Raw bytes are forwarded directly to the Flight wire format without deserialising to a `RecordBatch` and re-serialising, eliminating one IPC round-trip per batch compared to `ingest_batch`. The existing `ingest_batch` API is unchanged. -### Internal Changes +### Bug Fixes -### Breaking Changes +- Fixed proto generation tool to skip reserved field numbers 19000-19999 for tables with more than 19000 columns ### Deprecations +- **`ZerobusSdk::create_stream()`**: Use `sdk.stream_builder(table).oauth(id, secret).json().build().await` or `.compiled_proto(desc)` instead +- **`ZerobusSdk::create_stream_with_headers_provider()`**: Use `sdk.stream_builder(table).headers_provider(p).json().build().await` instead +- **`ZerobusSdk::create_arrow_stream()`**: Use `sdk.stream_builder(table).oauth(id, secret).arrow(schema).build().await` instead +- **`ZerobusSdk::create_arrow_stream_with_headers_provider()`**: Use `sdk.stream_builder(table).headers_provider(p).arrow(schema).build().await` instead + ### API Changes +- Added `StreamBuilder` typestate builder with `stream_builder()` entry point on `ZerobusSdk` +- Added marker types `NoFormat`, `Json`, `CompiledProto`, `Arrow`, `NoAuth`, `HasAuth` for compile-time state tracking +- Added sealed traits `StreamFormat` and `GrpcFormat` for format-specific method gating +- Added `NoOpHeadersProvider` to `headers_provider` module +- Changed `ZerobusSdk` fields `workspace_id` and `tls_config` to `pub(crate)` visibility (no public API impact) + ## Release v1.0.1 diff --git a/rust/README.md b/rust/README.md index 0e2571c..45100c5 100644 --- a/rust/README.md +++ b/rust/README.md @@ -42,6 +42,7 @@ The Zerobus Rust SDK provides a robust, async-first interface for ingesting larg - **High Throughput** - Configurable inflight record limits for optimal performance - **Batch Ingestion** - Ingest multiple records at once with all-or-nothing semantics for maximum throughput - **Flexible Serialization** - Support for both JSON (simple) and Protocol 
Buffers (type-safe) data formats +- **Type-Safe Stream Builder** - Typestate builder API enforces correct stream configuration at compile time - **Type Safety** - Protocol Buffers ensure schema validation at compile time - **Schema Generation** - CLI tool to generate protobuf schemas from Unity Catalog tables - **Flexible Configuration** - Fine-tune timeouts, retries, and recovery behavior @@ -189,7 +190,7 @@ zerobus_rust_sdk/ +-----------------+ | Your App | +-----------------+ - | 1. create_stream() + | 1. stream_builder().build() v +-----------------+ | ZerobusSdk | @@ -270,13 +271,14 @@ impl HeadersProvider for MyCustomAuthProvider { } } -async fn example(sdk: ZerobusSdk, table_properties: TableProperties) -> ZerobusResult<()> { +async fn example(sdk: ZerobusSdk) -> ZerobusResult<()> { let custom_provider = Arc::new(MyCustomAuthProvider {}); - let stream = sdk.create_stream_with_headers_provider( - table_properties, - custom_provider, - None, - ).await?; + let stream = sdk + .stream_builder("catalog.schema.table") + .headers_provider(custom_provider) + .json() + .build() + .await?; Ok(()) } ``` @@ -397,7 +399,24 @@ See [`examples/README.md`](https://github.com/databricks/zerobus-sdk/blob/main/r ### 4. 
Create a Stream -Configure table properties and stream options: +Use the `stream_builder()` API to create a stream with compile-time safety: + +#### JSON Stream + +```rust +let mut stream = sdk + .stream_builder("catalog.schema.orders") + .oauth(client_id, client_secret) + .json() + .max_inflight_requests(10_000) + .recovery_timeout_ms(15_000) + .recovery_backoff_ms(2_000) + .recovery_retries(4) + .build() + .await?; +``` + +#### Protocol Buffers Stream ```rust use std::fs; @@ -424,28 +443,20 @@ let descriptor_proto = load_descriptor( "table_Orders", ); -let table_properties = TableProperties { - table_name: "catalog.schema.orders".to_string(), - descriptor_proto, -}; - -let options = StreamConfigurationOptions { - max_inflight_requests: 10000, - recovery: true, - recovery_timeout_ms: 15000, - recovery_backoff_ms: 2000, - recovery_retries: 4, - ..Default::default() -}; - -let mut stream = sdk.create_stream( - table_properties, - client_id, - client_secret, - Some(options), -).await?; +let mut stream = sdk + .stream_builder("catalog.schema.orders") + .oauth(client_id, client_secret) + .compiled_proto(descriptor_proto) + .max_inflight_requests(10_000) + .recovery_timeout_ms(15_000) + .recovery_backoff_ms(2_000) + .recovery_retries(4) + .build() + .await?; ``` +The builder enforces correct ordering at compile time: auth must be set before format, and format before configuration options. Format-specific setters (like `max_inflight_requests` for gRPC or `max_inflight_batches` for Arrow) are only available when the matching format is selected. + ### 5. 
Ingest Data The SDK provides flexible ways to ingest data with different levels of abstraction: @@ -576,22 +587,18 @@ impl AckCallback for MyCallback { } // Configure stream with callback -let options = StreamConfigurationOptions { - max_inflight_requests: 10000, - ack_callback: Some(Arc::new(MyCallback)), - ..Default::default() -}; - -let mut stream = sdk.create_stream( - table_properties, - client_id, - client_secret, - Some(options), -).await?; +let mut stream = sdk + .stream_builder("catalog.schema.orders") + .oauth(client_id, client_secret) + .json() + .max_inflight_requests(10_000) + .ack_callback(Arc::new(MyCallback)) + .build() + .await?; for i in 0..1000 { - let record = YourMessage { id: Some(i), /* ... */ }; - stream.ingest_record_offset(record.encode_to_vec()).await?; + let record = serde_json::json!({"id": i, "name": format!("order-{}", i)}); + stream.ingest_record_offset(record.to_string()).await?; // Callback fires when this record is acknowledged } @@ -753,18 +760,18 @@ let sdk = ZerobusSdk::builder() .unity_catalog_url(uc_endpoint) .build()?; -let mut stream = sdk.create_stream( - table_properties.clone(), - client_id.clone(), - client_secret.clone(), - Some(options), -).await?; +let mut stream = sdk + .stream_builder("catalog.schema.table") + .oauth(client_id, client_secret) + .json() + .build() + .await?; // Ingest data... match stream.close().await { Err(_) => { // Stream failed, recreate with unacked records - stream = sdk.recreate_stream(stream).await?; + stream = sdk.recreate_stream(&stream).await?; } Ok(_) => println!("Closed successfully"), } @@ -816,34 +823,20 @@ let sdk = ZerobusSdk::builder() ``` **Methods:** + ```rust -pub async fn create_stream( - &self, - table_properties: TableProperties, - client_id: String, - client_secret: String, - options: Option, -) -> ZerobusResult +pub fn stream_builder(&self, table_name: impl Into) -> StreamBuilder ``` +Returns a typestate builder for creating ingestion streams. 
See [Create a Stream](#4-create-a-stream) for usage. ```rust pub async fn recreate_stream( &self, - stream: ZerobusStream + stream: &ZerobusStream ) -> ZerobusResult<ZerobusStream> ``` Recreates a failed stream, preserving and re-ingesting unacknowledged records. -```rust -pub async fn create_stream_with_headers_provider( - &self, - table_properties: TableProperties, - headers_provider: Arc<dyn HeadersProvider>, - options: Option<StreamConfigurationOptions>, -) -> ZerobusResult<ZerobusStream> -``` -Creates a stream with a custom headers provider for advanced authentication. - ### `ZerobusStream` Represents an active ingestion stream. diff --git a/rust/examples/json/batch/src/main.rs b/rust/examples/json/batch/src/main.rs index 7f198ac..9ae4cb6 100644 --- a/rust/examples/json/batch/src/main.rs +++ b/rust/examples/json/batch/src/main.rs @@ -1,9 +1,6 @@ use std::error::Error; -use databricks_zerobus_ingest_sdk::{ - databricks::zerobus::RecordType, JsonString, JsonValue, StreamConfigurationOptions, - TableProperties, ZerobusSdk, ZerobusStream, -}; +use databricks_zerobus_ingest_sdk::{JsonString, JsonValue, ZerobusSdk, ZerobusStream}; use serde::Serialize; /// Order struct that can be automatically serialized to JSON using JsonValue wrapper. @@ -36,28 +33,17 @@ const SERVER_ENDPOINT: &str = "https://.zerobus..cloud.da #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { - let table_properties = TableProperties { - table_name: TABLE_NAME.to_string(), - // Not needed for JSON. 
- descriptor_proto: None, - }; - let stream_configuration_options = StreamConfigurationOptions { - max_inflight_requests: 100, - record_type: RecordType::Json, - ..Default::default() - }; let sdk_handle = ZerobusSdk::builder() .endpoint(SERVER_ENDPOINT) .unity_catalog_url(DATABRICKS_WORKSPACE_URL) .build()?; let mut stream = sdk_handle - .create_stream( - table_properties.clone(), - DATABRICKS_CLIENT_ID.to_string(), - DATABRICKS_CLIENT_SECRET.to_string(), - Some(stream_configuration_options), - ) + .stream_builder(TABLE_NAME) + .oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET) + .json() + .max_inflight_requests(100) + .build() .await?; ingest_with_offset_api(&mut stream).await?; diff --git a/rust/examples/json/single/src/main.rs b/rust/examples/json/single/src/main.rs index 4f02104..ef92267 100644 --- a/rust/examples/json/single/src/main.rs +++ b/rust/examples/json/single/src/main.rs @@ -1,9 +1,6 @@ use std::error::Error; -use databricks_zerobus_ingest_sdk::{ - databricks::zerobus::RecordType, JsonString, JsonValue, StreamConfigurationOptions, - TableProperties, ZerobusSdk, ZerobusStream, -}; +use databricks_zerobus_ingest_sdk::{JsonString, JsonValue, ZerobusSdk, ZerobusStream}; use serde::Serialize; /// Order struct that can be automatically serialized to JSON using JsonValue wrapper. @@ -36,28 +33,17 @@ const SERVER_ENDPOINT: &str = "https://.zerobus..cloud.da #[tokio::main] async fn main() -> Result<(), Box> { - let table_properties = TableProperties { - table_name: TABLE_NAME.to_string(), - // Not needed for JSON. 
- descriptor_proto: None, - }; - let stream_configuration_options = StreamConfigurationOptions { - max_inflight_requests: 100, - record_type: RecordType::Json, - ..Default::default() - }; let sdk_handle = ZerobusSdk::builder() .endpoint(SERVER_ENDPOINT) .unity_catalog_url(DATABRICKS_WORKSPACE_URL) .build()?; let mut stream = sdk_handle - .create_stream( - table_properties.clone(), - DATABRICKS_CLIENT_ID.to_string(), - DATABRICKS_CLIENT_SECRET.to_string(), - Some(stream_configuration_options), - ) + .stream_builder(TABLE_NAME) + .oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET) + .json() + .max_inflight_requests(100) + .build() .await?; ingest_with_offset_api(&mut stream).await?; diff --git a/rust/examples/proto/batch/src/main.rs b/rust/examples/proto/batch/src/main.rs index 554dcd2..f3dbc3f 100644 --- a/rust/examples/proto/batch/src/main.rs +++ b/rust/examples/proto/batch/src/main.rs @@ -4,10 +4,7 @@ use std::fs; use prost::Message; use prost_reflect::prost_types; -use databricks_zerobus_ingest_sdk::{ - ProtoBytes, ProtoMessage, StreamConfigurationOptions, TableProperties, ZerobusSdk, - ZerobusStream, -}; +use databricks_zerobus_ingest_sdk::{ProtoBytes, ProtoMessage, ZerobusSdk, ZerobusStream}; pub mod orders { include!("../output/orders.rs"); @@ -33,27 +30,17 @@ const SERVER_ENDPOINT: &str = "https://.zerobus..cloud.da async fn main() -> Result<(), Box> { let descriptor_proto = load_descriptor_proto("output/orders.descriptor", "orders.proto", "table_Orders"); - let table_properties = TableProperties { - table_name: TABLE_NAME.to_string(), - descriptor_proto: Some(descriptor_proto), - }; - let stream_configuration_options = StreamConfigurationOptions { - max_inflight_requests: 100, - // RecordType::Proto is the default. 
- ..Default::default() - }; let sdk_handle = ZerobusSdk::builder() .endpoint(SERVER_ENDPOINT) .unity_catalog_url(DATABRICKS_WORKSPACE_URL) .build()?; let mut stream = sdk_handle - .create_stream( - table_properties.clone(), - DATABRICKS_CLIENT_ID.to_string(), - DATABRICKS_CLIENT_SECRET.to_string(), - Some(stream_configuration_options), - ) + .stream_builder(TABLE_NAME) + .oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET) + .compiled_proto(descriptor_proto) + .max_inflight_requests(100) + .build() .await?; ingest_with_offset_api(&mut stream).await?; diff --git a/rust/examples/proto/single/src/main.rs b/rust/examples/proto/single/src/main.rs index bd7d56e..53a8540 100644 --- a/rust/examples/proto/single/src/main.rs +++ b/rust/examples/proto/single/src/main.rs @@ -4,10 +4,7 @@ use std::fs; use prost::Message; use prost_reflect::prost_types; -use databricks_zerobus_ingest_sdk::{ - ProtoBytes, ProtoMessage, StreamConfigurationOptions, TableProperties, ZerobusSdk, - ZerobusStream, -}; +use databricks_zerobus_ingest_sdk::{ProtoBytes, ProtoMessage, ZerobusSdk, ZerobusStream}; pub mod orders { include!("../output/orders.rs"); @@ -33,27 +30,17 @@ const SERVER_ENDPOINT: &str = "https://.zerobus..cloud.da async fn main() -> Result<(), Box> { let descriptor_proto = load_descriptor_proto("output/orders.descriptor", "orders.proto", "table_Orders"); - let table_properties = TableProperties { - table_name: TABLE_NAME.to_string(), - descriptor_proto: Some(descriptor_proto), - }; - let stream_configuration_options = StreamConfigurationOptions { - max_inflight_requests: 100, - // RecordType::Proto is the default. 
- ..Default::default() - }; let sdk_handle = ZerobusSdk::builder() .endpoint(SERVER_ENDPOINT) .unity_catalog_url(DATABRICKS_WORKSPACE_URL) .build()?; let mut stream = sdk_handle - .create_stream( - table_properties.clone(), - DATABRICKS_CLIENT_ID.to_string(), - DATABRICKS_CLIENT_SECRET.to_string(), - Some(stream_configuration_options), - ) + .stream_builder(TABLE_NAME) + .oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET) + .compiled_proto(descriptor_proto) + .max_inflight_requests(100) + .build() .await?; ingest_with_offset_api(&mut stream).await?; diff --git a/rust/sdk/src/builder/mod.rs b/rust/sdk/src/builder/mod.rs index 87dfbc9..1e6f26a 100644 --- a/rust/sdk/src/builder/mod.rs +++ b/rust/sdk/src/builder/mod.rs @@ -1,10 +1,12 @@ -//! Builder API for creating Zerobus SDK instances. +//! Builder API for creating Zerobus SDK instances and ingestion streams. //! -//! This module provides a fluent builder pattern for configuring and creating -//! SDK instances. +//! This module provides fluent builder patterns for configuring and creating +//! SDK instances and streams. //! //! # Examples //! +//! ## SDK Builder +//! //! ```no_run //! use databricks_zerobus_ingest_sdk::ZerobusSdkBuilder; //! @@ -14,7 +16,25 @@ //! .build()?; //! # Ok::<(), databricks_zerobus_ingest_sdk::ZerobusError>(()) //! ``` +//! +//! ## Stream Builder +//! +//! ```rust,ignore +//! let stream = sdk +//! .stream_builder("catalog.schema.table") +//! .oauth("client-id", "client-secret") +//! .json() +//! .build() +//! .await?; +//! 
``` mod sdk_builder; +mod stream_builder; +pub(crate) mod stream_format; pub use sdk_builder::ZerobusSdkBuilder; +pub use stream_builder::StreamBuilder; +pub use stream_format::{CompiledProto, GrpcFormat, HasAuth, Json, NoAuth, NoFormat, StreamFormat}; + +#[cfg(feature = "arrow-flight")] +pub use stream_format::Arrow; diff --git a/rust/sdk/src/builder/stream_builder.rs b/rust/sdk/src/builder/stream_builder.rs new file mode 100644 index 0000000..1c0731a --- /dev/null +++ b/rust/sdk/src/builder/stream_builder.rs @@ -0,0 +1,665 @@ +//! Typestate builder for creating Zerobus ingestion streams. +//! +//! The builder enforces a strict configuration order at compile time: +//! +//! 1. **Auth** — `.oauth()` or `.headers_provider()` +//! 2. **Format** — `.json()`, `.compiled_proto()`, or `.arrow()` +//! 3. **Config** (optional) — individual setters or `.options()` +//! 4. **Build** — `.build()` +//! +//! # Examples +//! +//! ```rust,ignore +//! let stream = sdk +//! .stream_builder("catalog.schema.table") +//! .oauth("client-id", "client-secret") +//! .json() +//! .max_inflight_requests(500_000) +//! .build() +//! .await?; +//! ``` + +use std::fmt; +use std::marker::PhantomData; +use std::sync::Arc; + +use crate::callbacks::AckCallback; +use crate::databricks::zerobus::RecordType; +use crate::headers_provider::{HeadersProvider, OAuthHeadersProvider}; +use crate::stream_configuration::StreamConfigurationOptions; +use crate::{TableProperties, ZerobusResult, ZerobusSdk, ZerobusStream}; + +#[cfg(feature = "arrow-flight")] +use crate::arrow_configuration::ArrowStreamConfigurationOptions; +#[cfg(feature = "arrow-flight")] +use crate::arrow_stream::{ArrowSchema, ArrowTableProperties, ZerobusArrowStream}; + +use super::stream_format::*; + +/// Internal representation of the authentication configuration. +enum AuthConfig { + OAuth { + client_id: String, + client_secret: String, + }, + HeadersProvider(Arc), +} + +/// A typestate builder for creating Zerobus ingestion streams. 
+/// +/// The two type parameters track compile-time state: +/// - `F` — the record format ([`NoFormat`], [`Json`], [`CompiledProto`], or [`Arrow`]) +/// - `A` — the authentication state ([`NoAuth`] or [`HasAuth`]) +/// +/// The builder enforces the ordering: **auth → format → config → build**. +#[must_use = "a StreamBuilder does nothing until `.build()` is called"] +pub struct StreamBuilder<'a, F, A> { + sdk: &'a ZerobusSdk, + table_name: String, + + // Format-specific data + descriptor_proto: Option, + #[cfg(feature = "arrow-flight")] + arrow_schema: Option>, + + // Auth (None only in NoAuth state; HasAuth guarantees Some) + auth: Option, + + // gRPC stream config (used by Json / CompiledProto) + grpc_config: StreamConfigurationOptions, + + // Arrow-specific config + #[cfg(feature = "arrow-flight")] + arrow_config: ArrowStreamConfigurationOptions, + + _marker: PhantomData<(F, A)>, +} + +impl fmt::Debug for StreamBuilder<'_, F, A> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let auth_kind = match &self.auth { + Some(AuthConfig::OAuth { .. }) => "OAuth", + Some(AuthConfig::HeadersProvider(_)) => "HeadersProvider", + None => "None", + }; + f.debug_struct("StreamBuilder") + .field("table_name", &self.table_name) + .field("auth", &auth_kind) + .field("format", &std::any::type_name::()) + .finish_non_exhaustive() + } +} + +// ── Type-state transition helper ───────────────────────────────────────────── + +impl<'a, F, A> StreamBuilder<'a, F, A> { + /// Move all fields into a new `StreamBuilder` with different type parameters. + /// Callers mutate the returned builder to set the fields that actually change. 
+ fn transition(self) -> StreamBuilder<'a, F2, A2> { + StreamBuilder { + sdk: self.sdk, + table_name: self.table_name, + descriptor_proto: self.descriptor_proto, + #[cfg(feature = "arrow-flight")] + arrow_schema: self.arrow_schema, + auth: self.auth, + grpc_config: self.grpc_config, + #[cfg(feature = "arrow-flight")] + arrow_config: self.arrow_config, + _marker: PhantomData, + } + } +} + +// ── Constructor (only via ZerobusSdk::stream_builder) ─────────────────────── + +impl<'a> StreamBuilder<'a, NoFormat, NoAuth> { + pub(crate) fn new(sdk: &'a ZerobusSdk, table_name: impl Into) -> Self { + Self { + sdk, + table_name: table_name.into(), + descriptor_proto: None, + #[cfg(feature = "arrow-flight")] + arrow_schema: None, + auth: None, + grpc_config: StreamConfigurationOptions::default(), + #[cfg(feature = "arrow-flight")] + arrow_config: ArrowStreamConfigurationOptions::default(), + _marker: PhantomData, + } + } +} + +// ── Step 1: Auth (only on NoFormat + NoAuth) ──────────────────────────────── + +impl<'a> StreamBuilder<'a, NoFormat, NoAuth> { + /// Authenticate with OAuth client credentials. + pub fn oauth( + self, + client_id: impl Into, + client_secret: impl Into, + ) -> StreamBuilder<'a, NoFormat, HasAuth> { + let mut b = self.transition(); + b.auth = Some(AuthConfig::OAuth { + client_id: client_id.into(), + client_secret: client_secret.into(), + }); + b + } + + /// Authenticate with a custom headers provider. + pub fn headers_provider( + self, + provider: Arc, + ) -> StreamBuilder<'a, NoFormat, HasAuth> { + let mut b = self.transition(); + b.auth = Some(AuthConfig::HeadersProvider(provider)); + b + } + + /// Skip authentication (uses a no-op headers provider). + /// + /// Useful for local development, testing, or when authentication is + /// handled externally (e.g., via a sidecar proxy). 
+ pub fn no_auth(self) -> StreamBuilder<'a, NoFormat, HasAuth> { + self.headers_provider(Arc::new(crate::headers_provider::NoOpHeadersProvider)) + } +} + +// ── Step 2: Format (only on NoFormat + HasAuth) ───────────────────────────── + +impl<'a> StreamBuilder<'a, NoFormat, HasAuth> { + /// Select JSON record format. + pub fn json(self) -> StreamBuilder<'a, Json, HasAuth> { + let mut b = self.transition(); + b.grpc_config.record_type = RecordType::Json; + b.descriptor_proto = None; + b + } + + /// Select compiled protobuf record format. + pub fn compiled_proto( + self, + descriptor: prost_types::DescriptorProto, + ) -> StreamBuilder<'a, CompiledProto, HasAuth> { + let mut b = self.transition(); + b.grpc_config.record_type = RecordType::Proto; + b.descriptor_proto = Some(descriptor); + b + } + + /// Select Arrow Flight record format. + #[cfg(feature = "arrow-flight")] + pub fn arrow(self, schema: Arc) -> StreamBuilder<'a, Arrow, HasAuth> { + let mut b = self.transition(); + b.arrow_schema = Some(schema); + b + } +} + +// ── Step 3: Config — shared setters (format + auth resolved) ──────────────── + +impl<'a, F: StreamFormat> StreamBuilder<'a, F, HasAuth> { + /// Enable or disable automatic stream recovery. + pub fn recovery(mut self, enabled: bool) -> Self { + self.grpc_config.recovery = enabled; + #[cfg(feature = "arrow-flight")] + { + self.arrow_config.recovery = enabled; + } + self + } + + /// Set the timeout in milliseconds for each recovery attempt. + pub fn recovery_timeout_ms(mut self, ms: u64) -> Self { + self.grpc_config.recovery_timeout_ms = ms; + #[cfg(feature = "arrow-flight")] + { + self.arrow_config.recovery_timeout_ms = ms; + } + self + } + + /// Set the backoff time in milliseconds between recovery retries. 
+ pub fn recovery_backoff_ms(mut self, ms: u64) -> Self { + self.grpc_config.recovery_backoff_ms = ms; + #[cfg(feature = "arrow-flight")] + { + self.arrow_config.recovery_backoff_ms = ms; + } + self + } + + /// Set the maximum number of recovery retry attempts. + pub fn recovery_retries(mut self, n: u32) -> Self { + self.grpc_config.recovery_retries = n; + #[cfg(feature = "arrow-flight")] + { + self.arrow_config.recovery_retries = n; + } + self + } + + /// Set the timeout in milliseconds for server acknowledgement. + pub fn server_lack_of_ack_timeout_ms(mut self, ms: u64) -> Self { + self.grpc_config.server_lack_of_ack_timeout_ms = ms; + #[cfg(feature = "arrow-flight")] + { + self.arrow_config.server_lack_of_ack_timeout_ms = ms; + } + self + } + + /// Set the timeout in milliseconds for flush operations. + pub fn flush_timeout_ms(mut self, ms: u64) -> Self { + self.grpc_config.flush_timeout_ms = ms; + #[cfg(feature = "arrow-flight")] + { + self.arrow_config.flush_timeout_ms = ms; + } + self + } +} + +// ── Step 3: Config — gRPC-only setters (Json / CompiledProto) ─────────────── + +impl<'a, F: GrpcFormat> StreamBuilder<'a, F, HasAuth> { + /// Set the maximum number of in-flight requests. + pub fn max_inflight_requests(mut self, n: usize) -> Self { + self.grpc_config.max_inflight_requests = n; + self + } + + /// Set the maximum wait time during graceful stream pause. + pub fn stream_paused_max_wait_time_ms(mut self, ms: Option) -> Self { + self.grpc_config.stream_paused_max_wait_time_ms = ms; + self + } + + /// Set the acknowledgment callback. + pub fn ack_callback(mut self, callback: Arc) -> Self { + self.grpc_config.ack_callback = Some(callback); + self + } + + /// Set the maximum wait time for callbacks after stream close. + pub fn callback_max_wait_time_ms(mut self, ms: Option) -> Self { + self.grpc_config.callback_max_wait_time_ms = ms; + self + } + + /// Replace the entire gRPC stream configuration. 
+ /// + /// **Migration bridge** — prefer individual setters for new code. + /// This replaces ALL prior setter values. The `record_type` field will + /// be overridden at `build()` time to match the selected format. + pub fn options(mut self, options: StreamConfigurationOptions) -> Self { + self.grpc_config = options; + self + } +} + +// ── Step 3: Config — Arrow-only setters ───────────────────────────────────── + +#[cfg(feature = "arrow-flight")] +impl<'a> StreamBuilder<'a, Arrow, HasAuth> { + /// Set the maximum number of in-flight Arrow batches. + pub fn max_inflight_batches(mut self, n: usize) -> Self { + self.arrow_config.max_inflight_batches = n; + self + } + + /// Set the connection timeout in milliseconds for Arrow Flight. + pub fn connection_timeout_ms(mut self, ms: u64) -> Self { + self.arrow_config.connection_timeout_ms = ms; + self + } + + /// Set the Arrow IPC compression type. + pub fn ipc_compression(mut self, compression: Option) -> Self { + self.arrow_config.ipc_compression = compression; + self + } + + /// Replace the entire Arrow stream configuration. + /// + /// **Migration bridge** — prefer individual setters for new code. + /// This replaces ALL prior setter values. + pub fn options(mut self, options: ArrowStreamConfigurationOptions) -> Self { + self.arrow_config = options; + self + } +} + +// ── Step 4: build() ───────────────────────────────────────────────────────── + +impl<'a, F: StreamFormat> StreamBuilder<'a, F, HasAuth> { + /// Resolve the headers provider from the stored auth config. 
+ fn resolve_headers_provider(&self) -> Arc { + match self.auth.as_ref().expect("HasAuth guarantees auth is set") { + AuthConfig::OAuth { + client_id, + client_secret, + } => Arc::new(OAuthHeadersProvider::new( + client_id.clone(), + client_secret.clone(), + self.table_name.clone(), + self.sdk.workspace_id.clone(), + self.sdk.unity_catalog_url.clone(), + )), + AuthConfig::HeadersProvider(p) => Arc::clone(p), + } + } +} + +impl<'a> StreamBuilder<'a, Json, HasAuth> { + /// Build and open a JSON ingestion stream. + pub async fn build(mut self) -> ZerobusResult { + let headers_provider = self.resolve_headers_provider(); + let table_properties = TableProperties { + table_name: self.table_name, + descriptor_proto: None, + }; + // Override record_type so .options() can't cause a mismatch. + self.grpc_config.record_type = RecordType::Json; + let channel = self.sdk.get_or_create_channel_zerobus_client().await?; + ZerobusStream::new_stream( + channel, + table_properties, + headers_provider, + self.grpc_config, + ) + .await + } +} + +impl<'a> StreamBuilder<'a, CompiledProto, HasAuth> { + /// Build and open a compiled-protobuf ingestion stream. + pub async fn build(mut self) -> ZerobusResult { + let headers_provider = self.resolve_headers_provider(); + let table_properties = TableProperties { + table_name: self.table_name, + descriptor_proto: self.descriptor_proto, + }; + // Override record_type so .options() can't cause a mismatch. + self.grpc_config.record_type = RecordType::Proto; + let channel = self.sdk.get_or_create_channel_zerobus_client().await?; + ZerobusStream::new_stream( + channel, + table_properties, + headers_provider, + self.grpc_config, + ) + .await + } +} + +#[cfg(feature = "arrow-flight")] +impl<'a> StreamBuilder<'a, Arrow, HasAuth> { + /// Build and open an Arrow Flight ingestion stream. 
+ pub async fn build(self) -> ZerobusResult { + let headers_provider = self.resolve_headers_provider(); + let table_properties = ArrowTableProperties { + table_name: self.table_name, + schema: self + .arrow_schema + .expect("Arrow format guarantees schema is set"), + }; + ZerobusArrowStream::new( + &self.sdk.zerobus_endpoint, + Arc::clone(&self.sdk.tls_config), + table_properties, + headers_provider, + self.arrow_config, + ) + .await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::headers_provider::NoOpHeadersProvider; + + fn test_sdk() -> ZerobusSdk { + ZerobusSdk::new_with_config( + "http://localhost:1234".to_string(), + "http://localhost:5678".to_string(), + "test-workspace".to_string(), + Arc::new(crate::tls_config::SecureTlsConfig::new()), + ) + } + + #[test] + fn json_oauth_builder_compiles() { + let sdk = test_sdk(); + let _builder = sdk + .stream_builder("catalog.schema.table") + .oauth("cid", "csec") + .json() + .max_inflight_requests(100); + } + + #[test] + fn compiled_proto_headers_provider_compiles() { + let sdk = test_sdk(); + let provider: Arc = Arc::new(NoOpHeadersProvider); + let _builder = sdk + .stream_builder("catalog.schema.table") + .headers_provider(provider) + .compiled_proto(prost_types::DescriptorProto::default()); + } + + #[test] + fn config_setters_chain() { + let sdk = test_sdk(); + let _builder = sdk + .stream_builder("t") + .oauth("a", "b") + .json() + .recovery(false) + .recovery_timeout_ms(10_000) + .recovery_backoff_ms(1_000) + .recovery_retries(3) + .server_lack_of_ack_timeout_ms(30_000) + .flush_timeout_ms(60_000) + .max_inflight_requests(500) + .stream_paused_max_wait_time_ms(Some(5_000)) + .callback_max_wait_time_ms(None); + } + + #[test] + fn options_replaces_config() { + let sdk = test_sdk(); + let custom = StreamConfigurationOptions { + max_inflight_requests: 42, + ..Default::default() + }; + let builder = sdk + .stream_builder("t") + .oauth("a", "b") + .json() + .max_inflight_requests(999) + 
.options(custom); + assert_eq!(builder.grpc_config.max_inflight_requests, 42); + } + + #[test] + fn setter_after_options_mutates_replacement() { + let sdk = test_sdk(); + let custom = StreamConfigurationOptions { + max_inflight_requests: 42, + ..Default::default() + }; + let builder = sdk + .stream_builder("t") + .oauth("a", "b") + .json() + .options(custom) + .max_inflight_requests(100); + assert_eq!(builder.grpc_config.max_inflight_requests, 100); + } + + #[test] + fn default_config_without_setters() { + let sdk = test_sdk(); + let builder = sdk.stream_builder("t").oauth("a", "b").json(); + // Should have sensible defaults + assert_eq!(builder.grpc_config.max_inflight_requests, 1_000_000); + assert!(builder.grpc_config.recovery); + } + + #[cfg(feature = "arrow-flight")] + #[test] + fn arrow_builder_compiles() { + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + + let sdk = test_sdk(); + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "id", + DataType::Int32, + false, + )])); + let _builder = sdk + .stream_builder("t") + .oauth("a", "b") + .arrow(schema) + .max_inflight_batches(500) + .connection_timeout_ms(10_000); + } + + #[cfg(feature = "arrow-flight")] + #[test] + fn options_replaces_arrow_config() { + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + + let sdk = test_sdk(); + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "id", + DataType::Int32, + false, + )])); + let custom = ArrowStreamConfigurationOptions { + max_inflight_batches: 77, + ..Default::default() + }; + let builder = sdk + .stream_builder("t") + .oauth("a", "b") + .arrow(schema) + .options(custom); + assert_eq!(builder.arrow_config.max_inflight_batches, 77); + } + + #[cfg(feature = "arrow-flight")] + #[test] + fn shared_setters_write_to_arrow_config() { + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + + let sdk = test_sdk(); + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "id", + DataType::Int32, + false, + )])); + let 
builder = sdk + .stream_builder("t") + .oauth("a", "b") + .arrow(schema) + .recovery(false) + .recovery_timeout_ms(5_000) + .recovery_backoff_ms(500) + .recovery_retries(2) + .server_lack_of_ack_timeout_ms(10_000) + .flush_timeout_ms(20_000); + assert!(!builder.arrow_config.recovery); + assert_eq!(builder.arrow_config.recovery_timeout_ms, 5_000); + assert_eq!(builder.arrow_config.recovery_backoff_ms, 500); + assert_eq!(builder.arrow_config.recovery_retries, 2); + assert_eq!(builder.arrow_config.server_lack_of_ack_timeout_ms, 10_000); + assert_eq!(builder.arrow_config.flush_timeout_ms, 20_000); + } + + #[test] + fn no_auth_shortcut_compiles() { + let sdk = test_sdk(); + let _builder = sdk.stream_builder("catalog.schema.table").no_auth().json(); + } + + #[test] + fn options_cannot_override_record_type() { + let sdk = test_sdk(); + let wrong_type = StreamConfigurationOptions { + record_type: RecordType::Proto, + ..Default::default() + }; + // .json() + .options(Proto) should still produce Json at build time. + // We can't call build() without a server, so this test only pins the + // state the builder stores before build() runs. + let builder = sdk + .stream_builder("t") + .oauth("a", "b") + .json() + .options(wrong_type); + // After .options(), grpc_config has the wrong record_type... + assert_eq!(builder.grpc_config.record_type, RecordType::Proto); + // ...but build() is expected to override it with the selected format. + // NOTE: that override is not asserted here; it needs a build() call.
+ } + + #[test] + fn debug_impl_works() { + let sdk = test_sdk(); + let builder = sdk.stream_builder("t").oauth("a", "b").json(); + let debug_str = format!("{:?}", builder); + assert!(debug_str.contains("StreamBuilder")); + assert!(debug_str.contains("OAuth")); + } + + #[tokio::test] + async fn noop_headers_provider_returns_empty() { + let provider = NoOpHeadersProvider; + let headers = provider.get_headers().await.unwrap(); + assert!(headers.is_empty()); + } + + #[tokio::test] + async fn resolve_headers_provider_with_custom_provider() { + use std::collections::HashMap; + + /// Test provider that returns a known header. + struct TestProvider; + + #[async_trait::async_trait] + impl HeadersProvider for TestProvider { + async fn get_headers(&self) -> crate::ZerobusResult> { + let mut h = HashMap::new(); + h.insert("x-test", "value".to_string()); + Ok(h) + } + } + + let sdk = test_sdk(); + let builder = sdk + .stream_builder("catalog.schema.table") + .headers_provider(Arc::new(TestProvider)) + .json(); + + let provider = builder.resolve_headers_provider(); + let headers = provider.get_headers().await.unwrap(); + assert_eq!(headers.get("x-test").unwrap(), "value"); + } + + #[tokio::test] + async fn resolve_headers_provider_with_oauth() { + let sdk = test_sdk(); + let builder = sdk + .stream_builder("catalog.schema.table") + .oauth("my-client-id", "my-secret") + .json(); + + // Verify it resolves without panic (we can't call get_headers + // without a real UC endpoint, but we can confirm construction). + let _provider = builder.resolve_headers_provider(); + } +} diff --git a/rust/sdk/src/builder/stream_format.rs b/rust/sdk/src/builder/stream_format.rs new file mode 100644 index 0000000..f737e8d --- /dev/null +++ b/rust/sdk/src/builder/stream_format.rs @@ -0,0 +1,56 @@ +//! Marker types for the stream builder's typestate pattern. +//! +//! These types enforce at compile time that a stream builder has both a format +//! 
and authentication configured before `build()` can be called. + +mod sealed { + pub trait Sealed {} +} + +// Format markers + +/// Initial state: no format has been chosen yet. +pub struct NoFormat; + +/// JSON record format. +pub struct Json; + +/// Compiled protobuf record format. +pub struct CompiledProto; + +/// Arrow Flight record format. +#[cfg(feature = "arrow-flight")] +pub struct Arrow; + +/// Sealed trait implemented by all resolved format markers. +/// +/// `build()` is only available when `F: StreamFormat`. +pub trait StreamFormat: sealed::Sealed {} + +impl sealed::Sealed for Json {} +impl sealed::Sealed for CompiledProto {} +impl StreamFormat for Json {} +impl StreamFormat for CompiledProto {} + +/// Sealed trait for gRPC-based formats ([`Json`] and [`CompiledProto`]). +/// +/// gRPC-only configuration setters (e.g., `max_inflight_requests`, +/// `ack_callback`) are only available when `F: GrpcFormat`. +pub trait GrpcFormat: StreamFormat {} +impl GrpcFormat for Json {} +impl GrpcFormat for CompiledProto {} + +#[cfg(feature = "arrow-flight")] +impl sealed::Sealed for Arrow {} +#[cfg(feature = "arrow-flight")] +impl StreamFormat for Arrow {} + +// Auth markers + +/// Initial state: no authentication has been configured yet. +pub struct NoAuth; + +/// Authentication has been configured (OAuth or custom headers provider). +pub struct HasAuth; + +impl sealed::Sealed for HasAuth {} diff --git a/rust/sdk/src/headers_provider.rs b/rust/sdk/src/headers_provider.rs index 06a7a72..f7cf0a5 100644 --- a/rust/sdk/src/headers_provider.rs +++ b/rust/sdk/src/headers_provider.rs @@ -93,3 +93,16 @@ impl HeadersProvider for OAuthHeadersProvider { Ok(headers) } } + +/// A no-op headers provider that returns an empty headers map. +/// +/// Useful for local development, testing, or scenarios where authentication +/// is handled externally (e.g., via a sidecar proxy). 
+pub struct NoOpHeadersProvider; + +#[async_trait] +impl HeadersProvider for NoOpHeadersProvider { + async fn get_headers(&self) -> ZerobusResult> { + Ok(HashMap::new()) + } +} diff --git a/rust/sdk/src/lib.rs b/rust/sdk/src/lib.rs index 5d3bb51..7a23946 100644 --- a/rust/sdk/src/lib.rs +++ b/rust/sdk/src/lib.rs @@ -5,16 +5,22 @@ //! ## Quick Start //! //! ```rust,ignore -//! use databricks_zerobus_ingest_sdk::{ZerobusSdk, TableProperties, ProtoMessage}; +//! use databricks_zerobus_ingest_sdk::{ZerobusSdk, JsonValue}; //! //! let sdk = ZerobusSdk::builder() //! .endpoint(zerobus_endpoint) //! .unity_catalog_url(uc_endpoint) //! .build()?; -//! let stream = sdk.create_stream(table_properties, client_id, client_secret, None).await?; +//! +//! let stream = sdk +//! .stream_builder("catalog.schema.table") +//! .oauth(client_id, client_secret) +//! .json() +//! .build() +//! .await?; //! //! // Ingest a record and wait for acknowledgment -//! let offset = stream.ingest_record_offset(ProtoMessage(my_message)).await?; +//! let offset = stream.ingest_record_offset(JsonValue(my_record)).await?; //! stream.wait_for_offset(offset).await?; //! //! 
stream.close().await?; @@ -81,11 +87,18 @@ pub use arrow_configuration::ArrowStreamConfigurationOptions; pub use arrow_stream::{ ArrowSchema, ArrowTableProperties, DataType, Field, RecordBatch, ZerobusArrowStream, }; -pub use builder::ZerobusSdkBuilder; +#[cfg(feature = "arrow-flight")] +pub use builder::Arrow; +pub use builder::{ + CompiledProto, GrpcFormat, HasAuth, Json, NoAuth, NoFormat, StreamBuilder, StreamFormat, + ZerobusSdkBuilder, +}; pub use callbacks::AckCallback; pub use default_token_factory::DefaultTokenFactory; pub use errors::ZerobusError; -pub use headers_provider::{HeadersProvider, OAuthHeadersProvider, DEFAULT_X_ZEROBUS_SDK}; +pub use headers_provider::{ + HeadersProvider, NoOpHeadersProvider, OAuthHeadersProvider, DEFAULT_X_ZEROBUS_SDK, +}; pub use offset_generator::{OffsetId, OffsetIdGenerator}; pub use record_types::{ EncodedBatch, EncodedBatchIter, EncodedRecord, JsonEncodedRecord, JsonString, JsonValue, @@ -267,8 +280,8 @@ pub struct ZerobusSdk { pub use_tls: bool, pub unity_catalog_url: String, shared_channel: tokio::sync::Mutex>>, - workspace_id: String, - tls_config: Arc, + pub(crate) workspace_id: String, + pub(crate) tls_config: Arc, } impl ZerobusSdk { @@ -291,6 +304,43 @@ impl ZerobusSdk { ZerobusSdkBuilder::new() } + /// Creates a new stream builder for configuring an ingestion stream. + /// + /// The builder uses a typestate pattern that enforces at compile time that + /// both a record format and an authentication method are configured before + /// the stream can be built. 
+ /// + /// # Arguments + /// + /// * `table_name` - Fully-qualified Unity Catalog table name (e.g., `"catalog.schema.table"`) + /// + /// # Examples + /// + /// ```rust,ignore + /// // JSON stream with OAuth + /// let stream = sdk + /// .stream_builder("catalog.schema.table") + /// .oauth("client-id", "client-secret") + /// .json() + /// .build() + /// .await?; + /// + /// // Proto stream with custom headers + /// let stream = sdk + /// .stream_builder("catalog.schema.table") + /// .headers_provider(my_provider) + /// .compiled_proto(descriptor) + /// .max_inflight_requests(500_000) + /// .build() + /// .await?; + /// ``` + pub fn stream_builder( + &self, + table_name: impl Into, + ) -> StreamBuilder<'_, NoFormat, NoAuth> { + StreamBuilder::new(self, table_name) + } + /// Creates a new Zerobus SDK instance. /// /// # Deprecated @@ -414,6 +464,11 @@ impl ZerobusSdk { /// # Ok(()) /// # } /// ``` + #[deprecated( + since = "1.1.0", + note = "Use `sdk.stream_builder(table_name).oauth(id, secret).json().build().await` or `.compiled_proto(desc)` instead" + )] + #[allow(deprecated)] #[instrument(level = "debug", skip_all)] pub async fn create_stream( &self, @@ -487,6 +542,11 @@ impl ZerobusSdk { /// # Ok(()) /// # } /// ``` + #[deprecated( + since = "1.1.0", + note = "Use `sdk.stream_builder(table_name).headers_provider(p).json().build().await` instead" + )] + #[allow(deprecated)] #[instrument(level = "debug", skip_all)] pub async fn create_stream_with_headers_provider( &self, @@ -524,6 +584,7 @@ impl ZerobusSdk { options, ) .await; + match stream { Ok(stream) => { if let Some(stream_id) = stream.stream_id.as_ref() { @@ -577,18 +638,35 @@ impl ZerobusSdk { #[instrument(level = "debug", skip_all)] pub async fn recreate_stream(&self, stream: &ZerobusStream) -> ZerobusResult { let batches = stream.get_unacked_batches().await?; - let new_stream = self - .create_stream_with_headers_provider( - stream.table_properties.clone(), - Arc::clone(&stream.headers_provider), - 
Some(stream.options.clone()), - ) - .await?; - for batch in batches { - let ack = new_stream.ingest_internal(batch).await?; - tokio::spawn(ack); + let channel = self.get_or_create_channel_zerobus_client().await?; + let new_stream = ZerobusStream::new_stream( + channel, + stream.table_properties.clone(), + Arc::clone(&stream.headers_provider), + stream.options.clone(), + ) + .await; + + match new_stream { + Ok(new_stream) => { + if let Some(stream_id) = new_stream.stream_id.as_ref() { + info!(stream_id = %stream_id, "Successfully recreated ephemeral stream"); + } else { + error!("Successfully recreated a stream but stream_id is None"); + } + + for batch in batches { + let ack = new_stream.ingest_internal(batch).await?; + tokio::spawn(ack); + } + + Ok(new_stream) + } + Err(e) => { + error!("Stream recreation failed with error: {}", e); + Err(e) + } } - return Ok(new_stream); } /// Creates a new Arrow Flight ingestion stream to a Unity Catalog table. @@ -642,6 +720,11 @@ impl ZerobusSdk { /// # } /// ``` #[cfg(feature = "arrow-flight")] + #[deprecated( + since = "1.1.0", + note = "Use `sdk.stream_builder(table_name).oauth(id, secret).arrow(schema).build().await` instead" + )] + #[allow(deprecated)] #[instrument(level = "debug", skip_all)] pub async fn create_arrow_stream( &self, @@ -721,6 +804,11 @@ impl ZerobusSdk { /// # } /// ``` #[cfg(feature = "arrow-flight")] + #[deprecated( + since = "1.1.0", + note = "Use `sdk.stream_builder(table_name).headers_provider(p).arrow(schema).build().await` instead" + )] + #[allow(deprecated)] #[instrument(level = "debug", skip_all)] pub async fn create_arrow_stream_with_headers_provider( &self, @@ -808,31 +896,41 @@ impl ZerobusSdk { ) -> ZerobusResult { let batches = stream.get_unacked_batches().await?; - let new_stream = self - .create_arrow_stream_with_headers_provider( - stream.table_properties().clone(), - stream.headers_provider(), - Some(stream.options().clone()), - ) - .await?; + let new_stream = ZerobusArrowStream::new( + 
&self.zerobus_endpoint, + Arc::clone(&self.tls_config), + stream.table_properties().clone(), + stream.headers_provider(), + stream.options().clone(), + ) + .await; - // Replay unacked batches. - for batch in batches { - let _offset = new_stream.ingest_batch(batch).await?; - } + match new_stream { + Ok(new_stream) => { + info!( + table_name = %new_stream.table_name(), + "Successfully recreated Arrow Flight stream" + ); - info!( - table_name = %new_stream.table_name(), - "Successfully recreated Arrow Flight stream" - ); + for batch in batches { + let _offset = new_stream.ingest_batch(batch).await?; + } - Ok(new_stream) + Ok(new_stream) + } + Err(e) => { + error!("Arrow Flight stream recreation failed: {}", e); + Err(e) + } + } } /// Gets or creates the shared Channel for all streams. /// The first call creates the Channel, subsequent calls clone it. /// All clones share the same underlying TCP connection via HTTP/2 multiplexing. - async fn get_or_create_channel_zerobus_client(&self) -> ZerobusResult> { + pub(crate) async fn get_or_create_channel_zerobus_client( + &self, + ) -> ZerobusResult> { let mut guard = self.shared_channel.lock().await; if guard.is_none() { @@ -873,7 +971,7 @@ impl ZerobusSdk { impl ZerobusStream { /// Creates a new ephemeral stream for ingesting records. #[instrument(level = "debug", skip_all)] - async fn new_stream( + pub(crate) async fn new_stream( channel: ZerobusClient, table_properties: TableProperties, headers_provider: Arc,