35 changes: 26 additions & 9 deletions rust/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,44 @@
# Version changelog

## Release v1.1.0

### New Features and Improvements

- **Typestate `StreamBuilder` API**: New fluent builder for creating ingestion streams with compile-time safety. The builder enforces a strict configuration order — auth → format → config → build — so invalid combinations (e.g., building without auth or format) are caught at compile time. Format-specific setters (e.g., `max_inflight_requests` for gRPC, `max_inflight_batches` for Arrow) are only available when the matching format is selected.

```rust
let stream = sdk
    .stream_builder("catalog.schema.table")
    .oauth("client-id", "client-secret")
    .json()
    .max_inflight_requests(500_000)
    .build()
    .await?;
```

- **`NoOpHeadersProvider`**: Added a no-op headers provider for local development, testing, or sidecar proxy scenarios. Accessible via `stream_builder(...).no_auth()`.

- **[Experimental Arrow Flight] Zero-copy IPC ingestion via `ingest_ipc_batch`**: Added `ZerobusArrowStream::ingest_ipc_batch(Bytes)` for FFI callers (Go, Python, Java, TypeScript) that already hold Arrow IPC stream bytes. Raw bytes are forwarded directly to the Flight wire format without deserialising to a `RecordBatch` and re-serialising, eliminating one IPC round-trip per batch compared to `ingest_batch`. The existing `ingest_batch` API is unchanged.

### Bug Fixes

- Fixed proto generation tool to skip reserved field numbers 19000-19999 for tables with more than 19000 columns
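  For context, the Protocol Buffers specification reserves field numbers 19000 through 19999 for the protocol's own implementation, so a generator that assigns sequential field numbers must jump over that range. A self-contained sketch of the skipping logic (a hypothetical helper, not the SDK's actual generator code):

  ```rust
  /// First `count` valid protobuf field numbers, in ascending order,
  /// skipping 19000..=19999 (reserved by the protobuf specification).
  pub fn field_numbers(count: usize) -> Vec<u32> {
      (1u32..)
          .filter(|n| !(19000..=19999).contains(n))
          .take(count)
          .collect()
  }
  ```

  Under this scheme a table with more than 18,999 columns gets field number 20000 for its 19,000th column.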

### Deprecations

- **`ZerobusSdk::create_stream()`**: Use `sdk.stream_builder(table).oauth(id, secret).json().build().await` or `.compiled_proto(desc)` instead
- **`ZerobusSdk::create_stream_with_headers_provider()`**: Use `sdk.stream_builder(table).headers_provider(p).json().build().await` instead
- **`ZerobusSdk::create_arrow_stream()`**: Use `sdk.stream_builder(table).oauth(id, secret).arrow(schema).build().await` instead
- **`ZerobusSdk::create_arrow_stream_with_headers_provider()`**: Use `sdk.stream_builder(table).headers_provider(p).arrow(schema).build().await` instead

### API Changes

- Added `StreamBuilder` typestate builder with `stream_builder()` entry point on `ZerobusSdk`
- Added marker types `NoFormat`, `Json`, `CompiledProto`, `Arrow`, `NoAuth`, `HasAuth` for compile-time state tracking
- Added sealed traits `StreamFormat` and `GrpcFormat` for format-specific method gating
- Added `NoOpHeadersProvider` to `headers_provider` module
- Changed `ZerobusSdk` fields `workspace_id` and `tls_config` to `pub(crate)` visibility (no public API impact)
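The sealed traits above follow the standard pub-trait-in-private-module pattern, which lets downstream crates name and use a trait but not implement it. A minimal sketch of the pattern (simplified stand-ins, not the SDK's actual definitions):

```rust
// The `sealed` module is private to this crate, so external crates
// cannot name `sealed::Sealed` and therefore cannot implement
// `StreamFormat` for their own types.
mod sealed {
    pub trait Sealed {}
}

pub trait StreamFormat: sealed::Sealed {
    fn name(&self) -> &'static str;
}

pub struct Json;
pub struct Arrow;

impl sealed::Sealed for Json {}
impl StreamFormat for Json {
    fn name(&self) -> &'static str {
        "json"
    }
}

impl sealed::Sealed for Arrow {}
impl StreamFormat for Arrow {
    fn name(&self) -> &'static str {
        "arrow"
    }
}
```

A trait like `GrpcFormat` can be sealed the same way and implemented only for the gRPC formats, so format-specific builder methods can be bounded on it without risking foreign implementations.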


## Release v1.0.1

127 changes: 60 additions & 67 deletions rust/README.md
@@ -42,6 +42,7 @@ The Zerobus Rust SDK provides a robust, async-first interface for ingesting larg
- **High Throughput** - Configurable inflight record limits for optimal performance
- **Batch Ingestion** - Ingest multiple records at once with all-or-nothing semantics for maximum throughput
- **Flexible Serialization** - Support for both JSON (simple) and Protocol Buffers (type-safe) data formats
- **Type-Safe Stream Builder** - Typestate builder API enforces correct stream configuration at compile time
- **Type Safety** - Protocol Buffers ensure schema validation at compile time
- **Schema Generation** - CLI tool to generate protobuf schemas from Unity Catalog tables
- **Flexible Configuration** - Fine-tune timeouts, retries, and recovery behavior
@@ -189,7 +190,7 @@ zerobus_rust_sdk/
+-----------------+
| Your App |
+-----------------+
| 1. stream_builder().build()
v
+-----------------+
| ZerobusSdk |
@@ -270,13 +271,14 @@ impl HeadersProvider for MyCustomAuthProvider {
}
}

async fn example(sdk: ZerobusSdk) -> ZerobusResult<()> {
    let custom_provider = Arc::new(MyCustomAuthProvider {});
    let stream = sdk
        .stream_builder("catalog.schema.table")
        .headers_provider(custom_provider)
        .json()
        .build()
        .await?;
    Ok(())
}
```
@@ -397,7 +399,24 @@ See [`examples/README.md`](https://github.com/databricks/zerobus-sdk/blob/main/r

### 4. Create a Stream

Use the `stream_builder()` API to create a stream with compile-time safety:

#### JSON Stream

```rust
let mut stream = sdk
    .stream_builder("catalog.schema.orders")
    .oauth(client_id, client_secret)
    .json()
    .max_inflight_requests(10_000)
    .recovery_timeout_ms(15_000)
    .recovery_backoff_ms(2_000)
    .recovery_retries(4)
    .build()
    .await?;
```

#### Protocol Buffers Stream

```rust
use std::fs;
// … (lines collapsed in the diff view)
let descriptor_proto = load_descriptor(
    "table_Orders",
);

let mut stream = sdk
    .stream_builder("catalog.schema.orders")
    .oauth(client_id, client_secret)
    .compiled_proto(descriptor_proto)
    .max_inflight_requests(10_000)
    .recovery_timeout_ms(15_000)
    .recovery_backoff_ms(2_000)
    .recovery_retries(4)
    .build()
    .await?;
```

The builder enforces correct ordering at compile time: auth must be set before format, and format before configuration options. Format-specific setters (like `max_inflight_requests` for gRPC or `max_inflight_batches` for Arrow) are only available when the matching format is selected.
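This ordering is enforced by the typestate pattern: each step returns a builder with different type parameters, so methods for later steps simply do not exist on earlier states and misuse fails to compile. A stripped-down sketch of the idea (stand-in types only, not the SDK's real definitions):

```rust
use std::marker::PhantomData;

// Marker states (stand-ins for the SDK's NoFormat/Json and NoAuth/HasAuth).
pub struct NoFormat;
pub struct Json;
pub struct NoAuth;
pub struct HasAuth {
    client_id: String,
}

pub struct StreamBuilder<F, A> {
    table: String,
    auth: A,
    _format: PhantomData<F>,
}

impl StreamBuilder<NoFormat, NoAuth> {
    pub fn new(table: &str) -> Self {
        StreamBuilder {
            table: table.to_string(),
            auth: NoAuth,
            _format: PhantomData,
        }
    }

    // Auth comes first: `oauth` is only defined in the NoAuth state.
    pub fn oauth(self, client_id: &str, _client_secret: &str) -> StreamBuilder<NoFormat, HasAuth> {
        StreamBuilder {
            table: self.table,
            auth: HasAuth {
                client_id: client_id.to_string(),
            },
            _format: PhantomData,
        }
    }
}

impl StreamBuilder<NoFormat, HasAuth> {
    // Format comes second: `json` only exists once auth is set.
    pub fn json(self) -> StreamBuilder<Json, HasAuth> {
        StreamBuilder {
            table: self.table,
            auth: self.auth,
            _format: PhantomData,
        }
    }
}

impl StreamBuilder<Json, HasAuth> {
    // `build` only exists with both auth and format chosen, so e.g.
    // `StreamBuilder::new("t").build()` is a compile-time error.
    pub fn build(self) -> Stream {
        Stream {
            table: self.table,
            client_id: self.auth.client_id,
        }
    }
}

pub struct Stream {
    pub table: String,
    pub client_id: String,
}
```

The SDK gates format-specific setters the same way: they are defined only on builder states whose format parameter matches.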

### 5. Ingest Data

The SDK provides flexible ways to ingest data with different levels of abstraction:
@@ -576,22 +587,18 @@ impl AckCallback for MyCallback {
}

// Configure stream with callback
let mut stream = sdk
    .stream_builder("catalog.schema.orders")
    .oauth(client_id, client_secret)
    .json()
    .max_inflight_requests(10_000)
    .ack_callback(Arc::new(MyCallback))
    .build()
    .await?;

for i in 0..1000 {
    let record = serde_json::json!({"id": i, "name": format!("order-{}", i)});
    stream.ingest_record_offset(record.to_string()).await?;
    // Callback fires when this record is acknowledged
}

@@ -753,18 +760,18 @@ let sdk = ZerobusSdk::builder()
.unity_catalog_url(uc_endpoint)
.build()?;

let mut stream = sdk
    .stream_builder("catalog.schema.table")
    .oauth(client_id, client_secret)
    .json()
    .build()
    .await?;

// Ingest data...
match stream.close().await {
    Err(_) => {
        // Stream failed, recreate with unacked records
        stream = sdk.recreate_stream(&stream).await?;
    }
    Ok(_) => println!("Closed successfully"),
}
@@ -816,34 +823,20 @@ let sdk = ZerobusSdk::builder()
```

**Methods:**

```rust
pub fn stream_builder(&self, table_name: impl Into<String>) -> StreamBuilder<NoFormat, NoAuth>
```
Returns a typestate builder for creating ingestion streams. See [Create a Stream](#4-create-a-stream) for usage.

```rust
pub async fn recreate_stream(
    &self,
    stream: &ZerobusStream
) -> ZerobusResult<ZerobusStream>
```
Recreates a failed stream, preserving and re-ingesting unacknowledged records.


### `ZerobusStream`

Represents an active ingestion stream.
26 changes: 6 additions & 20 deletions rust/examples/json/batch/src/main.rs
@@ -1,9 +1,6 @@
use std::error::Error;

use databricks_zerobus_ingest_sdk::{JsonString, JsonValue, ZerobusSdk, ZerobusStream};
use serde::Serialize;

/// Order struct that can be automatically serialized to JSON using JsonValue wrapper.
@@ -36,28 +33,17 @@ const SERVER_ENDPOINT: &str = "https://<your-shard-id>.zerobus.<region>.cloud.da

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let sdk_handle = ZerobusSdk::builder()
        .endpoint(SERVER_ENDPOINT)
        .unity_catalog_url(DATABRICKS_WORKSPACE_URL)
        .build()?;

    let mut stream = sdk_handle
        .stream_builder(TABLE_NAME)
        .oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET)
        .json()
        .max_inflight_requests(100)
        .build()
        .await?;

    ingest_with_offset_api(&mut stream).await?;
26 changes: 6 additions & 20 deletions rust/examples/json/single/src/main.rs
@@ -1,9 +1,6 @@
use std::error::Error;

use databricks_zerobus_ingest_sdk::{JsonString, JsonValue, ZerobusSdk, ZerobusStream};
use serde::Serialize;

/// Order struct that can be automatically serialized to JSON using JsonValue wrapper.
Expand Down Expand Up @@ -36,28 +33,17 @@ const SERVER_ENDPOINT: &str = "https://<your-shard-id>.zerobus.<region>.cloud.da

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let sdk_handle = ZerobusSdk::builder()
        .endpoint(SERVER_ENDPOINT)
        .unity_catalog_url(DATABRICKS_WORKSPACE_URL)
        .build()?;

    let mut stream = sdk_handle
        .stream_builder(TABLE_NAME)
        .oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET)
        .json()
        .max_inflight_requests(100)
        .build()
        .await?;

    ingest_with_offset_api(&mut stream).await?;
25 changes: 6 additions & 19 deletions rust/examples/proto/batch/src/main.rs
@@ -4,10 +4,7 @@ use std::fs;
use prost::Message;
use prost_reflect::prost_types;

use databricks_zerobus_ingest_sdk::{ProtoBytes, ProtoMessage, ZerobusSdk, ZerobusStream};

pub mod orders {
include!("../output/orders.rs");
@@ -33,27 +30,17 @@ const SERVER_ENDPOINT: &str = "https://<your-shard-id>.zerobus.<region>.cloud.da
async fn main() -> Result<(), Box<dyn Error>> {
    let descriptor_proto =
        load_descriptor_proto("output/orders.descriptor", "orders.proto", "table_Orders");
    let sdk_handle = ZerobusSdk::builder()
        .endpoint(SERVER_ENDPOINT)
        .unity_catalog_url(DATABRICKS_WORKSPACE_URL)
        .build()?;

    let mut stream = sdk_handle
        .stream_builder(TABLE_NAME)
        .oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET)
        .compiled_proto(descriptor_proto)
        .max_inflight_requests(100)
        .build()
        .await?;

    ingest_with_offset_api(&mut stream).await?;