Zerobus Go SDK

A high-performance Go client for streaming data ingestion into Databricks Delta tables using the Zerobus service.

Disclaimer

Public Preview: This SDK is supported for production use cases and is available to all customers. Databricks is actively working on stabilizing the Zerobus Ingest SDK for Go. Minor version updates may include backwards-incompatible changes.

We welcome your feedback on this SDK. Please file issues, and we will address them.

Table of Contents

  • Overview
  • Features
  • Architecture
  • Getting Started
  • Quick Start
  • Repository Structure
  • Usage Guide
  • Configuration Options
  • Error Handling
  • Examples
  • Best Practices
  • API Reference
  • Building from Source
  • Troubleshooting
  • Community and Contributing
  • License

Overview

The Zerobus Go SDK provides a robust, CGO-based wrapper around the high-performance Rust implementation for ingesting large volumes of data into Databricks Delta tables. It abstracts the complexity of the Zerobus service and handles authentication, retries, stream recovery, and acknowledgment tracking automatically.

What is Zerobus? Zerobus is a high-throughput streaming service for direct data ingestion into Databricks Delta tables, optimized for real-time data pipelines and high-volume workloads.

This SDK wraps the Rust zerobus-sdk-rs using CGO and FFI (Foreign Function Interface), providing an idiomatic Go API while leveraging the performance and reliability of the underlying Rust implementation.

Features

  • Static Linking - Self-contained binaries with no runtime dependencies or LD_LIBRARY_PATH configuration
  • High-throughput streaming ingestion into Databricks Delta tables
  • Automatic OAuth 2.0 authentication with Unity Catalog
  • Simple JSON ingestion - No code generation required for basic use cases
  • Protocol Buffers support for type-safe, efficient data encoding
  • Backpressure control to manage memory usage
  • Automatic retry and recovery for transient failures
  • Configurable timeouts and retry policies
  • Async acknowledgments for ingested records
  • Graceful stream management - Proper flushing and acknowledgment tracking

Architecture

┌─────────────────┐
│   Go SDK        │  ← Idiomatic Go API (this package)
│   (zerobus)     │
└────────┬────────┘
         │ CGO (Static Linking)
┌────────▼────────┐
│  Rust FFI Crate │  ← C-compatible wrapper
│ (zerobus-ffi)   │
└────────┬────────┘
         │
┌────────▼────────┐
│  Rust SDK       │  ← Core async implementation (Tokio)
│(zerobus-sdk-rs) │     - gRPC bidirectional streaming
└────────┬────────┘     - OAuth 2.0 authentication
         │              - Automatic recovery
         ▼
   Databricks
 Zerobus Service

Getting Started

Choose your installation path:

Path              When to Use
For SDK Users     You want to use the SDK in your project (via go get)
For Contributors  You want to contribute or build from source (via git clone)

For SDK Users (Install from pkg.go.dev)

Prerequisites:

System Requirements:

  • Go 1.21+
  • CGO enabled (enabled by default)
  • Rust 1.75+ (install from rustup.rs)
  • C compiler (gcc on Linux, clang on macOS, MinGW-w64 on Windows)

Supported Platforms:

  • Linux (x86_64, ARM64)
  • macOS (Intel, Apple Silicon)
  • Windows (x86_64 with MinGW-w64)

Databricks Requirements:

  • Databricks workspace with Zerobus access enabled (AWS, Azure, or GCP)
  • OAuth 2.0 client credentials (client ID and secret)
  • Unity Catalog endpoint access

Installation:

# 1. Add the SDK to your project
go get github.com/databricks/zerobus-go-sdk

# 2. Build the Rust FFI library (one-time setup, takes 2-5 minutes)
go generate github.com/databricks/zerobus-go-sdk/sdk

# 3. Build your project normally
go build

In your code:

package main

import (
    "log"

    zerobus "github.com/databricks/zerobus-go-sdk/sdk"
)

func main() {
    sdk, err := zerobus.NewZerobusSdk(
        "https://your-shard.zerobus.region.cloud.databricks.com",
        "https://your-workspace.cloud.databricks.com",
    )
    if err != nil {
        log.Fatal(err)
    }
    defer sdk.Free()
    // ... create streams and ingest data
}

Note: After the initial go generate step, regular go build works normally. The Rust library is statically linked into your binary.

For Contributors (Build from Source)

Prerequisites:

  • Same as above (Go, CGO, Rust, C compiler, Databricks workspace)
  • Git

Setup:

# Clone the repository
git clone https://github.com/databricks/zerobus-go-sdk.git
cd zerobus-go-sdk/sdk
go generate  # Builds Rust FFI
cd ..
make build   # Builds everything

See Building from Source for more build options and Community and Contributing for contribution guidelines.

Quick Start

Once you've completed the Getting Started steps, here's how to ingest data:

JSON Ingestion (Simplest Approach)

Perfect for getting started quickly without code generation:

package main

import (
    "log"
    zerobus "github.com/databricks/zerobus-go-sdk/sdk"
)

func main() {
    // 1. Initialize SDK
    sdk, err := zerobus.NewZerobusSdk(
        "https://your-shard-id.zerobus.region.cloud.databricks.com",
        "https://your-workspace.cloud.databricks.com",
    )
    if err != nil {
        log.Fatal(err)
    }
    defer sdk.Free()

    // 2. Configure for JSON records
    options := zerobus.DefaultStreamConfigurationOptions()
    options.RecordType = zerobus.RecordTypeJson

    // 3. Create stream
    stream, err := sdk.CreateStream(
        zerobus.TableProperties{
            TableName: "catalog.schema.table",
        },
        "your-client-id",
        "your-client-secret",
        options,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer stream.Close()

    // 4. Ingest record (blocks until queued, returns ack handle)
    ack, err := stream.IngestRecord(`{"id": 1, "message": "Hello"}`)
    if err != nil {
        log.Fatal(err)
    }

    // 5. Await acknowledgment to get offset
    offset, err := ack.Await()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Ingested record at offset %d", offset)

    // 6. Flush to ensure durability
    if err := stream.Flush(); err != nil {
        log.Fatal(err)
    }
}

Protocol Buffer Ingestion (Production Recommended)

For type-safe, efficient ingestion in production:

import (
    "log"
    "os"

    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/descriptorpb"

    zerobus "github.com/databricks/zerobus-go-sdk/sdk"
)

// 1. Load descriptor from generated files
descriptorBytes, err := os.ReadFile("path/to/schema.descriptor")
if err != nil {
    log.Fatal(err)
}

// Optional: verify the bytes parse as a valid DescriptorProto
descriptor := &descriptorpb.DescriptorProto{}
if err := proto.Unmarshal(descriptorBytes, descriptor); err != nil {
    log.Fatal(err)
}

// 2. Create stream for Proto records
options := zerobus.DefaultStreamConfigurationOptions()
options.RecordType = zerobus.RecordTypeProto

stream, err := sdk.CreateStream(
    zerobus.TableProperties{
        TableName:       "catalog.schema.table",
        DescriptorProto: descriptorBytes,
    },
    clientID,
    clientSecret,
    options,
)

// 3. Ingest proto-encoded record (blocks until queued)
ack, err := stream.IngestRecord(protoBytes)

// 4. Await acknowledgment to get offset
offset, err := ack.Await()
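
For illustration, the protoBytes above would typically come from proto.Marshal on a message generated by protoc. A minimal sketch, assuming a pb package generated from the example air_quality.proto schema, with hypothetical field names:

record := &pb.AirQuality{ // hypothetical generated message type
    DeviceName:  "sensor-1",
    Temperature: 21.5,
}
protoBytes, err := proto.Marshal(record) // encode to the proto wire format
if err != nil {
    log.Fatal(err)
}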

Next Steps

  • Usage Guide - Step-by-step documentation for each stage of ingestion
  • Examples - Complete, runnable examples
  • API Reference - Full API documentation

Repository Structure

zerobus-go-sdk/
├── sdk/                            # Core SDK library
│   ├── zerobus.go                  # Main SDK and stream implementation
│   ├── ffi.go                      # CGO bindings to Rust FFI
│   ├── errors.go                   # Error types
│   ├── go.mod                      # Go module definition
│   └── zerobus-ffi/                # Rust FFI crate
│       ├── src/lib.rs              # FFI wrapper implementation
│       ├── zerobus.h               # C header for CGO
│       ├── Cargo.toml              # Rust dependencies
│       └── build.rs                # Build script for cbindgen
│
├── examples/                       # Working examples
│   ├── basic_example_json/         # JSON ingestion example
│   │   ├── basic_json_usage.go     # JSON-based example
│   │   └── go.mod                  # Module file
│   └── basic_example_proto/        # Protocol Buffer example
│       ├── basic_proto_usage.go    # Proto-based example
│       ├── air_quality.proto       # Example proto schema
│       ├── pb/                     # Generated proto code
│       └── go.mod                  # Module file
│
├── tests/                          # Test suite
│   └── README.md                   # Testing documentation
│
├── Makefile                        # Build automation
├── README.md                       # This file
├── CHANGELOG.md                    # Version history
├── CONTRIBUTING.md                 # Contribution guidelines
├── SECURITY.md                     # Security policy
├── DCO                             # Developer Certificate of Origin
├── NOTICE                          # Third-party attribution
└── LICENSE                         # License file

Key Components

  • sdk/ - The main library containing Go SDK and Rust FFI wrapper
  • examples/ - Complete working examples demonstrating SDK usage
  • tests/ - Integration and unit tests
  • Makefile - Standard make targets for building, testing, and linting

Usage Guide

This section provides detailed step-by-step documentation. For a quick start, see Quick Start. For working examples, see Examples.

1. Initialize the SDK

Create an SDK instance with your Databricks workspace endpoints:

// For AWS
sdk, err := zerobus.NewZerobusSdk(
    "https://your-shard-id.zerobus.region.cloud.databricks.com",
    "https://your-workspace.cloud.databricks.com",
)

// For Azure
sdk, err := zerobus.NewZerobusSdk(
    "https://your-shard-id.zerobus.region.azuredatabricks.net",
    "https://your-workspace.azuredatabricks.net",
)

if err != nil {
    log.Fatal(err)
}
defer sdk.Free()

2. Configure Authentication

The SDK handles authentication automatically. You just need to provide your OAuth credentials:

clientID := os.Getenv("DATABRICKS_CLIENT_ID")
clientSecret := os.Getenv("DATABRICKS_CLIENT_SECRET")

See the examples directory for how to obtain OAuth credentials.

Custom Authentication

For advanced use cases, you can implement the HeadersProvider interface to supply your own authentication headers. This is useful for integrating with a different OAuth provider, using a centralized token caching service, or implementing alternative authentication mechanisms.

Note: The headers you provide must still conform to the authentication protocol expected by the Zerobus service. The default OAuth implementation serves as the reference for the required headers (authorization and x-databricks-zerobus-table-name). This feature provides flexibility in how you source your credentials, not in changing the authentication protocol itself.

Example:

import zerobus "github.com/databricks/zerobus-go-sdk/sdk"

// Implement the HeadersProvider interface.
type MyCustomAuthProvider struct {
    tableName string
}

func (p *MyCustomAuthProvider) GetHeaders() (map[string]string, error) {
    // Custom logic to fetch and cache a token would go here.
    return map[string]string{
        "authorization":                    "Bearer <your-token>",
        "x-databricks-zerobus-table-name": p.tableName,
    }, nil
}

func example(sdk *zerobus.ZerobusSdk, tableProps zerobus.TableProperties) error {
    customProvider := &MyCustomAuthProvider{tableName: "catalog.schema.table"}

    stream, err := sdk.CreateStreamWithHeadersProvider(
        tableProps,
        customProvider,
        nil,
    )
    if err != nil {
        return err
    }
    defer stream.Close()

    ack, err := stream.IngestRecord(`{"data": "value"}`)
    if err != nil {
        return err
    }
    if _, err := ack.Await(); err != nil {
        return err
    }
    return nil
}

Common use cases:

  • Token caching: Implement custom token refresh logic (see the sketch below)
  • Alternative auth mechanisms: Use different authentication providers
  • Dynamic credentials: Fetch credentials on-demand from secret managers
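
A minimal sketch of the token-caching case, assuming a caller-supplied fetchToken function (hypothetical) that returns a token and its lifetime; requires the sync and time imports:

type CachingProvider struct {
    mu         sync.Mutex
    token      string
    expiresAt  time.Time
    tableName  string
    fetchToken func() (string, time.Duration, error) // hypothetical token source
}

func (p *CachingProvider) GetHeaders() (map[string]string, error) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.token == "" || time.Now().After(p.expiresAt) {
        token, ttl, err := p.fetchToken()
        if err != nil {
            return nil, err
        }
        p.token = token
        p.expiresAt = time.Now().Add(ttl - time.Minute) // refresh a minute early
    }
    return map[string]string{
        "authorization":                    "Bearer " + p.token,
        "x-databricks-zerobus-table-name": p.tableName,
    }, nil
}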

3. Create a Stream

Configure table properties and stream options:

options := zerobus.DefaultStreamConfigurationOptions()
options.MaxInflightRequests = 10000
options.Recovery = true
options.RecoveryRetries = 4
options.RecordType = zerobus.RecordTypeJson

stream, err := sdk.CreateStream(
    zerobus.TableProperties{
        TableName: "catalog.schema.table",
    },
    clientID,
    clientSecret,
    options,
)
if err != nil {
    log.Fatal(err)
}
defer stream.Close()

4. Ingest Data

Single record:

// JSON (string) - blocks until queued, handles backpressure
ack, err := stream.IngestRecord(`{"id": 1, "value": "hello"}`)
if err != nil {
    log.Fatal(err)
}

// Await acknowledgment
offset, err := ack.Await()
if err != nil {
    log.Fatal(err)
}
log.Printf("Record ingested at offset: %d", offset)

Batch ingestion for high throughput:

// Queue all records
acks := make([]*zerobus.RecordAck, 0, 100000)
for i := 0; i < 100000; i++ {
    jsonData := fmt.Sprintf(`{"id": %d, "timestamp": %d}`, i, time.Now().Unix())
    ack, err := stream.IngestRecord(jsonData)
    if err != nil {
        log.Fatal(err)
    }
    acks = append(acks, ack)
}

// Wait for server acknowledgments
for i, ack := range acks {
    if _, err := ack.Await(); err != nil {
        log.Printf("Record %d failed: %v", i, err)
    }
}
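
If holding 100,000 ack handles at once strains memory, one option is to await acknowledgments in fixed-size windows. A sketch (the window size is an arbitrary choice):

const window = 1000
acks := make([]*zerobus.RecordAck, 0, window)
for i := 0; i < 100000; i++ {
    ack, err := stream.IngestRecord(fmt.Sprintf(`{"id": %d}`, i))
    if err != nil {
        log.Fatal(err)
    }
    acks = append(acks, ack)
    if len(acks) == window {
        for _, a := range acks {
            if _, err := a.Await(); err != nil {
                log.Printf("Record failed: %v", err)
            }
        }
        acks = acks[:0] // reuse the slice for the next window
    }
}
// Await any remainder
for _, a := range acks {
    if _, err := a.Await(); err != nil {
        log.Printf("Record failed: %v", err)
    }
}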

For non-blocking concurrent ingestion, use goroutines:

var wg sync.WaitGroup
errCh := make(chan error, 100)

for i := 0; i < 100; i++ {
    wg.Add(1)
    go func(id int) {
        defer wg.Done()

        data := fmt.Sprintf(`{"id": %d}`, id)
        ack, err := stream.IngestRecord(data)  // Blocks this goroutine
        if err != nil {
            errCh <- err
            return
        }

        offset, err := ack.Await()
        if err != nil {
            errCh <- err
            return
        }
        log.Printf("Record %d acknowledged at offset %d", id, offset)
    }(i)
}

wg.Wait()
close(errCh)

// Check for errors
for err := range errCh {
    log.Printf("Ingestion error: %v", err)
}

Concurrent ingestion with multiple streams:

var wg sync.WaitGroup
for partition := 0; partition < 4; partition++ {
    wg.Add(1)
    go func(p int) {
        defer wg.Done()

        // Each goroutine gets its own stream
        stream, err := sdk.CreateStream(tableProps, clientID, clientSecret, options)
        if err != nil {
            log.Fatal(err)
        }
        defer stream.Close()

        for i := p * 25000; i < (p+1)*25000; i++ {
            data := fmt.Sprintf(`{"id": %d}`, i)
            // Blocks until queued in this goroutine
            if _, err := stream.IngestRecord(data); err != nil {
                log.Fatal(err)
            }
        }
        // The deferred stream.Close() will flush and await all pending acks
    }(partition)
}
wg.Wait()

5. Handle Acknowledgments

After IngestRecord() returns, the record is queued. Use the returned acknowledgment to wait for server confirmation:

ack, err := stream.IngestRecord(data)
if err != nil {
    // Handle queueing error
    if zerobusErr, ok := err.(*zerobus.ZerobusError); ok {
        if zerobusErr.Retryable() {
            // Retry logic for transient failures
        }
    }
    log.Fatal(err)
}

// The offset becomes available once the server acknowledges the record
offset, err := ack.Await()
if err != nil {
    log.Fatal(err)
}
log.Printf("Record committed at offset: %d", offset)

6. Close the Stream

Always close streams to ensure data is flushed:

// Close gracefully (flushes automatically)
if err := stream.Close(); err != nil {
    log.Fatal(err)
}
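
Since a bare defer stream.Close() discards the returned error, one common idiom is to check it inside the deferred function:

defer func() {
    if err := stream.Close(); err != nil {
        log.Printf("stream close failed: %v", err)
    }
}()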

Configuration Options

StreamConfigurationOptions

Field                     Type    Default    Description
MaxInflightRequests       uint64  1,000,000  Maximum number of in-flight requests
Recovery                  bool    true       Enable automatic stream recovery on failure
RecoveryTimeoutMs         uint64  15,000     Timeout for recovery operations (ms)
RecoveryBackoffMs         uint64  2,000      Delay between recovery retry attempts (ms)
RecoveryRetries           uint32  4          Maximum number of recovery attempts
FlushTimeoutMs            uint64  300,000    Timeout for flush operations (ms)
ServerLackOfAckTimeoutMs  uint64  60,000     Timeout waiting for server acks (ms)
RecordType                int     Proto      Record type: RecordTypeProto or RecordTypeJson

Example:

options := zerobus.DefaultStreamConfigurationOptions()
options.MaxInflightRequests = 50000
options.RecoveryRetries = 10
options.FlushTimeoutMs = 600000
options.RecordType = zerobus.RecordTypeJson

Error Handling

The SDK categorizes errors as retryable or non-retryable:

Retryable Errors

Auto-recovered if Recovery is enabled:

  • Network failures
  • Connection timeouts
  • Temporary server errors
  • Stream closed by server

Non-Retryable Errors

Require manual intervention:

  • Invalid OAuth credentials
  • Invalid table name
  • Schema mismatch
  • Authentication failure
  • Permission denied

Check if an error is retryable:

ack, err := stream.IngestRecord(data)
if err != nil {
    if zerobusErr, ok := err.(*zerobus.ZerobusError); ok {
        if zerobusErr.Retryable() {
            log.Printf("Retryable error, SDK will auto-recover: %v", err)
            // Optionally implement custom retry logic
        } else {
            log.Fatalf("Fatal error, manual intervention needed: %v", err)
        }
    }
}
// Errors returned by ack.Await() can be inspected the same way
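
The "custom retry logic" above could take the shape of bounded exponential backoff around the ingest call. A minimal sketch (the attempt count and delays are arbitrary; requires the time and errors imports):

func ingestWithRetry(stream *zerobus.ZerobusStream, data string) (*zerobus.RecordAck, error) {
    backoff := 100 * time.Millisecond
    for attempt := 0; attempt < 5; attempt++ {
        ack, err := stream.IngestRecord(data)
        if err == nil {
            return ack, nil
        }
        zerobusErr, ok := err.(*zerobus.ZerobusError)
        if !ok || !zerobusErr.Retryable() {
            return nil, err // non-retryable: surface immediately
        }
        time.Sleep(backoff) // simple exponential backoff
        backoff *= 2
    }
    return nil, errors.New("ingest failed after retries")
}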

Examples

The examples/ directory contains complete, runnable examples:

  • examples/basic_example_json/ - Simple JSON-based ingestion
  • examples/basic_example_proto/ - Type-safe Protocol Buffer ingestion

To run an example:

# Navigate to an example directory
cd examples/basic_example_json

# Set your credentials
export ZEROBUS_SERVER_ENDPOINT="https://your-zerobus-endpoint.databricks.com"
export DATABRICKS_WORKSPACE_URL="https://your-workspace.databricks.com"
export DATABRICKS_CLIENT_ID="your-client-id"
export DATABRICKS_CLIENT_SECRET="your-client-secret"
export ZEROBUS_TABLE_NAME="catalog.schema.table"

# Run the example
go run basic_json_usage.go

Each example includes detailed comments and demonstrates best practices for production use.

Best Practices

  1. Reuse SDK Instances - Create one ZerobusSdk per application and reuse for multiple streams
  2. Always Close Streams - Use defer stream.Close() to ensure all data is flushed
  3. Tune Inflight Limits - Adjust MaxInflightRequests based on memory and throughput needs
  4. Enable Recovery - Always set Recovery: true in production environments
  5. Use Batch Ingestion - For high throughput, ingest many records before calling Flush()
  6. Monitor Errors - Log and alert on non-retryable errors
  7. Use Protocol Buffers for Production - More efficient than JSON for high-volume scenarios
  8. Secure Credentials - Never hardcode secrets; use environment variables or secret managers
  9. Test Recovery - Simulate failures to verify your error handling logic
  10. One Stream Per Goroutine - Don't share streams across goroutines; create separate streams for concurrent ingestion

API Reference

ZerobusSdk

Main entry point for the SDK.

NewZerobusSdk(zerobusEndpoint, unityCatalogURL string) (*ZerobusSdk, error)

Creates a new SDK instance.

  • zerobusEndpoint: Zerobus gRPC service endpoint
  • unityCatalogURL: Unity Catalog URL for OAuth token acquisition

CreateStream(tableProps TableProperties, clientID, clientSecret string, options *StreamConfigurationOptions) (*ZerobusStream, error)

Creates a new ingestion stream with OAuth authentication.

CreateStreamWithHeadersProvider(tableProps TableProperties, headersProvider HeadersProvider, options *StreamConfigurationOptions) (*ZerobusStream, error)

Creates a new ingestion stream with a custom headers provider for advanced authentication. Use this when you need custom authentication logic (e.g., custom token caching, or alternative auth providers).

Example:

provider := &MyCustomAuthProvider{}
stream, err := sdk.CreateStreamWithHeadersProvider(
    tableProps,
    provider,
    options,
)

Free()

Explicitly releases SDK resources. Called automatically by finalizer.

ZerobusStream

Represents an active bidirectional streaming connection.

IngestRecord(payload interface{}) (*RecordAck, error)

Ingests a record into the stream. Blocks until the record is queued (handles backpressure), then returns an acknowledgment handle for awaiting server confirmation.

Accepts either:

  • string for JSON-encoded records
  • []byte for Protocol Buffer-encoded records

Returns a *RecordAck that can be awaited to get the logical offset assigned to the record.

Example:

ack, err := stream.IngestRecord(`{"id": 1}`)
if err != nil {
    // Handle queueing errors
}

// Wait for server acknowledgment
offset, err := ack.Await()
if err != nil {
    // Handle acknowledgment errors
}

For non-blocking ingestion:

go func() {
    ack, _ := stream.IngestRecord(data) // this goroutine blocks until queued
    offset, _ := ack.Await()
    log.Printf("Record acknowledged at offset %d", offset)
}()

Flush() error

Blocks until all pending records are acknowledged by the server.
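
Example (the same pattern as step 6 of the Quick Start):

if err := stream.Flush(); err != nil {
    log.Fatal(err)
}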

Close() error

Gracefully closes the stream after flushing pending records.

RecordAck

Represents a pending acknowledgment for an ingested record.

Await() (int64, error)

Blocks until the record is acknowledged by the server and returns the assigned offset. The result is resolved once; subsequent calls return the cached result.

Example:

ack, _ := stream.IngestRecord(data)
offset, err := ack.Await()
if err != nil {
    log.Printf("Record failed: %v", err)
}

TryGet() (int64, error, bool)

Non-blocking check for acknowledgment status.

Returns:

  • (offset, nil, true) if acknowledgment is ready
  • (0, nil, false) if still pending
  • (0, error, true) if an error occurred

Example:

ack, _ := stream.IngestRecord(data)
// Do other work...
if offset, err, ready := ack.TryGet(); ready {
    if err != nil {
        log.Printf("Record failed: %v", err)
    } else {
        log.Printf("Record acknowledged at offset %d", offset)
    }
} else {
    log.Println("Still waiting for acknowledgment")
}
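
A sketch of polling TryGet in a loop between other work (doOtherWork and the sleep interval are illustrative; prefer Await when blocking is acceptable):

ack, _ := stream.IngestRecord(data)
for {
    offset, err, ready := ack.TryGet()
    if ready {
        if err != nil {
            log.Printf("Record failed: %v", err)
        } else {
            log.Printf("Record acknowledged at offset %d", offset)
        }
        break
    }
    doOtherWork()                     // hypothetical work between polls
    time.Sleep(10 * time.Millisecond) // avoid a busy loop
}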

HeadersProvider

Interface for providing custom authentication headers.

type HeadersProvider interface {
    // GetHeaders returns authentication headers.
    // Called by the SDK when authentication is needed.
    GetHeaders() (map[string]string, error)
}

Example implementation:

type CustomProvider struct{}

func (p *CustomProvider) GetHeaders() (map[string]string, error) {
    return map[string]string{
        "authorization": "Bearer token",
        "x-databricks-zerobus-table-name": "catalog.schema.table",
    }, nil
}

TableProperties

type TableProperties struct {
    TableName       string
    DescriptorProto []byte
}
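
Example (as in the Quick Start above, DescriptorProto is only needed for RecordTypeProto streams):

props := zerobus.TableProperties{
    TableName:       "catalog.schema.table",
    DescriptorProto: descriptorBytes, // omit for JSON streams
}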

StreamConfigurationOptions

See Configuration Options for details.

ZerobusError

type ZerobusError struct {
    Message     string
    IsRetryable bool
}

Error() string

Returns the error message.

Retryable() bool

Returns true if the error can be automatically recovered by the SDK.
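
Since *ZerobusError satisfies the error interface via Error() string, the standard errors.As form also works and tolerates wrapped errors; a minimal sketch:

var zerr *zerobus.ZerobusError
if errors.As(err, &zerr) && zerr.Retryable() {
    // Transient failure: the SDK will attempt recovery automatically
}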

Building from Source

This section is for contributors and those who need to build the SDK from source. If you just want to use the SDK, see Getting Started instead.

Basic Build

git clone https://github.com/databricks/zerobus-go-sdk.git
cd zerobus-go-sdk
make build

Build Specific Components

# Build only Rust FFI
make build-rust

# Build only Go SDK
make build-go

# Build examples
make examples

# Run tests
make test

# Format code
make fmt

# Run linters
make lint

Platform-Specific Build Notes

Windows

The build system automatically compiles Rust with the GNU toolchain (x86_64-pc-windows-gnu) for compatibility with Go's MinGW-based CGO. This ensures the MSVC and GNU ABIs don't conflict.

Linux

Standard GCC toolchain is used. Install build tools with:

sudo apt-get install build-essential  # Ubuntu/Debian
sudo dnf install gcc                   # Fedora/RHEL

macOS

Uses the system Clang toolchain. Install with:

xcode-select --install

Troubleshooting

Installing Rust

If you need to install Rust, use the official installer:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

After installation, restart your terminal or run:

source $HOME/.cargo/env

Verify installation:

rustc --version
cargo --version

Common Build Issues

"cargo not found"

  • Ensure Rust is installed and ~/.cargo/bin is in your PATH
  • Restart your terminal after installing Rust

"linker 'cc' not found" (Linux)

  • Install build tools: sudo apt-get install build-essential

"xcrun: error: unable to find utility 'cc'" (macOS)

  • Install Xcode tools: xcode-select --install

"undefined reference to __chkstk" (Windows)

  • Ensure you're using MinGW-w64, not just MinGW
  • The SDK automatically builds for the GNU target

Community and Contributing

This is an open source project. We welcome contributions, feedback, and bug reports.

License

This SDK is licensed under the Databricks License. See the LICENSE file for the full license text. The license is also available online at https://www.databricks.com/legal/db-license.


For issues, questions, or contributions, please visit the GitHub repository.