Zerobus Python SDK

A high-performance Python client for streaming data ingestion into Databricks Delta tables using the Zerobus service.

Overview

The Zerobus Python SDK is a thin wrapper around the Zerobus Rust SDK, built using PyO3 bindings. It delivers native performance with a Python-friendly API supporting both synchronous and asynchronous usage.

What is Zerobus? See the project overview for details on the Zerobus service.

Prerequisites (workspace setup, table creation, service principal): See the top-level README.

Architecture

┌─────────────────────────────────────────┐
│         Python Application Code         │
└─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│       Python SDK (Thin Wrapper)         │
│    • Sync and async APIs                │
│    • Python types & error handling      │
└─────────────────────────────────────────┘
                    │
                    ▼ (PyO3 bindings)
┌─────────────────────────────────────────┐
│         Rust Core Implementation        │
│    • gRPC communication                 │
│    • OAuth 2.0 authentication           │
│    • Stream management & recovery       │
└─────────────────────────────────────────┘

Features

  • Rust-backed performance - Native Rust implementation via PyO3 bindings for maximum throughput
  • Sync and Async support - Both synchronous and asynchronous Python APIs
  • Automatic recovery - Built-in retry and reconnection for transient failures
  • Multiple serialization formats - JSON (simple) and Protocol Buffers (type-safe)
  • OAuth 2.0 authentication - Secure authentication with client credentials, automatically refreshed
  • Acknowledgment callbacks - Receive notifications when records are acknowledged or encounter errors
  • Flexible configuration - Fine-tune timeouts, retries, and recovery behavior

Installation

From PyPI (Recommended)

pip install databricks-zerobus-ingest-sdk

Pre-built wheels are available for:

  • Linux: x86_64, aarch64 (manylinux)
  • macOS: x86_64, arm64
  • Windows: x86_64

Python Version

Requires Python 3.9 or higher.

Dependencies

  • protobuf >= 4.25.0, < 7.0 (for Protocol Buffer schema handling)
  • requests >= 2.28.1, < 3 (only for the generate_proto utility tool)

All core ingestion functionality (gRPC, OAuth, stream management) is handled by the native Rust implementation.

Quick Start

Choose Your Serialization Format

  1. JSON - Simple, no schema compilation needed. Good for getting started.
  2. Protocol Buffers - Strongly-typed schemas, more efficient over the wire.

Option 1: JSON (Simplest)

Synchronous:

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

server_endpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
client_id = "your-client-id"          # Service principal credentials
client_secret = "your-client-secret"

sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties("main.default.air_quality")
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(client_id, client_secret, table_properties, options)

try:
    for i in range(100):
        offset = stream.ingest_record_offset({
            "device_name": f"sensor-{i % 10}",
            "temp": 20 + (i % 15),
            "humidity": 50 + (i % 40)
        })
    stream.flush()
finally:
    stream.close()

Asynchronous:

import asyncio
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

async def main():
    server_endpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    client_id = "your-client-id"          # Service principal credentials
    client_secret = "your-client-secret"

    sdk = ZerobusSdk(server_endpoint, workspace_url)
    table_properties = TableProperties("main.default.air_quality")
    options = StreamConfigurationOptions(record_type=RecordType.JSON)
    stream = await sdk.create_stream(client_id, client_secret, table_properties, options)

    try:
        for i in range(100):
            offset = await stream.ingest_record_offset({
                "device_name": f"sensor-{i % 10}",
                "temp": 20 + (i % 15),
                "humidity": 50 + (i % 40)
            })
        await stream.flush()
    finally:
        await stream.close()

asyncio.run(main())
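
Credentials are usually better kept out of source code. The following is a minimal sketch of reading client_id and client_secret from environment variables before calling create_stream(). The variable names ZEROBUS_CLIENT_ID and ZEROBUS_CLIENT_SECRET and the helper load_credentials are hypothetical, chosen for this example; the SDK itself does not read them.

```python
import os


def load_credentials(env=None):
    """Return (client_id, client_secret) read from environment variables.

    ZEROBUS_CLIENT_ID and ZEROBUS_CLIENT_SECRET are hypothetical names used
    only in this sketch; the SDK does not define or read them.
    """
    env = os.environ if env is None else env
    names = ("ZEROBUS_CLIENT_ID", "ZEROBUS_CLIENT_SECRET")
    missing = [name for name in names if not env.get(name)]
    if missing:
        # Fail early with a clear message instead of a later authentication error
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[names[0]], env[names[1]]
```

The returned pair can then be passed as the first two arguments of create_stream(), as in the examples above.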

Option 2: Protocol Buffers

First, define a protobuf schema. Use proto2 syntax with optional fields to match Delta table columns:

// record.proto
syntax = "proto2";
message AirQuality {
    optional string device_name = 1;
    optional int32 temp = 2;
    optional int64 humidity = 3;
}

See the Delta → Protobuf type mappings in the top-level README.

Compile the schema to generate a Python module:

pip install "grpcio-tools>=1.60.0,<2.0"
python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto
# Generates record_pb2.py

Load the descriptor from the generated module and pass it to TableProperties:

import record_pb2

# The DESCRIPTOR is the compiled schema — pass it so the SDK can validate records
table_properties = TableProperties("main.default.air_quality", record_pb2.AirQuality.DESCRIPTOR)

Alternatively, generate the schema automatically from an existing Unity Catalog table:

python -m zerobus.tools.generate_proto \
    --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
    --client-id "your-client-id" \
    --client-secret "your-client-secret" \
    --table "main.default.air_quality" \
    --output "record.proto" \
    --proto-msg "AirQuality"

# Then compile the generated file the same way:
python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto

Synchronous:

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2

sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties("main.default.air_quality", record_pb2.AirQuality.DESCRIPTOR)
stream = sdk.create_stream(client_id, client_secret, table_properties)

try:
    for i in range(100):
        record = record_pb2.AirQuality(
            device_name=f"sensor-{i % 10}",
            temp=20 + (i % 15),
            humidity=50 + (i % 40)
        )
        stream.ingest_record_nowait(record)
    stream.flush()
finally:
    stream.close()

Asynchronous:

import asyncio
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2

async def main():
    sdk = ZerobusSdk(server_endpoint, workspace_url)
    table_properties = TableProperties("main.default.air_quality", record_pb2.AirQuality.DESCRIPTOR)
    stream = await sdk.create_stream(client_id, client_secret, table_properties)

    try:
        for i in range(100):
            record = record_pb2.AirQuality(
                device_name=f"sensor-{i % 10}",
                temp=20 + (i % 15),
                humidity=50 + (i % 40)
            )
            stream.ingest_record_nowait(record)
        await stream.flush()
    finally:
        await stream.close()

asyncio.run(main())

See the examples/ directory for complete runnable examples.

Configuration

Configure stream behavior by passing a StreamConfigurationOptions object to create_stream():

from zerobus.sdk.shared import StreamConfigurationOptions, RecordType, AckCallback

class MyCallback(AckCallback):
    def on_ack(self, offset: int):
        print(f"Acknowledged offset: {offset}")

    def on_error(self, offset: int, error_message: str):
        print(f"Error at offset {offset}: {error_message}")

options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    max_inflight_records=10000,
    recovery=True,
    ack_callback=MyCallback()
)

stream = sdk.create_stream(client_id, client_secret, table_properties, options)

Available Options

| Option | Type | Default | Description |
|---|---|---|---|
| record_type | RecordType | RecordType.PROTO | Serialization format: PROTO or JSON |
| max_inflight_records | int | 50000 | Maximum number of unacknowledged records |
| recovery | bool | True | Enable automatic stream recovery |
| recovery_timeout_ms | int | 15000 | Timeout for recovery operations (ms) |
| recovery_backoff_ms | int | 2000 | Delay between recovery attempts (ms) |
| recovery_retries | int | 3 | Maximum number of recovery attempts |
| flush_timeout_ms | int | 300000 | Timeout for flush operations (ms) |
| server_lack_of_ack_timeout_ms | int | 60000 | Server acknowledgment timeout (ms) |
| stream_paused_max_wait_time_ms | Optional[int] | None | Max wait during graceful stream close. None = full server duration, 0 = immediate, x = min(x, server_duration) |
| callback_max_wait_time_ms | Optional[int] | 5000 | Max wait for callbacks after close(). None = wait forever |
| ack_callback | AckCallback | None | Callback invoked on record acknowledgment or error |
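
The rule given for stream_paused_max_wait_time_ms can be read as a small function. This is only an illustration of the stated rule, not SDK code; the name effective_pause_wait_ms and the parameter server_duration_ms (the wait duration announced by the server) are hypothetical.

```python
from typing import Optional


def effective_pause_wait_ms(configured: Optional[int], server_duration_ms: int) -> int:
    """Apply the documented rule: None -> full server duration,
    0 -> immediate, x -> min(x, server_duration)."""
    if configured is None:
        return server_duration_ms
    # Covers both 0 (immediate) and any positive cap
    return min(configured, server_duration_ms)
```

For example, with a server duration of 30000 ms, a configured value of 5000 gives 5000, while a configured value of 60000 is capped to 30000.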

Error Handling

The SDK raises two types of exceptions:

  • ZerobusException - Retriable errors (network issues, temporary server errors)
  • NonRetriableException - Non-retriable errors (invalid credentials, missing table)

from zerobus.sdk.shared import ZerobusException, NonRetriableException

try:
    stream.ingest_record_offset(record)
except NonRetriableException as e:
    print(f"Fatal error: {e}")
    raise
except ZerobusException as e:
    print(f"Retriable error: {e}")
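
Because NonRetriableException extends ZerobusException, the order of the except clauses matters: the more specific class has to be caught first. The sketch below wraps that pattern in an application-level retry helper. It is a rough illustration, not part of the SDK: ingest_with_retry, its parameters, and the linear backoff are assumptions, and the two exception classes are local stand-ins so the snippet runs without the SDK installed. Since the SDK already recovers from transient failures on its own, an extra retry layer like this may not be needed for every workload.

```python
import time


# Stand-ins so the sketch runs on its own. In real code use:
#   from zerobus.sdk.shared import ZerobusException, NonRetriableException
class ZerobusException(Exception):
    pass


class NonRetriableException(ZerobusException):
    pass


def ingest_with_retry(stream, record, attempts=3, backoff_s=0.5, sleep=time.sleep):
    """Call stream.ingest_record_offset(record), retrying only retriable errors."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return stream.ingest_record_offset(record)
        except NonRetriableException:
            # Must be caught first: it is a subclass of ZerobusException
            raise
        except ZerobusException:
            if attempt == attempts:
                raise  # Out of attempts, surface the last error
            sleep(backoff_s * attempt)  # Linear backoff between attempts
```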

Handling Stream Failures

The SDK automatically handles retries for transient errors. Use get_unacked_records() only when a stream has permanently failed (non-retriable error or max retries exceeded):

from zerobus.sdk.shared import NonRetriableException

try:
    for i in range(10000):
        stream.ingest_record_offset(record)
    stream.flush()
except NonRetriableException as e:
    unacked = stream.get_unacked_records()  # Returns List[bytes]
    print(f"Stream failed: {e}. {len(unacked)} records unacknowledged.")

    # Retry with a new stream
    new_stream = sdk.create_stream(client_id, client_secret, table_properties, options)
    try:
        for record_bytes in unacked:
            new_stream.ingest_record_offset(record_bytes)  # Pass bytes directly
        new_stream.flush()
    finally:
        new_stream.close()  # Close the replacement stream even if the retry fails

Use get_unacked_batches() for batch-level retry:

unacked_batches = stream.get_unacked_batches()  # Returns List[List[bytes]]
for batch in unacked_batches:
    new_stream.ingest_records_offset(batch)

Decoding unacked records:

  • JSON mode: json.loads(record_bytes.decode('utf-8'))
  • Protobuf mode: YourMessage.FromString(record_bytes)
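
For JSON mode, the decoding step above can be applied to the whole list returned by get_unacked_records(). A minimal sketch, where decode_json_records is a hypothetical helper name and the input is assumed to be the List[bytes] described earlier:

```python
import json
from typing import Any, Dict, List


def decode_json_records(unacked: List[bytes]) -> List[Dict[str, Any]]:
    """Decode UTF-8 encoded JSON records back into Python dicts."""
    return [json.loads(raw.decode("utf-8")) for raw in unacked]
```

This can be handy for logging or inspecting which records were not acknowledged before replaying them.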

Performance Tips

| Method | Throughput | Use Case |
|---|---|---|
| ingest_record_nowait() | Highest | Maximum throughput, fire-and-forget |
| ingest_record_offset() | Medium | Need offset tracking |
| ingest_record() | Low | Deprecated - avoid |

Benchmarked with 100k records on a local connection:

| Record Size | ingest_record (sequential) | ingest_record_nowait |
|---|---|---|
| 20 bytes | 0.35 MB/s | 7.55 MB/s (20x faster) |
| 220 bytes | 2.00 MB/s | 77 MB/s (38x faster) |
| 750 bytes | 16 MB/s | 257 MB/s (16x faster) |
| 10 KB | 188 MB/s | 382 MB/s (2x faster) |
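
The speedup figures follow directly from the two throughput columns. As a quick arithmetic check, the snippet below recomputes them from the numbers in the table; the variable names are only for illustration.

```python
# (record size, ingest_record MB/s, ingest_record_nowait MB/s), copied from the table
benchmarks = [
    ("20 bytes", 0.35, 7.55),
    ("220 bytes", 2.00, 77.0),
    ("750 bytes", 16.0, 257.0),
    ("10 KB", 188.0, 382.0),
]

# Speedup = fire-and-forget throughput divided by sequential throughput
speedups = {size: nowait / sequential for size, sequential, nowait in benchmarks}

for size, ratio in speedups.items():
    print(f"{size}: {ratio:.1f}x")
```

The ratios come out at roughly 21.6x, 38.5x, 16.1x and 2.0x, which the table rounds down. The advantage of ingest_record_nowait shrinks as records get larger.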

API Reference

ZerobusSdk

Main entry point. Sync: from zerobus.sdk.sync import ZerobusSdk / Async: from zerobus.sdk.aio import ZerobusSdk

sdk = ZerobusSdk(server_endpoint: str, unity_catalog_endpoint: str)
# Sync
stream = sdk.create_stream(client_id, client_secret, table_properties, options=None, headers_provider=None)
# Async
stream = await sdk.create_stream(client_id, client_secret, table_properties, options=None, headers_provider=None)

ZerobusStream

Single record ingestion:

| Method | Sync | Async | Notes |
|---|---|---|---|
| ingest_record_nowait(record) | → None | → None (not async) | Fire-and-forget, highest throughput |
| ingest_record_offset(record) | → int | await → int | Returns offset after queueing |
| ingest_record(record) | → RecordAcknowledgment | await → Awaitable | Deprecated since v0.3.0 |

Batch ingestion:

| Method | Sync | Async | Notes |
|---|---|---|---|
| ingest_records_nowait(records) | → None | → None (not async) | Fire-and-forget |
| ingest_records_offset(records) | → int | await → int | Returns final offset |
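
When records arrive as one long iterable, they have to be grouped before they can be passed to the batch methods. The following is a minimal sketch of such a grouping step; chunked is a hypothetical helper, not part of the SDK, and the batch size used with it is an arbitrary choice.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items from `records`."""
    if size <= 0:
        raise ValueError("size must be positive")
    it = iter(records)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return  # Source exhausted
        yield batch


# Intended use with a stream (not executed here):
#   for batch in chunked(all_records, 500):
#       stream.ingest_records_offset(batch)
```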

Accepted record types:

  • JSON mode: dict (SDK serializes) or str (pre-serialized JSON)
  • Protobuf mode: Message object (SDK serializes) or bytes (pre-serialized)

Offset tracking:

# Sync
offset = stream.ingest_record_offset(record)
# ... do other work ...
stream.wait_for_offset(offset)  # Block until durably written

# Async
offset = await stream.ingest_record_offset(record)
# ... do other work ...
await stream.wait_for_offset(offset)  # Wait (without blocking the event loop) until durably written

Stream management:

# Sync
stream.flush()   # Wait for all pending records to be acknowledged
stream.close()   # Flush and close gracefully (always call in finally)

# Async
await stream.flush()
await stream.close()

Unacknowledged records:

# Sync
records = stream.get_unacked_records()   # List[bytes]
batches = stream.get_unacked_batches()  # List[List[bytes]]

# Async
records = await stream.get_unacked_records()
batches = await stream.get_unacked_batches()

TableProperties

TableProperties(table_name: str, descriptor: Descriptor = None)

# JSON mode
TableProperties("catalog.schema.table")

# Protobuf mode
TableProperties("catalog.schema.table", MyMessage.DESCRIPTOR)

StreamConfigurationOptions

See Configuration for full parameter list.

AckCallback

from zerobus.sdk.shared import AckCallback

class MyCallback(AckCallback):
    def on_ack(self, offset: int) -> None:
        # Called when a record is acknowledged by the server
        pass

    def on_error(self, offset: int, error_message: str) -> None:
        # Called when a record encounters an error
        pass
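
A callback can also keep state instead of printing. The sketch below records the highest acknowledged offset and collects errors. Several things here are assumptions rather than documented behavior: that callbacks may be invoked from a thread other than the caller's (hence the lock), and that offsets are non-negative (hence -1 as the initial value). TrackingCallback is a hypothetical name, and the base class is a local stand-in so the snippet runs without the SDK installed.

```python
import threading


class AckCallback:
    """Stand-in base class; in real code import AckCallback from zerobus.sdk.shared."""


class TrackingCallback(AckCallback):
    def __init__(self):
        self._lock = threading.Lock()
        self.last_acked = -1   # Highest offset acknowledged so far
        self.errors = []       # List of (offset, error_message) tuples

    def on_ack(self, offset: int) -> None:
        with self._lock:
            self.last_acked = max(self.last_acked, offset)

    def on_error(self, offset: int, error_message: str) -> None:
        with self._lock:
            self.errors.append((offset, error_message))
```

An instance would be passed as ack_callback in StreamConfigurationOptions and inspected after flush() or close().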

HeadersProvider

For custom authentication (e.g. custom token providers), implement HeadersProvider and pass it to create_stream(). The headers it supplies must include both authorization and x-databricks-zerobus-table-name. See examples/ for implementation details.
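
The exact HeadersProvider interface is not shown in this README, so the sketch below only illustrates the two required header names with plain functions. build_headers and missing_headers are hypothetical helpers, and the Bearer scheme for the authorization value is an assumption.

```python
REQUIRED_HEADERS = ("authorization", "x-databricks-zerobus-table-name")


def build_headers(token: str, table_name: str) -> dict:
    """Build the two headers the README lists as required."""
    return {
        "authorization": f"Bearer {token}",  # Bearer scheme is an assumption
        "x-databricks-zerobus-table-name": table_name,
    }


def missing_headers(headers: dict) -> list:
    """Return the required header names absent from `headers` (case-insensitive)."""
    present = {name.lower() for name in headers}
    return [name for name in REQUIRED_HEADERS if name not in present]
```

A check like missing_headers can be run inside a custom provider before returning, so that a missing header is caught locally rather than as a server-side rejection.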

RecordAcknowledgment (Sync only, deprecated)

ack.wait_for_ack(timeout_sec=None)  # Block until acknowledged
ack.is_done() -> bool
ack.add_done_callback(callback)

Exceptions

  • ZerobusException(message, cause=None) - Retriable errors
  • NonRetriableException(message, cause=None) - Non-retriable errors (extends ZerobusException)

Debugging

The SDK uses Rust's tracing framework. Control log levels via RUST_LOG:

export RUST_LOG=info           # Default
export RUST_LOG=debug          # Detailed debugging
export RUST_LOG=trace          # Very verbose
export RUST_LOG=zerobus_sdk=debug  # Only SDK components
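
The same variable can be set from Python when exporting it in the shell is inconvenient. This sketch assumes the native layer reads RUST_LOG when it initializes, so the assignment is placed before the SDK import; that ordering requirement is an assumption, not something this README states.

```python
import os

# Keep any value already exported in the shell; otherwise enable SDK debug logs.
# Assumption: this must run before the SDK's native module is first imported.
os.environ.setdefault("RUST_LOG", "zerobus_sdk=debug")

# from zerobus.sdk.sync import ZerobusSdk  # Import the SDK only after this point
```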

Building from Source

Building from source requires the Rust toolchain (install from rustup.rs).

git clone https://github.com/databricks/zerobus-sdk.git
cd zerobus-sdk/python
make dev    # Set up venv and install in editable mode
make test   # Run tests
make build  # Build release wheel

For development workflows and detailed instructions, see CONTRIBUTING.md.

Community and Contributing

We welcome feedback. Please file an issue to report a bug or suggest an improvement.

See CONTRIBUTING.md for development setup and contribution guidelines.

License

This project is licensed under the Databricks License. See LICENSE for the full text.