SDK
Rust
Description
The Rust SDK's record ingestion pipeline has unnecessary copying overhead that affects all downstream language SDKs (TypeScript, Java, Python, Go). This task covers the Rust SDK changes needed to eliminate that overhead and expose new APIs that FFI layers can adopt in follow-up work.
Proto/JSON path: 2 deep copies per record in the landing zone
Every record is deep-copied twice before reaching gRPC:
- LandingZone::observe() (landing_zone.rs:149) clones the entire Box<IngestRequest> (including all record bytes) to keep a copy in observed_items for retry.
- The sender task's tokio::select! (lib.rs:1817) clones the observed item again due to ownership semantics.
Vec<u8> (proto) and String (JSON) both perform O(n) deep copies on .clone(). For a 1 MB protobuf record this is 2 MB of unnecessary allocation + memcpy per send.
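To make the cost concrete, here is a std-only sketch contrasting a deep Vec clone with a refcounted clone — the same trade-off the proposal makes by switching to Bytes (Arc<[u8]> stands in for Bytes here purely for illustration):

```rust
use std::sync::Arc;

// Deep copy: a fresh allocation plus a full memcpy on every clone.
fn deep_clone(record: &Vec<u8>) -> Vec<u8> {
    record.clone()
}

// Refcounted copy: one atomic increment, no allocation, no memcpy.
fn cheap_clone(record: &Arc<[u8]>) -> Arc<[u8]> {
    Arc::clone(record)
}

fn main() {
    let record: Vec<u8> = vec![0u8; 1_000_000]; // a 1 MB record

    let deep = deep_clone(&record);
    assert_ne!(deep.as_ptr(), record.as_ptr()); // two distinct 1 MB buffers

    let shared: Arc<[u8]> = record.into();
    let cheap = cheap_clone(&shared);
    assert!(std::ptr::eq(cheap.as_ptr(), shared.as_ptr())); // one shared buffer
    assert_eq!(Arc::strong_count(&shared), 2);
}
```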
Arrow path: double IPC serialization
All FFI layers serialize Arrow RecordBatches to IPC bytes to cross the language boundary. The Rust SDK deserializes back to RecordBatch, then FlightDataEncoderBuilder (arrow_stream.rs:431) re-serializes to IPC for the Flight gRPC protocol:
```
Language RecordBatch → IPC serialize → [FFI] → IPC deserialize → Rust RecordBatch
    → FlightDataEncoderBuilder → IPC RE-SERIALIZE → gRPC
```
This is documented as a TODO in the Python bindings (python/rust/src/arrow.rs:272). The same pattern exists in Go (rust/ffi/src/lib.rs:337), Java (rust/jni/src/arrow_stream.rs:107), and TypeScript (typescript/src/lib.rs:1432).
No efficient API for FFI layers to pass owned byte buffers
The current public API accepts impl Into<EncodedRecord> which resolves to Vec<u8> or String. FFI layers that already have byte data must copy into these types (e.g., Go to_vec() from pinned slice, TypeScript buffer.to_vec() from NAPI Buffer, Java convert_byte_array() from JNI). There is no way to pass a reference-counted Bytes buffer that avoids this copy and also avoids the subsequent landing zone clones.
Proposed Solution
1. Add bytes crate dependency and use Bytes-based internal record types
Create crate-internal record types that use bytes::Bytes instead of Vec<u8>/String:
```rust
#[derive(Clone)] // O(1) via Bytes refcount
pub(crate) struct InternalBatch {
    pub(crate) records: SmallVec<[Bytes; 1]>,
    pub(crate) record_type: RecordType,
}
```
Convert from existing public types at the ingestion boundary:
- Bytes::from(Vec<u8>) — takes ownership of the Vec's allocation (pointer handoff, no memcpy).
- Bytes::from(String) — same, via into_bytes().
- Bytes::clone() — atomic refcount increment, O(1) regardless of data size.
Use InternalBatch in IngestRequest and throughout the internal pipeline. The landing zone clones and sender task clones become O(1).
All existing public types (EncodedRecord, EncodedBatch, ProtoBytes, JsonString, etc.) and methods remain unchanged.
2. Configure prost to generate Bytes for proto bytes fields
Update build.rs:
```rust
tonic_build::configure()
    .bytes([
        "databricks.zerobus.IngestRecordRequest",
        "databricks.zerobus.ProtoEncodedRecordBatch",
        "databricks.zerobus.CreateIngestStreamRequest",
    ])
    .compile_protos(&["zerobus_service.proto"], &["."])
    .unwrap();
```
This makes proto bytes fields use bytes::Bytes in the generated code. Proto records then flow from ingestion through prost encoding to gRPC with zero deep copies. The generated types are pub(crate) — invisible to users.
3. New ingest_ipc_batch() on ZerobusArrowStream
Add a method that accepts raw Arrow IPC bytes, bypassing the deserialize→re-serialize round-trip:
```rust
impl ZerobusArrowStream {
    pub async fn ingest_ipc_batch(&self, ipc_bytes: Bytes) -> ZerobusResult<OffsetId> { ... }
}
```
Internally, introduce a dual payload type:
```rust
#[derive(Clone)]
pub(crate) enum ArrowPayload {
    Ipc(Bytes),         // From FFI callers. Clone is O(1).
    Batch(RecordBatch), // From Rust callers. Clone is O(1) via Arc arrays.
}
```
For the Ipc variant, construct FlightData directly from the IPC bytes without reconstructing a RecordBatch. The row count for ack matching is extracted from the IPC message header (metadata read, not full deserialization).
The existing ingest_batch(RecordBatch) stays unchanged. Recovery: full replay uses stored Bytes (O(1) clone). Partial-batch slicing (rare) falls back to deserialize → slice → re-serialize.
4. New Bytes-accepting API for proto/JSON
```rust
impl ZerobusStream {
    pub async fn ingest_proto_bytes(&self, data: Bytes) -> ZerobusResult<OffsetId> { ... }
    pub async fn ingest_json_bytes(&self, data: Bytes) -> ZerobusResult<OffsetId> { ... }
    pub async fn ingest_proto_bytes_batch(&self, data: Vec<Bytes>) -> ZerobusResult<Option<OffsetId>> { ... }
    pub async fn ingest_json_bytes_batch(&self, data: Vec<Bytes>) -> ZerobusResult<Option<OffsetId>> { ... }
}
```
These skip the Vec<u8> → EncodedRecord → InternalBatch conversion chain entirely. FFI layers can pass Bytes constructed from their native buffer types directly into the zero-copy pipeline.
Expected impact
Proto/JSON hot path:
| Stage | Before | After |
|---|---|---|
| LandingZone observe() | O(n) deep copy | O(1) refcount |
| Sender select! clone | O(n) deep copy | O(1) refcount |
| → prost encoding (proto) | move into Vec<u8> field | move into Bytes field |
| Total deep copies | 2 per record | 0 |
Arrow hot path:
| Stage | Before | After |
|---|---|---|
| IPC → RecordBatch parse | full deserialization | skipped |
| Pending batch clone | O(1) Arc clone | O(1) Bytes clone |
| RecordBatch → Flight IPC | full serialization | skipped (IPC forwarded) |
| IPC encode/decode passes | 2 | 0 |
Backward compatibility
All changes are non-breaking:
- Existing public types and methods are unchanged.
- New methods (ingest_ipc_batch, ingest_proto_bytes, ingest_json_bytes) are additive.
- Internal types (InternalBatch, ArrowPayload, prost config) are pub(crate).
Additional Context
The bytes crate is already a transitive dependency via tonic/hyper/prost. Adding it directly adds no new code to the binary.
Follow-up tasks after this lands:
- TypeScript SDK: Update NAPI bindings to call ingest_proto_bytes/ingest_json_bytes/ingest_ipc_batch, cache the JSON.stringify reference, explore Bytes::from_owner for NAPI Buffer zero-copy.
- Python SDK: Update PyO3 bindings to call ingest_ipc_batch, resolving the TODO at arrow.rs:272.
- Java SDK: Update JNI bindings to call ingest_ipc_batch and ingest_proto_bytes.
- Go SDK: Update CGO FFI to call ingest_ipc_batch and ingest_proto_bytes.
- All SDKs (future): Arrow C Data Interface for true zero-copy Arrow FFI (eliminates IPC serialization on the language side entirely).