A high-performance Python client for streaming data ingestion into Databricks Delta tables using the Zerobus service.
- Overview
- Features
- Installation
- Quick Start
- Configuration
- Error Handling
- Handling Stream Failures
- Performance Tips
- API Reference
- Debugging
- Building from Source
- Community and Contributing
- License
The Zerobus Python SDK is a thin wrapper around the Zerobus Rust SDK, built using PyO3 bindings. It delivers native performance with a Python-friendly API supporting both synchronous and asynchronous usage.
What is Zerobus? See the project overview for details on the Zerobus service.
Prerequisites (workspace setup, table creation, service principal): See the top-level README.
┌─────────────────────────────────────────┐
│         Python Application Code         │
└─────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────┐
│        Python SDK (Thin Wrapper)        │
│  • Sync and async APIs                  │
│  • Python types & error handling        │
└─────────────────────────────────────────┘
                     │
                     ▼ (PyO3 bindings)
┌─────────────────────────────────────────┐
│        Rust Core Implementation         │
│  • gRPC communication                   │
│  • OAuth 2.0 authentication             │
│  • Stream management & recovery         │
└─────────────────────────────────────────┘
- Rust-backed performance - Native Rust implementation via PyO3 bindings for maximum throughput
- Sync and Async support - Both synchronous and asynchronous Python APIs
- Automatic recovery - Built-in retry and reconnection for transient failures
- Multiple serialization formats - JSON (simple) and Protocol Buffers (type-safe)
- OAuth 2.0 authentication - Secure authentication with client credentials, automatically refreshed
- Acknowledgment callbacks - Receive notifications when records are acknowledged or encounter errors
- Flexible configuration - Fine-tune timeouts, retries, and recovery behavior
pip install databricks-zerobus-ingest-sdk
Pre-built wheels are available for:
- Linux: x86_64, aarch64 (manylinux)
- macOS: x86_64, arm64
- Windows: x86_64
Requires Python 3.9 or higher.
Python dependencies:
- protobuf >= 4.25.0, < 7.0 (for Protocol Buffer schema handling)
- requests >= 2.28.1, < 3 (only for the generate_proto utility tool)
All core ingestion functionality (gRPC, OAuth, stream management) is handled by the native Rust implementation.
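You may want to confirm that the installed protobuf falls inside the supported range (>= 4.25.0, < 7.0). A small stdlib-only check is sketched below. It is an illustration, not part of the SDK. The function names are hypothetical, and the parser only reads the leading numeric part of a version string (for example, "5.29.3rc1" is treated as 5.29.3).

```python
import re
from importlib import metadata


def parse_release(version: str) -> tuple:
    """Parse the leading numeric part of a version string, e.g. "5.29.3rc1" -> (5, 29, 3)."""
    match = re.match(r"(\d+)(?:\.(\d+))?(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    # Missing minor/patch components default to 0, so "4.25" -> (4, 25, 0)
    return tuple(int(group) if group is not None else 0 for group in match.groups())


def in_supported_range(version: str, low=(4, 25, 0), high=(7, 0, 0)) -> bool:
    """Return True if low <= version < high, mirroring 'protobuf >= 4.25.0, < 7.0'."""
    return low <= parse_release(version) < high


if __name__ == "__main__":
    try:
        installed = metadata.version("protobuf")
        print(f"protobuf {installed} supported: {in_supported_range(installed)}")
    except metadata.PackageNotFoundError:
        print("protobuf is not installed")
```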
- JSON - Simple, no schema compilation needed. Good for getting started.
- Protocol Buffers - Strongly-typed schemas, more efficient over the wire.
Synchronous:
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
server_endpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
client_id = "your-client-id"          # service principal client ID
client_secret = "your-client-secret"  # service principal client secret
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties("main.default.air_quality")
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
try:
    for i in range(100):
        # JSON mode accepts a dict; the SDK serializes it
        offset = stream.ingest_record_offset({
            "device_name": f"sensor-{i % 10}",
            "temp": 20 + (i % 15),
            "humidity": 50 + (i % 40)
        })
    stream.flush()  # wait until every queued record is acknowledged
finally:
    stream.close()
Asynchronous:
import asyncio
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
async def main():
    server_endpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    client_id = "your-client-id"          # service principal client ID
    client_secret = "your-client-secret"  # service principal client secret
    sdk = ZerobusSdk(server_endpoint, workspace_url)
    table_properties = TableProperties("main.default.air_quality")
    options = StreamConfigurationOptions(record_type=RecordType.JSON)
    stream = await sdk.create_stream(client_id, client_secret, table_properties, options)
    try:
        for i in range(100):
            offset = await stream.ingest_record_offset({
                "device_name": f"sensor-{i % 10}",
                "temp": 20 + (i % 15),
                "humidity": 50 + (i % 40)
            })
        await stream.flush()
    finally:
        await stream.close()
asyncio.run(main())
First, define a protobuf schema. Use proto2 syntax with optional fields to match the Delta table columns:
// record.proto
syntax = "proto2";
message AirQuality {
    optional string device_name = 1;
    optional int32 temp = 2;
    optional int64 humidity = 3;
}
See the Delta → Protobuf type mappings in the top-level README.
Compile the schema to generate a Python module:
pip install "grpcio-tools>=1.60.0,<2.0"
python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto
# Generates record_pb2.py
Load the descriptor from the generated module and pass it to TableProperties:
from zerobus.sdk.shared import TableProperties
import record_pb2
# The DESCRIPTOR is the compiled schema; pass it so the SDK can validate records
table_properties = TableProperties("main.default.air_quality", record_pb2.AirQuality.DESCRIPTOR)
Alternatively, generate the schema automatically from an existing Unity Catalog table:
python -m zerobus.tools.generate_proto \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "your-client-id" \
--client-secret "your-client-secret" \
--table "main.default.air_quality" \
--output "record.proto" \
--proto-msg "AirQuality"
# Then compile the generated file the same way:
python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto
Synchronous:
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2
# server_endpoint, workspace_url, client_id and client_secret as in the JSON example
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties("main.default.air_quality", record_pb2.AirQuality.DESCRIPTOR)
# No options needed: record_type defaults to RecordType.PROTO
stream = sdk.create_stream(client_id, client_secret, table_properties)
try:
    for i in range(100):
        record = record_pb2.AirQuality(
            device_name=f"sensor-{i % 10}",
            temp=20 + (i % 15),
            humidity=50 + (i % 40)
        )
        stream.ingest_record_nowait(record)  # fire-and-forget
    stream.flush()
finally:
    stream.close()
Asynchronous:
import asyncio
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2
async def main():
    # server_endpoint, workspace_url, client_id and client_secret as in the JSON example
    sdk = ZerobusSdk(server_endpoint, workspace_url)
    table_properties = TableProperties("main.default.air_quality", record_pb2.AirQuality.DESCRIPTOR)
    stream = await sdk.create_stream(client_id, client_secret, table_properties)
    try:
        for i in range(100):
            record = record_pb2.AirQuality(
                device_name=f"sensor-{i % 10}",
                temp=20 + (i % 15),
                humidity=50 + (i % 40)
            )
            stream.ingest_record_nowait(record)  # not a coroutine, so no await
        await stream.flush()
    finally:
        await stream.close()
asyncio.run(main())
See the examples/ directory for complete runnable examples.
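Both Quick Start loops build the same synthetic sensor readings. Factoring that logic into a generator can keep the JSON and Protobuf variants in sync. The helper name sample_readings is hypothetical and is not part of the SDK.

```python
from typing import Dict, Iterator


def sample_readings(count: int) -> Iterator[Dict[str, object]]:
    """Yield the synthetic air-quality rows used in the Quick Start examples."""
    for i in range(count):
        yield {
            "device_name": f"sensor-{i % 10}",
            "temp": 20 + (i % 15),
            "humidity": 50 + (i % 40),
        }


# JSON mode: pass each dict as-is, e.g. stream.ingest_record_offset(row)
# Protobuf mode: build a message first, e.g. record_pb2.AirQuality(**row)
```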
Configure stream behavior by passing a StreamConfigurationOptions object to create_stream():
from zerobus.sdk.shared import StreamConfigurationOptions, RecordType, AckCallback
class MyCallback(AckCallback):
    def on_ack(self, offset: int):
        print(f"Acknowledged offset: {offset}")
    def on_error(self, offset: int, error_message: str):
        print(f"Error at offset {offset}: {error_message}")
options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    max_inflight_records=10000,
    recovery=True,
    ack_callback=MyCallback()
)
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
| Option | Type | Default | Description |
|---|---|---|---|
| record_type | RecordType | RecordType.PROTO | Serialization format: PROTO or JSON |
| max_inflight_records | int | 50000 | Maximum number of unacknowledged records |
| recovery | bool | True | Enable automatic stream recovery |
| recovery_timeout_ms | int | 15000 | Timeout for recovery operations (ms) |
| recovery_backoff_ms | int | 2000 | Delay between recovery attempts (ms) |
| recovery_retries | int | 3 | Maximum number of recovery attempts |
| flush_timeout_ms | int | 300000 | Timeout for flush operations (ms) |
| server_lack_of_ack_timeout_ms | int | 60000 | Server acknowledgment timeout (ms) |
| stream_paused_max_wait_time_ms | Optional[int] | None | Max wait during graceful stream close: None = full server duration, 0 = immediate, x = min(x, server_duration) |
| callback_max_wait_time_ms | Optional[int] | 5000 | Max wait for callbacks after close(); None = wait forever |
| ack_callback | AckCallback | None | Callback invoked on record acknowledgment or error |
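The rule for stream_paused_max_wait_time_ms can be restated as a small function. The sketch below only mirrors the table for illustration and is not SDK code. In it, server_duration_ms stands for the pause duration the server requests. The function name and the negative-value guard are assumptions added for this example.

```python
from typing import Optional


def effective_pause_wait_ms(option_ms: Optional[int], server_duration_ms: int) -> int:
    """Return how long a graceful close waits, per the documented rule.

    None -> wait the full server-requested duration
    0    -> do not wait (min(0, d) == 0)
    x    -> min(x, server_duration)
    """
    if option_ms is None:
        return server_duration_ms
    if option_ms < 0:  # guard added for this sketch only
        raise ValueError("option_ms must be None or non-negative")
    return min(option_ms, server_duration_ms)
```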
The SDK raises two types of exceptions:
- ZerobusException - Retriable errors (network issues, temporary server errors)
- NonRetriableException - Non-retriable errors (invalid credentials, missing table)
from zerobus.sdk.shared import ZerobusException, NonRetriableException
try:
    stream.ingest_record_offset(record)
except NonRetriableException as e:
    # Catch the subclass first: NonRetriableException extends ZerobusException
    print(f"Fatal error: {e}")
    raise
except ZerobusException as e:
    print(f"Retriable error: {e}")
The SDK automatically handles retries for transient errors. Use get_unacked_records() only when a stream has permanently failed (non-retriable error or max retries exceeded):
from zerobus.sdk.shared import NonRetriableException
try:
    for i in range(10000):
        stream.ingest_record_offset(record)
    stream.flush()
except NonRetriableException as e:
    unacked = stream.get_unacked_records()  # Returns List[bytes]
    print(f"Stream failed: {e}. {len(unacked)} records unacknowledged.")
    # Retry with a new stream, and always close it
    new_stream = sdk.create_stream(client_id, client_secret, table_properties, options)
    try:
        for record_bytes in unacked:
            new_stream.ingest_record_offset(record_bytes)  # Pass bytes directly
        new_stream.flush()
    finally:
        new_stream.close()
Use get_unacked_batches() for batch-level retry:
unacked_batches = stream.get_unacked_batches() # Returns List[List[bytes]]
for batch in unacked_batches:
    new_stream.ingest_records_offset(batch)
Decoding unacked records:
- JSON mode: json.loads(record_bytes.decode('utf-8'))
- Protobuf mode: YourMessage.FromString(record_bytes)
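For JSON streams, each unacknowledged payload is UTF-8 JSON bytes, so decoding it back into a dict takes one call per record. The sketch below uses only the standard library. The hardcoded byte strings stand in for real get_unacked_records() output, and the helper name is hypothetical.

```python
import json
from typing import Any, Dict, List


def decode_unacked_json(records: List[bytes]) -> List[Dict[str, Any]]:
    """Decode JSON-mode unacked records (List[bytes]) back into dicts."""
    # Protobuf mode would use YourMessage.FromString(record_bytes) instead
    return [json.loads(record_bytes.decode("utf-8")) for record_bytes in records]


# Stand-in for stream.get_unacked_records() on a JSON stream
unacked = [
    b'{"device_name": "sensor-1", "temp": 21, "humidity": 51}',
    b'{"device_name": "sensor-2", "temp": 22, "humidity": 52}',
]
rows = decode_unacked_json(unacked)
```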
| Method | Throughput | Use Case |
|---|---|---|
| ingest_record_nowait() | Highest | Maximum throughput, fire-and-forget |
| ingest_record_offset() | Medium | Need offset tracking |
| ingest_record() | Low | Deprecated - avoid |
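A middle ground between fire-and-forget and waiting on every record is to call ingest_record_offset() for each record but call wait_for_offset() only every N records. This gives periodic durability checkpoints without blocking on each record. This is a sketch, assuming the sync stream methods listed in the API Reference. The checkpoint interval is arbitrary, and FakeStream is a hypothetical in-memory stand-in so that the snippet runs without a Databricks workspace.

```python
from typing import Iterable, List


def ingest_with_checkpoints(stream, records: Iterable, every: int = 1000) -> int:
    """Ingest records, blocking on durability every `every` records.

    Returns the last offset ingested (and waited on), or -1 if there were no records.
    """
    last_offset = -1
    pending = 0
    for record in records:
        last_offset = stream.ingest_record_offset(record)
        pending += 1
        if pending >= every:
            stream.wait_for_offset(last_offset)  # block until durably written
            pending = 0
    if pending:
        stream.wait_for_offset(last_offset)  # cover the final partial window
    return last_offset


class FakeStream:
    """Hypothetical stand-in mimicking the two stream methods used above."""

    def __init__(self) -> None:
        self.next_offset = 0
        self.waited: List[int] = []

    def ingest_record_offset(self, record) -> int:
        offset = self.next_offset
        self.next_offset += 1
        return offset

    def wait_for_offset(self, offset: int) -> None:
        self.waited.append(offset)


fake = FakeStream()
final = ingest_with_checkpoints(fake, ({"i": i} for i in range(2500)), every=1000)
```

With a real stream, pass the object returned by sdk.create_stream() in place of FakeStream.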
Benchmarked with 100k records on a local connection:
| Record Size | ingest_record (sequential) | ingest_record_nowait |
|---|---|---|
| 20 bytes | 0.35 MB/s | 7.55 MB/s (20x faster) |
| 220 bytes | 2.00 MB/s | 77 MB/s (38x faster) |
| 750 bytes | 16 MB/s | 257 MB/s (16x faster) |
| 10 KB | 188 MB/s | 382 MB/s (2x faster) |
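The speedup column is the ratio of the two throughput columns. Recomputing it from the table's own figures (no new measurements) shows that the quoted multipliers are approximate. For example, the 20-byte row works out to about 21.6x.

```python
# Throughput figures (MB/s) copied from the benchmark table above:
# (ingest_record sequential, ingest_record_nowait)
benchmarks = {
    "20 bytes": (0.35, 7.55),
    "220 bytes": (2.00, 77.0),
    "750 bytes": (16.0, 257.0),
    "10 KB": (188.0, 382.0),
}

# Speedup = nowait throughput / sequential throughput
speedups = {size: nowait / sequential for size, (sequential, nowait) in benchmarks.items()}

for size, ratio in speedups.items():
    print(f"{size:>9}: {ratio:.1f}x")
```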
ZerobusSdk is the main entry point. Sync: from zerobus.sdk.sync import ZerobusSdk / Async: from zerobus.sdk.aio import ZerobusSdk
sdk = ZerobusSdk(server_endpoint: str, unity_catalog_endpoint: str)
# Sync
stream = sdk.create_stream(client_id, client_secret, table_properties, options=None, headers_provider=None)
# Async
stream = await sdk.create_stream(client_id, client_secret, table_properties, options=None, headers_provider=None)
Single record ingestion:
| Method | Sync | Async | Notes |
|---|---|---|---|
| ingest_record_nowait(record) | → None | → None (not async) | Fire-and-forget, highest throughput |
| ingest_record_offset(record) | → int | await → int | Returns offset after queueing |
| ingest_record(record) | → RecordAcknowledgment | await → Awaitable | Deprecated since v0.3.0 |
Batch ingestion:
| Method | Sync | Async | Notes |
|---|---|---|---|
| ingest_records_nowait(records) | → None | → None (not async) | Fire-and-forget |
| ingest_records_offset(records) | → int | await → int | Returns final offset |
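To feed a large iterable through ingest_records_offset() or ingest_records_nowait(), the records first need to be grouped into lists. A minimal chunking helper is sketched below. The helper name is hypothetical, and the batch size of 500 in the usage comment is an arbitrary assumption, not an SDK recommendation.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` records."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(records)
    while True:
        batch = list(islice(it, size))  # take up to `size` items
        if not batch:
            return
        yield batch


# Usage with a sync stream (sketch):
# for batch in batched(rows, 500):
#     last_offset = stream.ingest_records_offset(batch)
# stream.flush()
```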
Accepted record types:
- JSON mode: dict (SDK serializes) or str (pre-serialized JSON)
- Protobuf mode: Message object (SDK serializes) or bytes (pre-serialized)
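Because JSON mode also accepts a pre-serialized str, records can be encoded once up front, for example when the same payload is also logged or replayed. The helper below is a hypothetical convenience, not an SDK function. Compact separators simply keep payloads small, and string inputs are checked with json.loads before being passed through.

```python
import json
from typing import Any, Dict, Union


def to_json_payload(record: Union[Dict[str, Any], str]) -> str:
    """Return a JSON string suitable for JSON-mode ingestion.

    Dicts are serialized compactly; strings are assumed to already be JSON
    and are validated (json.loads raises ValueError if not) then returned as-is.
    """
    if isinstance(record, str):
        json.loads(record)  # raises json.JSONDecodeError (a ValueError) on bad input
        return record
    return json.dumps(record, separators=(",", ":"))
```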
Offset tracking:
# Sync
offset = stream.ingest_record_offset(record)
# ... do other work ...
stream.wait_for_offset(offset) # Block until durably written
# Async
offset = await stream.ingest_record_offset(record)
# ... do other work ...
await stream.wait_for_offset(offset)  # Block until durably written
Stream management:
# Sync
stream.flush() # Wait for all pending records to be acknowledged
stream.close() # Flush and close gracefully (always call in finally)
# Async
await stream.flush()
await stream.close()
Unacknowledged records:
# Sync
records = stream.get_unacked_records() # List[bytes]
batches = stream.get_unacked_batches() # List[List[bytes]]
# Async
records = await stream.get_unacked_records()
batches = await stream.get_unacked_batches()
TableProperties(table_name: str, descriptor: Descriptor = None)
# JSON mode
TableProperties("catalog.schema.table")
# Protobuf mode
TableProperties("catalog.schema.table", MyMessage.DESCRIPTOR)
StreamConfigurationOptions: see Configuration for the full parameter list.
from zerobus.sdk.shared import AckCallback
class MyCallback(AckCallback):
    def on_ack(self, offset: int) -> None:
        # Called when a record is acknowledged by the server
        pass
    def on_error(self, offset: int, error_message: str) -> None:
        # Called when a record encounters an error
        pass
For custom authentication (e.g. custom token providers), implement HeadersProvider and pass it to create_stream(). The provided headers must include both authorization and x-databricks-zerobus-table-name. See examples/ for implementation details.
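Building on the AckCallback interface described above, a callback can record the highest acknowledged offset and any reported errors for later inspection. This sketch assumes callbacks may be invoked from a background thread, hence the lock. It falls back to a plain base class when the SDK is not installed, so it also runs standalone. TrackingCallback is a hypothetical name.

```python
import threading
from typing import List, Tuple

try:
    from zerobus.sdk.shared import AckCallback
except ImportError:  # lets this sketch run without the SDK installed
    AckCallback = object  # type: ignore[misc,assignment]


class TrackingCallback(AckCallback):
    """Remember the highest acknowledged offset and every reported error."""

    def __init__(self) -> None:
        super().__init__()
        self._lock = threading.Lock()  # assumption: callbacks may arrive on another thread
        self.max_acked_offset = -1
        self.errors: List[Tuple[int, str]] = []

    def on_ack(self, offset: int) -> None:
        with self._lock:
            self.max_acked_offset = max(self.max_acked_offset, offset)

    def on_error(self, offset: int, error_message: str) -> None:
        with self._lock:
            self.errors.append((offset, error_message))
```

An instance would be passed as StreamConfigurationOptions(ack_callback=TrackingCallback()).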
RecordAcknowledgment (returned by the deprecated ingest_record()):
ack.wait_for_ack(timeout_sec=None)  # Block until acknowledged
ack.is_done() -> bool
ack.add_done_callback(callback)
Exceptions:
- ZerobusException(message, cause=None) - Retriable errors
- NonRetriableException(message, cause=None) - Non-retriable errors (extends ZerobusException)
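Because NonRetriableException extends ZerobusException, the order of except clauses matters. The subclass must be caught first, or the ZerobusException handler also swallows fatal errors. The sketch below demonstrates this. When the SDK is not installed, it uses stand-in classes that mirror the documented hierarchy. The classify function is hypothetical.

```python
try:
    from zerobus.sdk.shared import NonRetriableException, ZerobusException
except ImportError:  # stand-ins mirroring the documented hierarchy
    class ZerobusException(Exception):
        pass

    class NonRetriableException(ZerobusException):
        pass


def classify(exc: Exception) -> str:
    """Map an exception to 'fatal', 'retriable', or 'other', checking the subclass first."""
    try:
        raise exc
    except NonRetriableException:
        return "fatal"
    except ZerobusException:  # would also match NonRetriableException if listed first
        return "retriable"
    except Exception:
        return "other"
```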
The SDK uses Rust's tracing framework. Control log levels via RUST_LOG:
export RUST_LOG=info # Default
export RUST_LOG=debug # Detailed debugging
export RUST_LOG=trace # Very verbose
export RUST_LOG=zerobus_sdk=debug # Only SDK components
Building from source requires the Rust toolchain (install from rustup.rs).
git clone https://github.com/databricks/zerobus-sdk.git
cd zerobus-sdk/python
make dev # Set up venv and install in editable mode
make test # Run tests
make build # Build release wheel
For development workflows and detailed instructions, see CONTRIBUTING.md.
We are keen to hear feedback. Please file issues.
See CONTRIBUTING.md for development setup and contribution guidelines.
This project is licensed under the Databricks License. See LICENSE for the full text.