Public Preview: This SDK is supported for production use cases and is available to all customers. Databricks is actively working on stabilizing the Zerobus Ingest SDK for TypeScript. Minor version updates may include backwards-incompatible changes.
We are keen to hear your feedback on this SDK. Please file issues, and we will address them.
The Databricks Zerobus Ingest SDK for TypeScript provides a high-performance client for ingesting data directly into Databricks Delta tables using the Zerobus streaming protocol. This SDK wraps the high-performance Rust SDK using native bindings for optimal performance.

See also the SDKs for Rust, Python, Java, and Go.
- Features
- Requirements
- Quick Start User Guide
- Usage Examples
- Authentication
- Configuration
- Descriptor Utilities
- Error Handling
- API Reference
- Best Practices
- Platform Support
- Architecture
- Contributing
- Related Projects
- High-throughput ingestion: Optimized for high-volume data ingestion with native Rust implementation
- Automatic recovery: Built-in retry and recovery mechanisms for transient failures
- Flexible configuration: Customizable stream behavior and timeouts
- Multiple serialization formats: Support for JSON and Protocol Buffers
- Type widening: Accept high-level types (plain objects, protobuf messages) or low-level types (strings, buffers) - automatically handles serialization
- Batch ingestion: Ingest multiple records with a single acknowledgment for higher throughput
- OAuth 2.0 authentication: Secure authentication with client credentials
- TypeScript support: Full type definitions for excellent IDE support
- Cross-platform: Supports Linux, macOS, and Windows
- Node.js: >= 16
- Databricks workspace with Zerobus access enabled
- Rust toolchain: 1.70 or higher - Install Rust
- Cargo: Included with Rust
The following build dependencies will be installed automatically:
{
"@napi-rs/cli": "^2.18.4",
"napi-build": "^0.3.3"
}

Before using the SDK, you'll need the following:
After logging into your Databricks workspace, look at the browser URL:
https://<databricks-instance>.cloud.databricks.com/?o=<workspace-id>
- Workspace URL: The part before /?o= → https://<databricks-instance>.cloud.databricks.com
- Workspace ID: The part after ?o= → <workspace-id>
- Zerobus Endpoint: https://<workspace-id>.zerobus.<region>.cloud.databricks.com

Note: The examples above show AWS endpoints (.cloud.databricks.com). For Azure deployments, the workspace URL will be https://<databricks-instance>.azuredatabricks.net and the Zerobus endpoint will also use .azuredatabricks.net.

Example:
- Full URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com/?o=1234567890123456
- Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com
- Workspace ID: 1234567890123456
- Zerobus Endpoint: https://1234567890123456.zerobus.us-west-2.cloud.databricks.com
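If you prefer to derive these values programmatically, here is a minimal sketch. The deriveZerobusConfig helper is illustrative only (it is not part of the SDK), and it assumes an AWS workspace whose region you already know:

function deriveZerobusConfig(browserUrl: string, region: string) {
  const url = new URL(browserUrl);
  const workspaceUrl = `${url.protocol}//${url.host}`;   // the part before /?o=
  const workspaceId = url.searchParams.get('o');         // the part after ?o=
  if (!workspaceId) {
    throw new Error('Workspace ID (?o=...) not found in URL');
  }
  // The SDK examples later in this README pass the Zerobus endpoint without the https:// prefix.
  const zerobusEndpoint = `${workspaceId}.zerobus.${region}.cloud.databricks.com`;
  return { workspaceUrl, workspaceId, zerobusEndpoint };
}

// deriveZerobusConfig('https://dbc-a1b2c3d4-e5f6.cloud.databricks.com/?o=1234567890123456', 'us-west-2')
// => { workspaceUrl: 'https://dbc-a1b2c3d4-e5f6.cloud.databricks.com',
//      workspaceId: '1234567890123456',
//      zerobusEndpoint: '1234567890123456.zerobus.us-west-2.cloud.databricks.com' }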
Create a table using Databricks SQL:
CREATE TABLE <catalog_name>.default.air_quality (
device_name STRING,
temp INT,
humidity BIGINT
)
USING DELTA;

Replace <catalog_name> with your catalog name (e.g., main).
- Navigate to Settings > Identity and Access in your Databricks workspace
- Click Service principals and create a new service principal
- Generate a new secret for the service principal and save it securely
- Grant the following permissions:
  - USE_CATALOG on the catalog (e.g., main)
  - USE_SCHEMA on the schema (e.g., default)
  - MODIFY and SELECT on the table (e.g., air_quality)
Grant permissions using SQL:
-- Grant catalog permission
GRANT USE CATALOG ON CATALOG <catalog_name> TO `<service-principal-application-id>`;
-- Grant schema permission
GRANT USE SCHEMA ON SCHEMA <catalog_name>.default TO `<service-principal-application-id>`;
-- Grant table permissions
GRANT SELECT, MODIFY ON TABLE <catalog_name>.default.air_quality TO `<service-principal-application-id>`;

Before installing the SDK, ensure you have the required tools:
1. Node.js >= 16
Check if Node.js is installed:
node --version

If not installed, download from nodejs.org.
2. Rust Toolchain (1.70+)
The SDK requires Rust to compile the native addon. Install using rustup (the official Rust installer):
On Linux and macOS:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Follow the prompts (typically just press Enter to accept defaults).
On Windows:
Download and run the installer from rustup.rs, or use:
# Using winget
winget install Rustlang.Rustup
# Or download from https://rustup.rs/

Verify Installation:
rustc --version
cargo --version

You should see version 1.70 or higher. If the commands aren't found, restart your terminal or add Rust to your PATH:
# Linux/macOS
source $HOME/.cargo/env
# Windows (PowerShell)
# Restart your terminal

Additional Platform Requirements:
- Linux: Build essentials

  # Ubuntu/Debian
  sudo apt-get install build-essential
  # CentOS/RHEL
  sudo yum groupinstall "Development Tools"

- macOS: Xcode Command Line Tools

  xcode-select --install

- Windows: Visual Studio Build Tools
  - Install Visual Studio Build Tools
  - During installation, select "Desktop development with C++"
Note for macOS users: Pre-built binaries are not available. The package will automatically build from source during npm install. Ensure you have Rust toolchain and Xcode Command Line Tools installed (see prerequisites above).
1. Extract the SDK package:

   unzip zerobus-sdk-ts.zip
   cd zerobus-sdk-ts

2. Install dependencies:

   npm install

3. Build the native addon:

   npm run build

   This will compile the Rust code into a native Node.js addon (.node file) for your platform.

4. Verify the build:

   # You should see a .node file
   ls -la *.node

5. The SDK is now ready to use! You can:
   - Use it directly in this directory for the examples
   - Link it globally: npm link
   - Or copy it into your project's node_modules
Troubleshooting:
- "rustc: command not found": Restart your terminal after installing Rust
- Build fails on Windows: Ensure Visual Studio Build Tools are installed with C++ support
- Build fails on Linux: Install build-essential or equivalent package
- Permission errors: Don't use sudo with npm/cargo commands
The SDK supports two serialization formats. Protocol Buffers is the default and recommended for production use:
- Protocol Buffers (Default) - Strongly-typed schemas, efficient binary encoding, better performance. This is the default format.
- JSON - Simple, no schema compilation needed. Good for getting started quickly or when schema flexibility is needed.
Note: If you don't specify recordType, the SDK will use Protocol Buffers by default. To use JSON, explicitly set recordType: RecordType.Json.
JSON mode is the simplest way to get started. You don't need to define or compile protobuf schemas, but you must explicitly specify RecordType.Json.
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// Configuration
// For AWS:
const zerobusEndpoint = '<workspace-id>.zerobus.<region>.cloud.databricks.com';
const workspaceUrl = 'https://<workspace-name>.cloud.databricks.com';
// For Azure:
// const zerobusEndpoint = '<workspace-id>.zerobus.<region>.azuredatabricks.net';
// const workspaceUrl = 'https://<workspace-name>.azuredatabricks.net';
const tableName = 'main.default.air_quality';
const clientId = process.env.DATABRICKS_CLIENT_ID!;
const clientSecret = process.env.DATABRICKS_CLIENT_SECRET!;
// Initialize SDK
const sdk = new ZerobusSdk(zerobusEndpoint, workspaceUrl);
// Configure table properties (no descriptor needed for JSON)
const tableProperties = { tableName };
// Configure stream with JSON record type
const options = {
recordType: RecordType.Json, // JSON encoding
maxInflightRequests: 1000,
recovery: true
};
// Create stream
const stream = await sdk.createStream(
tableProperties,
clientId,
clientSecret,
options
);
try {
let lastAckPromise;
// Send all records
for (let i = 0; i < 100; i++) {
// Create JSON record
const record = {
device_name: `sensor-${i % 10}`,
temp: 20 + (i % 15),
humidity: 50 + (i % 40)
};
// JSON supports 2 types:
// 1. object (high-level) - SDK auto-stringifies
lastAckPromise = stream.ingestRecord(record);
// 2. string (low-level) - pre-serialized JSON
// lastAckPromise = stream.ingestRecord(JSON.stringify(record));
}
console.log('All records sent. Waiting for last acknowledgment...');
// Wait for the last record's acknowledgment
const lastOffset = await lastAckPromise;
console.log(`Last record offset: ${lastOffset}`);
// Flush to ensure all records are acknowledged
await stream.flush();
console.log('Successfully ingested 100 records!');
} finally {
// Always close the stream
await stream.close();
}

Protocol Buffers is the default serialization format and provides efficient binary encoding with schema validation. This is recommended for production use. This section covers the complete setup process.
Before starting, ensure you have:
- Protocol Buffer Compiler (protoc) - Required for generating descriptor files
- protobufjs and protobufjs-cli - Already included in package.json devDependencies
Linux:
# Ubuntu/Debian
sudo apt-get update && sudo apt-get install -y protobuf-compiler
# CentOS/RHEL
sudo yum install -y protobuf-compiler
# Alpine
apk add protobuf

macOS:

brew install protobuf

Windows:
# Using Chocolatey
choco install protoc
# Or download from: https://github.com/protocolbuffers/protobuf/releases

Verify Installation:
protoc --version
# Should show: libprotoc 3.x.x or higher

The SDK includes an example schema at schemas/air_quality.proto:
syntax = "proto2";
package examples;
// Example message representing air quality sensor data
message AirQuality {
optional string device_name = 1;
optional int32 temp = 2;
optional int64 humidity = 3;
}

Generate TypeScript code from your proto schema:

npm run build:proto

This runs:

pbjs -t static-module -w commonjs -o examples/generated/air_quality.js schemas/air_quality.proto
pbts -o examples/generated/air_quality.d.ts examples/generated/air_quality.js

Output:
- examples/generated/air_quality.js - JavaScript protobuf code
- examples/generated/air_quality.d.ts - TypeScript type definitions
Databricks requires descriptor metadata about your protobuf schema.
Generate Binary Descriptor:
protoc --descriptor_set_out=schemas/air_quality_descriptor.pb \
--include_imports \
schemas/air_quality.proto

Important flags:
- --descriptor_set_out - Output path for the binary descriptor
- --include_imports - Include all imported proto files (required)
That's it! The SDK will automatically extract the message descriptor from this file.
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
import * as airQuality from './examples/generated/air_quality';
import { loadDescriptorProto } from '@databricks/zerobus-ingest-sdk/utils/descriptor';
// Configuration
const zerobusEndpoint = '<workspace-id>.zerobus.<region>.cloud.databricks.com';
const workspaceUrl = 'https://<workspace-name>.cloud.databricks.com';
const tableName = 'main.default.air_quality';
const clientId = process.env.DATABRICKS_CLIENT_ID!;
const clientSecret = process.env.DATABRICKS_CLIENT_SECRET!;
// Load and extract the descriptor for your specific message
const descriptorBase64 = loadDescriptorProto({
descriptorPath: 'schemas/air_quality_descriptor.pb',
protoFileName: 'air_quality.proto',
messageName: 'AirQuality'
});
// Initialize SDK
const sdk = new ZerobusSdk(zerobusEndpoint, workspaceUrl);
// Configure table properties with protobuf descriptor
const tableProperties = {
tableName,
descriptorProto: descriptorBase64 // Required for Protocol Buffers
};
// Configure stream with Protocol Buffers record type
const options = {
recordType: RecordType.Proto, // Protocol Buffers encoding
maxInflightRequests: 1000,
recovery: true
};
// Create stream
const stream = await sdk.createStream(tableProperties, clientId, clientSecret, options);
try {
const AirQuality = airQuality.examples.AirQuality;
let lastAckPromise;
// Send all records
for (let i = 0; i < 100; i++) {
const record = AirQuality.create({
device_name: `sensor-${i}`,
temp: 20 + i,
humidity: 50 + i
});
// Protobuf supports 2 types:
// 1. Message object (high-level) - SDK calls .encode().finish()
lastAckPromise = stream.ingestRecord(record);
// 2. Buffer (low-level) - pre-serialized bytes
// const buffer = Buffer.from(AirQuality.encode(record).finish());
// lastAckPromise = stream.ingestRecord(buffer);
}
console.log('All records sent. Waiting for last acknowledgment...');
// Wait for the last record's acknowledgment
const lastOffset = await lastAckPromise;
console.log(`Last record offset: ${lastOffset}`);
// Flush to ensure all records are acknowledged
await stream.flush();
console.log('Successfully ingested 100 records!');
} finally {
await stream.close();
}

When creating your proto schema, use these type mappings:
| Delta Type | Proto2 Type | Notes |
|---|---|---|
| STRING, VARCHAR | string | |
| INT, SMALLINT, SHORT | int32 | |
| BIGINT, LONG | int64 | |
| FLOAT | float | |
| DOUBLE | double | |
| BOOLEAN | bool | |
| BINARY | bytes | |
| DATE | int32 | Days since epoch |
| TIMESTAMP | int64 | Microseconds since epoch |
| ARRAY<type> | repeated type | Use repeated field |
| MAP<key, value> | map<key, value> | Use map field |
| STRUCT<fields> | message | Define nested message |
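The DATE and TIMESTAMP rows above imply a client-side conversion. Here is a minimal sketch of that conversion; toDeltaDate and toDeltaTimestampMicros are hypothetical helpers, not part of the SDK, and protobufjs typically represents int64 values as Long or number, so convert the bigint as needed:

function toDeltaDate(d: Date): number {
  // DATE: whole days since the Unix epoch (fits in int32)
  return Math.floor(d.getTime() / 86_400_000);
}

function toDeltaTimestampMicros(d: Date): bigint {
  // TIMESTAMP: microseconds since the Unix epoch (int64)
  return BigInt(d.getTime()) * 1000n;
}

// toDeltaDate(new Date('2024-01-01T00:00:00Z'))            // => 19723
// toDeltaTimestampMicros(new Date('2024-01-01T00:00:00Z')) // => 1704067200000000n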
Example: Complex Schema
syntax = "proto2";
package examples;
message ComplexRecord {
optional string id = 1;
optional int64 timestamp = 2;
repeated string tags = 3; // ARRAY<STRING>
map<string, int32> metrics = 4; // MAP<STRING, INT>
optional NestedData nested = 5; // STRUCT
}
message NestedData {
optional string field1 = 1;
optional double field2 = 2;
}

1. Create your proto file:

   cat > schemas/my_schema.proto << 'EOF'
   syntax = "proto2";
   package my_schema;

   message MyMessage {
     optional string field1 = 1;
     optional int32 field2 = 2;
   }
   EOF

2. Add build script to package.json:

   {
     "scripts": {
       "build:proto:myschema": "pbjs -t static-module -w commonjs -o examples/generated/my_schema.js schemas/my_schema.proto && pbts -o examples/generated/my_schema.d.ts examples/generated/my_schema.js"
     }
   }

3. Generate code and descriptor:

   npm run build:proto:myschema
   protoc --descriptor_set_out=schemas/my_schema_descriptor.pb --include_imports schemas/my_schema.proto

4. Load descriptor in your code:

   import { loadDescriptorProto } from '@databricks/zerobus-ingest-sdk/utils/descriptor';

   const descriptorBase64 = loadDescriptorProto({
     descriptorPath: 'schemas/my_schema_descriptor.pb',
     protoFileName: 'my_schema.proto',
     messageName: 'MyMessage'
   });
"protoc: command not found"
- Install protoc (see Step 1 above)

"Cannot find module './generated/air_quality'"
- Run npm run build:proto to generate TypeScript code

"Descriptor file not found"
- Generate the descriptor file using the commands in Step 4

"Invalid descriptor"
- Ensure you used the --include_imports flag when generating the descriptor
- Verify the .pb file was created: ls -lh schemas/*.pb
- Check that protoFileName and messageName match your proto file
- Make sure you're using loadDescriptorProto() from the utils

Build fails on proto generation
- Ensure protobufjs is installed: npm install --save-dev protobufjs protobufjs-cli
Complete setup from scratch:
# Install dependencies and build SDK
npm install
npm run build
# Setup Protocol Buffers
npm run build:proto
protoc --descriptor_set_out=schemas/air_quality_descriptor.pb --include_imports schemas/air_quality.proto
# Run example
npx tsx examples/proto.ts

1. TypeScript Code Generation (npm run build:proto):
   - Creates JavaScript/TypeScript code for your application
   - Provides type-safe message creation and encoding
   - Used in your application code

2. Descriptor File Generation (protoc --descriptor_set_out):
   - Creates metadata about your schema for Databricks
   - Required by the Zerobus service for schema validation
   - Uploaded as a base64 string when creating a stream
Both are necessary for Protocol Buffers ingestion!
See the examples/ directory for complete, runnable examples. See examples/README.md for detailed instructions.
# Set environment variables
export ZEROBUS_SERVER_ENDPOINT="<workspace-id>.zerobus.<region>.cloud.databricks.com"
export DATABRICKS_WORKSPACE_URL="https://<workspace-name>.cloud.databricks.com"
export DATABRICKS_CLIENT_ID="your-client-id"
export DATABRICKS_CLIENT_SECRET="your-client-secret"
export ZEROBUS_TABLE_NAME="main.default.air_quality"
# Run JSON example
npx tsx examples/json.ts
# For Protocol Buffers, generate TypeScript code and descriptor
npm run build:proto
protoc --descriptor_set_out=schemas/air_quality_descriptor.pb --include_imports schemas/air_quality.proto
# Run Protocol Buffers example
npx tsx examples/proto.ts

For higher throughput, use batch ingestion to send multiple records with a single acknowledgment:
const records = Array.from({ length: 1000 }, (_, i) =>
AirQuality.create({ device_name: `sensor-${i}`, temp: 20 + i, humidity: 50 + i })
);
// Protobuf Type 1: Message objects (high-level) - SDK auto-serializes
const offsetId = await stream.ingestRecords(records);
// Protobuf Type 2: Buffers (low-level) - pre-serialized bytes
// const buffers = records.map(r => Buffer.from(AirQuality.encode(r).finish()));
// const offsetId = await stream.ingestRecords(buffers);
if (offsetId !== null) {
console.log(`Batch acknowledged at offset ${offsetId}`);
}

const records = Array.from({ length: 1000 }, (_, i) => ({
device_name: `sensor-${i}`,
temp: 20 + i,
humidity: 50 + i
}));
// JSON Type 1: objects (high-level) - SDK auto-stringifies
const offsetId = await stream.ingestRecords(records);
// JSON Type 2: strings (low-level) - pre-serialized JSON
// const jsonRecords = records.map(r => JSON.stringify(r));
// const offsetId = await stream.ingestRecords(jsonRecords);

Type Widening Support:
- JSON mode: Accept object[] (auto-stringify) or string[] (pre-stringified)
- Proto mode: Accept protobuf messages with an .encode() method (auto-serialize) or Buffer[] (pre-serialized)
- Mixed types are supported in the same batch (see the sketch below)
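A minimal sketch of a mixed-type batch, assuming a JSON-mode stream created as in the examples above:

const mixedBatch = [
  { device_name: 'sensor-1', temp: 21, humidity: 55 },                  // object, auto-stringified
  JSON.stringify({ device_name: 'sensor-2', temp: 22, humidity: 60 })   // pre-serialized string
];

const mixedOffset = await stream.ingestRecords(mixedBatch);
if (mixedOffset !== null) {
  console.log(`Mixed batch acknowledged at offset ${mixedOffset}`);
}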
Best Practices:
- Batch size: 100-1,000 records for optimal throughput/latency balance
- Empty batches return null (no error, no offset)
- Use recreateStream() for recovery - it automatically handles unacknowledged batches
Examples:
Both json.ts and proto.ts examples demonstrate batch ingestion.
The SDK uses OAuth 2.0 Client Credentials for authentication:
import { ZerobusSdk } from '@databricks/zerobus-ingest-sdk';
const sdk = new ZerobusSdk(zerobusEndpoint, workspaceUrl);
// Create stream with OAuth authentication
const stream = await sdk.createStream(
tableProperties,
clientId,
clientSecret,
options
);

The SDK automatically fetches access tokens and includes these headers:
- "authorization": "Bearer <oauth_token>" - Obtained via the OAuth 2.0 Client Credentials flow
- "x-databricks-zerobus-table-name": "<table_name>" - The fully qualified table name
Beyond OAuth, you can use custom headers for Personal Access Tokens (PAT) or other auth methods:
import { ZerobusSdk } from '@databricks/zerobus-ingest-sdk';
import { HeadersProvider } from '@databricks/zerobus-ingest-sdk/src/headers_provider';
class CustomHeadersProvider implements HeadersProvider {
async getHeaders(): Promise<Array<[string, string]>> {
return [
["authorization", `Bearer ${myToken}`],
["x-databricks-zerobus-table-name", tableName]
];
}
}
const headersProvider = new CustomHeadersProvider();
const stream = await sdk.createStream(
tableProperties,
'', // client_id (ignored when headers_provider is provided)
'', // client_secret (ignored when headers_provider is provided)
options,
{ getHeadersCallback: headersProvider.getHeaders.bind(headersProvider) }
);

Note: Custom authentication is integrated into the main createStream() method. See the API Reference for details.
| Option | Default | Description |
|---|---|---|
| recordType | RecordType.Proto | Serialization format: RecordType.Json or RecordType.Proto |
| maxInflightRequests | 10,000 | Maximum number of unacknowledged requests |
| recovery | true | Enable automatic stream recovery |
| recoveryTimeoutMs | 15,000 | Timeout for recovery operations (ms) |
| recoveryBackoffMs | 2,000 | Delay between recovery attempts (ms) |
| recoveryRetries | 4 | Maximum number of recovery attempts |
| flushTimeoutMs | 300,000 | Timeout for flush operations (ms) |
| serverLackOfAckTimeoutMs | 60,000 | Server acknowledgment timeout (ms) |
import { StreamConfigurationOptions, RecordType } from '@databricks/zerobus-ingest-sdk';
const options: StreamConfigurationOptions = {
recordType: RecordType.Json, // JSON encoding
maxInflightRequests: 10000,
recovery: true,
recoveryTimeoutMs: 20000,
recoveryBackoffMs: 2000,
recoveryRetries: 4
};
const stream = await sdk.createStream(
tableProperties,
clientId,
clientSecret,
options
);

The SDK provides a helper function to extract Protocol Buffer descriptors from FileDescriptorSets.
Extracts a specific message descriptor from a FileDescriptorSet:
import { loadDescriptorProto } from '@databricks/zerobus-ingest-sdk/utils/descriptor';
const descriptorBase64 = loadDescriptorProto({
descriptorPath: 'schemas/my_schema_descriptor.pb',
protoFileName: 'my_schema.proto', // Name of your .proto file
messageName: 'MyMessage' // The specific message to use
});

Parameters:
- descriptorPath: Path to the .pb file generated by protoc --descriptor_set_out
- protoFileName: Name of the proto file (e.g., "air_quality.proto")
- messageName: Name of the message type to extract (e.g., "AirQuality")
Why use this utility?
- Extracts the specific message descriptor you need
- No manual base64 conversion required
- Clear error messages if the file or message isn't found
- Flexible for complex schemas with multiple messages or imports
Example with multiple messages:
// Your proto file has: Order, OrderItem, Customer
// You want to ingest Orders:
const descriptorBase64 = loadDescriptorProto({
descriptorPath: 'schemas/orders_descriptor.pb',
protoFileName: 'orders.proto',
messageName: 'Order' // Explicitly choose Order
});

The SDK includes automatic recovery for transient failures (enabled by default with recovery: true). For permanent failures, use recreateStream() to automatically recover all unacknowledged batches. Always use try/finally blocks to ensure streams are properly closed:
try {
const offset = await stream.ingestRecord(JSON.stringify(record));
console.log(`Success: offset ${offset}`);
} catch (error) {
console.error('Ingestion failed:', error);
// When stream fails, close it first
await stream.close();
console.log('Stream closed after error');
// Optional: Inspect what needs recovery (must be called on closed stream)
const unackedBatches = await stream.getUnackedBatches();
console.log(`Batches to recover: ${unackedBatches.length}`);
// Recommended recovery approach: Use recreateStream()
// This method:
// 1. Gets all unacknowledged batches from the failed stream
// 2. Creates a new stream with the same configuration
// 3. Re-ingests all unacknowledged batches automatically
// 4. Returns the new stream ready for continued use
const newStream = await sdk.recreateStream(stream);
console.log(`Stream recreated with ${unackedBatches.length} batches re-ingested`);
// Continue using newStream for further ingestion
try {
// Continue ingesting...
} finally {
await newStream.close();
}
}

Best Practices:
- Rely on automatic recovery (default): The SDK will automatically retry transient failures
- Use recreateStream() for permanent failures: Automatically recovers all unacknowledged batches
- Use getUnackedRecords() for inspection only: Primarily for debugging or understanding failed records
- Always close streams in a finally block to ensure proper cleanup
Main entry point for the SDK.
Constructor:
new ZerobusSdk(zerobusEndpoint: string, unityCatalogUrl: string)

Parameters:
- zerobusEndpoint (string) - The Zerobus gRPC endpoint (e.g., <workspace-id>.zerobus.<region>.cloud.databricks.com for AWS, or <workspace-id>.zerobus.<region>.azuredatabricks.net for Azure)
- unityCatalogUrl (string) - The Unity Catalog endpoint (your workspace URL)
Methods:
async createStream(
tableProperties: TableProperties,
clientId: string,
clientSecret: string,
options?: StreamConfigurationOptions
): Promise<ZerobusStream>

Creates a new ingestion stream using OAuth 2.0 Client Credentials authentication.
Automatically includes these headers:
- "authorization": "Bearer <oauth_token>" (fetched via the OAuth 2.0 Client Credentials flow)
- "x-databricks-zerobus-table-name": "<table_name>"
Returns a ZerobusStream instance.
async recreateStream(stream: ZerobusStream): Promise<ZerobusStream>

Recreates a stream with the same configuration and automatically re-ingests all unacknowledged batches.
This method is the recommended approach for recovering from stream failures. It:
- Retrieves all unacknowledged batches from the failed stream
- Creates a new stream with identical configuration (same table, auth, options)
- Re-ingests all unacknowledged batches in their original order
- Returns the new stream ready for continued ingestion
Parameters:
- stream - The failed or closed stream to recreate
Returns: Promise resolving to a new ZerobusStream with all unacknowledged batches re-ingested
Example:
try {
await stream.ingestRecords(batch);
} catch (error) {
await stream.close();
// Automatically recreate stream and recover all unacked batches
const newStream = await sdk.recreateStream(stream);
// Continue ingesting with newStream
}

Note: This method preserves batch structure and re-ingests batches atomically. For debugging, you can inspect what was recovered using getUnackedBatches() after closing the stream.
Represents an active ingestion stream.
Methods:
async ingestRecord(payload: Buffer | string | object): Promise<bigint>

Ingests a single record. This method blocks until the record is sent to the SDK's internal landing zone, then returns a Promise for the server acknowledgment. This allows you to send many records without waiting for individual acknowledgments.
Parameters:
- payload - Record data. The SDK supports 4 input types for flexibility:
  - JSON Mode (RecordType.Json):
    - Type 1 - object (high-level): Plain JavaScript object - SDK auto-stringifies with JSON.stringify()
    - Type 2 - string (low-level): Pre-serialized JSON string
  - Protocol Buffers Mode (RecordType.Proto):
    - Type 3 - Message (high-level): Protobuf message object - SDK calls .encode().finish() automatically
    - Type 4 - Buffer (low-level): Pre-serialized protobuf bytes
All 4 Type Examples:
// JSON Type 1: object (high-level) - SDK auto-stringifies
await stream.ingestRecord({ device: 'sensor-1', temp: 25 });
// JSON Type 2: string (low-level) - pre-serialized
await stream.ingestRecord(JSON.stringify({ device: 'sensor-1', temp: 25 }));
// Protobuf Type 3: Message object (high-level) - SDK auto-serializes
const message = MyMessage.create({ device: 'sensor-1', temp: 25 });
await stream.ingestRecord(message);
// Protobuf Type 4: Buffer (low-level) - pre-serialized bytes
const buffer = Buffer.from(MyMessage.encode(message).finish());
await stream.ingestRecord(buffer);

Note: The SDK automatically detects protobufjs message objects by checking if the constructor has a static .encode() method. This works seamlessly with messages created via MyMessage.create() or new MyMessage().
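For illustration only (this is not the SDK's actual source), the detection described above can be expressed roughly as:

function looksLikeProtobufMessage(value: unknown): boolean {
  // Treat a value as a protobufjs message when its constructor exposes a
  // static encode() function, as protobufjs-generated classes do.
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as any).constructor?.encode === 'function'
  );
}

// looksLikeProtobufMessage(MyMessage.create({ temp: 25 })) // => true
// looksLikeProtobufMessage({ temp: 25 })                   // => false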
Returns: Promise resolving to the offset ID when the server acknowledges the record
async ingestRecords(payloads: Array<Buffer | string | object>): Promise<bigint | null>

Ingests multiple records as a batch. All records in a batch are acknowledged together atomically. This method blocks until all records are sent to the SDK's internal landing zone, then returns a Promise for the server acknowledgment.
Parameters:
- payloads - Array of record data. Supports the same 4 types as ingestRecord():
  - JSON Mode: Array of objects (Type 1) or strings (Type 2)
  - Proto Mode: Array of Message objects (Type 3) or Buffers (Type 4)
  - Mixed types within the same array are supported
All 4 Type Examples:
// JSON Type 1: objects (high-level) - SDK auto-stringifies
await stream.ingestRecords([
{ device: 'sensor-1', temp: 25 },
{ device: 'sensor-2', temp: 26 }
]);
// JSON Type 2: strings (low-level) - pre-serialized
await stream.ingestRecords([
JSON.stringify({ device: 'sensor-1', temp: 25 }),
JSON.stringify({ device: 'sensor-2', temp: 26 })
]);
// Protobuf Type 3: Message objects (high-level) - SDK auto-serializes
await stream.ingestRecords([
MyMessage.create({ device: 'sensor-1', temp: 25 }),
MyMessage.create({ device: 'sensor-2', temp: 26 })
]);
// Protobuf Type 4: Buffers (low-level) - pre-serialized bytes
const buffers = [
Buffer.from(MyMessage.encode(msg1).finish()),
Buffer.from(MyMessage.encode(msg2).finish())
];
await stream.ingestRecords(buffers);

Returns: Promise resolving to:
- bigint - Offset ID when the server acknowledges the entire batch
- null - If the batch was empty (no records sent)
Best Practices:
- Batch size: 100-1,000 records for optimal throughput/latency balance
- Empty batches are allowed and return null
async flush(): Promise<void>

Flushes all pending records and waits for acknowledgments.
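A minimal sketch of periodic flushing in a long-running producer (assumes a JSON-mode stream created as in the Quick Start; the interval of 1,000 records is arbitrary):

for (let i = 0; i < 10000; i++) {
  // Individual acknowledgments are not awaited here; flush() waits for them.
  void stream.ingestRecord({ device_name: `sensor-${i % 10}`, temp: 20, humidity: 50 });

  if (i % 1000 === 999) {
    // Bound how much unacknowledged data is outstanding before sending more.
    await stream.flush();
  }
}
await stream.flush(); // everything sent so far is now acknowledged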
async close(): Promise<void>

Closes the stream gracefully, flushing all pending data. Always call this in a finally block!
async getUnackedRecords(): Promise<Buffer[]>

Returns unacknowledged record payloads as a flat array for inspection purposes.
Important: Can only be called on closed streams. Call stream.close() first, or this will throw an error.
Returns: Array of Buffer containing the raw record payloads
Use case: For inspecting unacknowledged individual records when using ingestRecord(). Note: This method is primarily for debugging and inspection. For recovery, use recreateStream() (recommended) or automatic recovery (default).
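A minimal sketch of such an inspection, assuming a stream that failed while ingesting single records:

try {
  await stream.ingestRecord({ device_name: 'sensor-1', temp: 25, humidity: 50 });
} catch (error) {
  await stream.close(); // getUnackedRecords() requires a closed stream
  const unacked = await stream.getUnackedRecords();
  console.log(`${unacked.length} record(s) were never acknowledged`);
}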
async getUnackedBatches(): Promise<Buffer[][]>

Returns unacknowledged records grouped by their original batches for inspection purposes.
Important: Can only be called on closed streams. Call stream.close() first, or this will throw an error.
Returns: Array of arrays, where each inner array represents a batch of records as Buffers
Use case: For inspecting unacknowledged batches when using ingestRecords(). Preserves the original batch structure. Note: This method is primarily for debugging and inspection. For recovery, use recreateStream() (recommended) or automatic recovery (default).
Example:
try {
await stream.ingestRecords(batch1);
await stream.ingestRecords(batch2);
// ... error occurs
} catch (error) {
await stream.close();
const unackedBatches = await stream.getUnackedBatches();
// unackedBatches[0] contains records from batch1 (if not acked)
// unackedBatches[1] contains records from batch2 (if not acked)
// Re-ingest with new stream
for (const batch of unackedBatches) {
await newStream.ingestRecords(batch);
}
}

Configuration for the target table.
Interface:
interface TableProperties {
tableName: string; // Fully qualified table name (e.g., "catalog.schema.table")
descriptorProto?: string; // Base64-encoded protobuf descriptor (required for Protocol Buffers)
}

Examples:
// JSON mode
const tableProperties = { tableName: 'main.default.air_quality' };
// Protocol Buffers mode
const tableProperties = {
tableName: 'main.default.air_quality',
descriptorProto: descriptorBase64 // Required for protobuf
};

Configuration options for stream behavior.
Interface:
interface StreamConfigurationOptions {
recordType?: RecordType; // RecordType.Json or RecordType.Proto. Default: RecordType.Proto
maxInflightRequests?: number; // Default: 10,000
recovery?: boolean; // Default: true
recoveryTimeoutMs?: number; // Default: 15,000
recoveryBackoffMs?: number; // Default: 2,000
recoveryRetries?: number; // Default: 4
flushTimeoutMs?: number; // Default: 300,000
serverLackOfAckTimeoutMs?: number; // Default: 60,000
}
enum RecordType {
Json = 0, // JSON encoding
Proto = 1 // Protocol Buffers encoding
}

- Reuse SDK instances: Create one ZerobusSdk instance per application
- Stream lifecycle: Always close streams in a finally block to ensure all records are flushed
- Batch size: Adjust maxInflightRequests based on your throughput requirements (default: 10,000)
- Error handling: The stream handles errors internally with automatic retry. Only use recreateStream() for persistent failures after internal retries are exhausted.
- Use Protocol Buffers for production: Protocol Buffers (the default) provides better performance and schema validation. Use JSON only when you need schema flexibility or for quick prototyping.
- Store credentials securely: Use environment variables, never hardcode credentials
- Use batch ingestion: For high-throughput scenarios, use ingestRecords() instead of individual ingestRecord() calls
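A combined sketch of these practices (uses the air_quality table and environment variables from the examples above; JSON mode is shown for brevity, though Protocol Buffers is preferred in production):

import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';

// One SDK instance per application; credentials come from the environment.
const sdk = new ZerobusSdk(
  process.env.ZEROBUS_SERVER_ENDPOINT!,
  process.env.DATABRICKS_WORKSPACE_URL!
);

const stream = await sdk.createStream(
  { tableName: process.env.ZEROBUS_TABLE_NAME! },
  process.env.DATABRICKS_CLIENT_ID!,
  process.env.DATABRICKS_CLIENT_SECRET!,
  { recordType: RecordType.Json, maxInflightRequests: 10000, recovery: true }
);

try {
  // Prefer batch ingestion for throughput.
  const batch = Array.from({ length: 500 }, (_, i) => ({
    device_name: `sensor-${i % 10}`,
    temp: 20 + (i % 15),
    humidity: 50 + (i % 40)
  }));
  await stream.ingestRecords(batch);
} finally {
  // Always close in finally so pending records are flushed.
  await stream.close();
}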
The SDK supports all platforms where Node.js and Rust are available.
Pre-built native binaries are available for:
- Linux: x64, ARM64
- Windows: x64
macOS users: Pre-built binaries are not available for macOS. The package will automatically build from source during npm install, which requires:
- Rust toolchain (1.70+): Install via curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
- Xcode Command Line Tools: Install via xcode-select --install
The build process happens automatically during installation and typically takes 2-3 minutes.
This SDK wraps the high-performance Rust Zerobus SDK using NAPI-RS:
┌─────────────────────────────┐
│ TypeScript Application │
└─────────────┬───────────────┘
│ (NAPI-RS bindings)
┌─────────────▼───────────────┐
│ Rust Zerobus SDK │
│ - gRPC communication │
│ - OAuth authentication │
│ - Stream management │
└─────────────┬───────────────┘
│ (gRPC/TLS)
┌─────────────▼───────────────┐
│ Databricks Zerobus Service│
└─────────────────────────────┘
Benefits:
- Zero-copy data transfer between JavaScript and Rust
- Native async/await support - Rust futures become JavaScript Promises
- Automatic memory management - No manual cleanup required
- Type safety - Compile-time checks on both sides
We welcome contributions! Please see CONTRIBUTING.md for details.
- Zerobus Rust SDK - The underlying Rust implementation
- Zerobus Python SDK - Python SDK for Zerobus
- Zerobus Java SDK - Java SDK for Zerobus
- NAPI-RS - Rust/Node.js binding framework