This directory contains runnable example applications demonstrating synchronous and asynchronous usage of the Zerobus Ingest SDK for Python, with examples for both record type modes: protobuf and JSON.
For complete SDK documentation including installation, API reference, and configuration details, see the main README.
git clone https://github.com/databricks/zerobus-sdk.git
cd zerobus-sdk/python
pip install -e .

The examples use a pre-generated protobuf file (record_pb2.py) based on the included record.proto schema.
Set the following environment variables:
export DATABRICKS_CLIENT_ID="your-service-principal-application-id"
export DATABRICKS_CLIENT_SECRET="your-service-principal-secret"
# For AWS:
export ZEROBUS_SERVER_ENDPOINT="https://workspace-id.zerobus.region.cloud.databricks.com"
export DATABRICKS_WORKSPACE_URL="https://your-workspace.cloud.databricks.com"
# For Azure:
# export ZEROBUS_SERVER_ENDPOINT="https://workspace-id.zerobus.region.azuredatabricks.net"
# export DATABRICKS_WORKSPACE_URL="https://your-workspace.azuredatabricks.net"
export ZEROBUS_TABLE_NAME="catalog.schema.table"

# Synchronous examples (blocking I/O)
python examples/sync_example_proto.py # Protobuf
python examples/sync_example_json.py # JSON
# Asynchronous examples (non-blocking I/O)
python examples/async_example_proto.py # Protobuf
python examples/async_example_json.py # JSON

All examples demonstrate multiple ingestion methods:
- ingest_record_offset() - Single record with offset tracking
- ingest_records_offset() - Batch ingestion with offset tracking
- ingest_record_nowait() - Fire-and-forget single record
- ingest_records_nowait() - Fire-and-forget batch (highest throughput)
Each example includes detailed comments explaining when to use each method and their performance characteristics.
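Before running any of the examples, it can help to confirm that the environment variables listed earlier are all set. The following is a minimal sketch, not part of the SDK: `load_settings` and `REQUIRED_VARS` are hypothetical names, and the three-part table name check is an assumption based on the `catalog.schema.table` placeholder.

```python
import os

# Variable names taken from the export commands in this README.
REQUIRED_VARS = (
    "DATABRICKS_CLIENT_ID",
    "DATABRICKS_CLIENT_SECRET",
    "ZEROBUS_SERVER_ENDPOINT",
    "DATABRICKS_WORKSPACE_URL",
    "ZEROBUS_TABLE_NAME",
)


def load_settings(env=None):
    """Return the required settings as a dict, or raise if any are missing.

    Hypothetical helper for illustration; it is not provided by the SDK.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))

    # Assumption: the table name has the form catalog.schema.table.
    parts = env["ZEROBUS_TABLE_NAME"].split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(
            "Expected catalog.schema.table, got " + repr(env["ZEROBUS_TABLE_NAME"])
        )
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling `load_settings()` with no arguments reads from `os.environ`; passing a plain dict makes the check easy to exercise in tests without touching the real environment.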
The SDK supports two serialization formats:
Protobuf (files: sync_example_proto.py, async_example_proto.py): more efficient over the wire. You can pass either:
- Message object (SDK serializes to bytes)
- Pre-serialized bytes (client controls serialization)
# Create protobuf record
record = record_pb2.AirQuality(device_name="sensor-1", temp=25, humidity=60)
table_properties = TableProperties(TABLE_NAME, record_pb2.AirQuality.DESCRIPTOR)
options = StreamConfigurationOptions(record_type=RecordType.PROTO)
# Recommended: Use ingest_record_offset() for better performance
offset = stream.ingest_record_offset(record)
# Or fire-and-forget for maximum throughput
stream.ingest_record_nowait(record)
# Option 2: Pass pre-serialized bytes (client controls serialization)
# offset = stream.ingest_record_offset(record.SerializeToString())

JSON (files: sync_example_json.py, async_example_json.py): good for getting started, with no protobuf schema required. You can pass either:
- dict (SDK serializes to JSON)
- Pre-serialized JSON string (client controls serialization)
# Create JSON record
record_dict = {"device_name": "sensor-1", "temp": 25, "humidity": 60}
table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
# Recommended: Use ingest_record_offset() for better performance
offset = stream.ingest_record_offset(record_dict)
# Or fire-and-forget for maximum throughput
stream.ingest_record_nowait(record_dict)
# Option 2: Pass pre-serialized JSON string (client controls serialization)
# offset = stream.ingest_record_offset(json.dumps(record_dict))

All record type modes are available in both synchronous and asynchronous variants:
The synchronous API is suitable for:
- Simple scripts and applications
- Code that doesn't use asyncio
- Straightforward blocking I/O patterns
Key characteristics:
- Uses standard Python synchronous functions
- Blocking API calls
- Works in any Python environment
The asynchronous API is suitable for:
- Applications already using asyncio
- Async web frameworks (FastAPI, aiohttp, etc.)
- Event-driven architectures
- Integration with other async operations
Key characteristics:
- Uses Python's async/await syntax
- Non-blocking API calls
- Requires an asyncio event loop
Both APIs provide the same functionality and performance. The key differences are:
| Aspect | Synchronous (sync) | Asynchronous (aio) |
|---|---|---|
| Import | from zerobus.sdk.sync import ZerobusSdk | from zerobus.sdk.aio import ZerobusSdk |
| Stream creation | stream = sdk.create_stream(...) | stream = await sdk.create_stream(...) |
| Record ingestion (with offset) | offset = stream.ingest_record_offset(record) | offset = await stream.ingest_record_offset(record) |
| Record ingestion (fire-and-forget) | stream.ingest_record_nowait(record) | stream.ingest_record_nowait(record) |
| Flush | stream.flush() | await stream.flush() |
| Close | stream.close() | await stream.close() |
| Execution context | Standard Python | Requires asyncio event loop |
| Use case | General Python applications | Asyncio-based applications |
Performance: Both APIs offer equivalent throughput and durability. Choose based on your application's architecture, not performance needs.
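The asynchronous calling pattern from the table can be tried offline with a stand-in object. The sketch below is illustrative only: `FakeAsyncStream` is a hypothetical class that mimics just the method names listed in the table, and its offsets are made up. With the real SDK, the stream would come from `await sdk.create_stream(...)` instead.

```python
import asyncio


class FakeAsyncStream:
    """Stand-in for illustration only; NOT the SDK's stream class."""

    def __init__(self):
        self.records = []
        self.closed = False

    async def ingest_record_offset(self, record):
        self.records.append(record)
        return len(self.records) - 1  # pretend offset, not real SDK behavior

    def ingest_record_nowait(self, record):
        # Called without await, matching the fire-and-forget row in the table.
        self.records.append(record)

    async def flush(self):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would

    async def close(self):
        self.closed = True


async def ingest_all(stream, records):
    """Queue records without waiting, then flush, and always close the stream."""
    try:
        for record in records:
            stream.ingest_record_nowait(record)
        await stream.flush()
    finally:
        await stream.close()
    return len(records)


async def main():
    stream = FakeAsyncStream()  # real SDK: stream = await sdk.create_stream(...)
    record = {"device_name": "sensor-1", "temp": 25, "humidity": 60}
    count = await ingest_all(stream, [record])
    return stream, count


if __name__ == "__main__":
    asyncio.run(main())
```

Wrapping the work in `try`/`finally` is a design choice rather than an SDK requirement: it keeps the stream from being left open if ingestion raises partway through.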
Recommended Methods:
Single Record Ingestion:
- ingest_record_offset() - Returns offset immediately, use when you need to track offsets
- ingest_record_nowait() - Fire-and-forget, best for maximum throughput
Batch Ingestion:
- ingest_records_offset() - Batch multiple records, returns final offset
- ingest_records_nowait() - Fire-and-forget batch ingestion, most efficient for bulk data
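When records arrive as a long list or a generator, they need to be grouped before they can be passed to the batch methods. The following sketch shows one way to do that with the standard library. `batched` is a hypothetical helper, not part of the SDK, and the batch size in the usage comment is an arbitrary placeholder.

```python
from itertools import islice


def batched(records, batch_size):
    """Yield lists of up to batch_size records from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Usage sketch, assuming the batch methods accept a list of records:
#   for batch in batched(all_records, 500):
#       stream.ingest_records_nowait(batch)
#   stream.flush()
```

Because `batched` consumes its input lazily, it also works with generators that would not fit in memory as a single list.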
Deprecated:
- ingest_record() - Deprecated and 2-40x slower; use ingest_record_offset() instead
| Format | Record Input | Configuration |
|---|---|---|
| Protobuf (Default) | Message object or bytes | TableProperties(table_name, descriptor) |
| JSON | dict or str (JSON string) | TableProperties(table_name) + StreamConfigurationOptions(record_type=RecordType.JSON) |
All examples use OAuth 2.0 authentication with create_stream(). The SDK automatically handles secure TLS connections.
For advanced configurations with custom headers, see the commented examples of CustomHeadersProvider in each example file.
To use your own protobuf schema:
- Modify record.proto or create a new proto file
- Generate Python code:
  python -m grpc_tools.protoc --python_out=. --proto_path=. your_schema.proto
- Update the example code to import and use your generated protobuf classes
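The generation step can also be scripted, which is convenient when the schema changes often. This is a minimal sketch that builds the same command as above and runs it with `subprocess`. `build_protoc_command` and `generate` are hypothetical helper names, and running `generate` assumes the `grpcio-tools` package is installed.

```python
import subprocess
import sys
from pathlib import Path


def build_protoc_command(proto_file, out_dir="."):
    """Build the argument list for: python -m grpc_tools.protoc ..."""
    proto_path = Path(proto_file)
    return [
        sys.executable,
        "-m",
        "grpc_tools.protoc",
        f"--python_out={out_dir}",
        f"--proto_path={proto_path.parent}",
        proto_path.name,
    ]


def generate(proto_file, out_dir="."):
    """Run protoc and return the expected path of the generated module."""
    subprocess.run(build_protoc_command(proto_file, out_dir), check=True)
    # protoc writes <stem>_pb2.py for a schema named <stem>.proto
    return Path(out_dir) / f"{Path(proto_file).stem}_pb2.py"
```

For example, `generate("your_schema.proto")` would be expected to produce `your_schema_pb2.py` in the current directory, which the example code can then import.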
To use your own JSON structure:
- Define your JSON structure in code:
  json_record = json.dumps({"field1": "value1", "field2": 123})
- Configure StreamConfigurationOptions with record_type=RecordType.JSON
- Ensure your JSON structure matches the schema of your Databricks table
Note: The SDK sends JSON strings directly without client-side schema validation.
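Since validation is not done on the client, a lightweight check before ingestion can catch mismatches early. The sketch below is one possible approach, not an SDK feature: `EXPECTED_FIELDS`, `validate_record`, and `to_payload` are hypothetical names, and the field list mirrors the sample record used in this README rather than any real table. It treats every field as required, which may be stricter than your table's schema.

```python
import json

# Hypothetical expected schema: field name -> accepted Python type.
EXPECTED_FIELDS = {"device_name": str, "temp": int, "humidity": int}


def validate_record(record, expected=EXPECTED_FIELDS):
    """Return a list of problems; an empty list means the record looks valid."""
    problems = []
    for name, expected_type in expected.items():
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly for int fields.
        wrong_bool = isinstance(value, bool) and expected_type is not bool
        if wrong_bool or not isinstance(value, expected_type):
            problems.append(f"wrong type for {name}: {type(value).__name__}")
    for name in sorted(record.keys() - expected.keys()):
        problems.append(f"unexpected field: {name}")
    return problems


def to_payload(record, expected=EXPECTED_FIELDS):
    """Validate the record, then serialize it to a JSON string."""
    problems = validate_record(record, expected)
    if problems:
        raise ValueError("; ".join(problems))
    return json.dumps(record)
```

The string returned by `to_payload` can be passed to the ingestion methods in the same way as the pre-serialized JSON string shown earlier.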
- Main README - Complete SDK documentation
- API Reference - Detailed API documentation