-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathsync_example_json.py
More file actions
260 lines (214 loc) · 10.7 KB
/
sync_example_json.py
File metadata and controls
260 lines (214 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
"""
Synchronous Ingestion Example - JSON Mode
This example demonstrates record ingestion using the synchronous API with JSON serialization.
Record Type Mode: JSON
- Records are sent as JSON-encoded strings
- Uses RecordType.JSON to specify JSON serialization
- Best for dynamic schemas or when working with JSON data
Authentication:
- Uses OAuth 2.0 Client Credentials (standard method)
- Includes example of custom headers provider for advanced use cases
Note: Both sync and async APIs provide the same throughput and durability guarantees.
Choose based on your application's architecture, not performance requirements.
"""
import json
import logging
import os
import time
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
from zerobus.sdk.shared.headers_provider import HeadersProvider
from zerobus.sdk.sync import ZerobusSdk
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Configuration - update these with your values
# For AWS:
SERVER_ENDPOINT = os.getenv("ZEROBUS_SERVER_ENDPOINT", "https://your-shard-id.zerobus.region.cloud.databricks.com")
UNITY_CATALOG_ENDPOINT = os.getenv("DATABRICKS_WORKSPACE_URL", "https://your-workspace.cloud.databricks.com")
# For Azure:
# SERVER_ENDPOINT = os.getenv(
# "ZEROBUS_SERVER_ENDPOINT", "https://your-shard-id.zerobus.region.azuredatabricks.net"
# )
# UNITY_CATALOG_ENDPOINT = os.getenv(
# "DATABRICKS_WORKSPACE_URL", "https://your-workspace.azuredatabricks.net"
# )
TABLE_NAME = os.getenv("ZEROBUS_TABLE_NAME", "catalog.schema.table")
# For OAuth authentication
CLIENT_ID = os.getenv("DATABRICKS_CLIENT_ID", "your-oauth-client-id")
CLIENT_SECRET = os.getenv("DATABRICKS_CLIENT_SECRET", "your-oauth-client-secret")
# Number of records to ingest
NUM_RECORDS = 100
def create_sample_json_record(index):
"""
Creates a sample AirQuality record as a dict.
With JSON mode, you can pass either a dict or a pre-serialized JSON string.
"""
return {
"device_name": f"sensor-{index % 10}",
"temp": 20 + (index % 15),
"humidity": 50 + (index % 40),
}
class CustomHeadersProvider(HeadersProvider):
"""
Example custom headers provider for advanced use cases.
Note: OAuth 2.0 Client Credentials (via create_stream()) is the standard
authentication method. Use this only if you have specific requirements
for custom headers (e.g., custom metadata, existing token management, etc.).
"""
def __init__(self, custom_token: str):
self.custom_token = custom_token
def get_headers(self):
"""
Return custom headers for gRPC metadata.
Returns:
List of (header_name, header_value) tuples
"""
return [
("authorization", f"Bearer {self.custom_token}"),
("x-custom-header", "custom-value"),
]
def main():
print("Starting synchronous ingestion example (Explicit JSON Mode)...")
print("=" * 60)
# Check if credentials are configured
if CLIENT_ID == "your-oauth-client-id" or CLIENT_SECRET == "your-oauth-client-secret":
logger.error("Please set DATABRICKS_CLIENT_ID and DATABRICKS_CLIENT_SECRET environment variables")
logger.info("Or update the CLIENT_ID and CLIENT_SECRET values in this file")
return
if SERVER_ENDPOINT == "https://your-shard-id.zerobus.region.cloud.databricks.com":
logger.error("Please set ZEROBUS_SERVER_ENDPOINT environment variable")
logger.info("Or update the SERVER_ENDPOINT value in this file")
return
if TABLE_NAME == "catalog.schema.table":
logger.error("Please set ZEROBUS_TABLE_NAME environment variable")
logger.info("Or update the TABLE_NAME value in this file")
return
try:
# Step 1: Initialize the SDK
sdk = ZerobusSdk(SERVER_ENDPOINT, UNITY_CATALOG_ENDPOINT)
logger.info("✓ SDK initialized")
# Step 2: Define table properties
# Note: No protobuf descriptor needed for JSON mode
table_properties = TableProperties(TABLE_NAME)
logger.info(f"✓ Table properties configured for: {TABLE_NAME} (JSON mode)")
# Step 3: Create stream configuration with JSON record type
options = StreamConfigurationOptions(
record_type=RecordType.JSON,
max_inflight_records=1000,
recovery=True,
recovery_timeout_ms=15000,
recovery_backoff_ms=2000,
recovery_retries=3,
)
logger.info("✓ Stream configuration created")
# Step 4: Create a stream with OAuth 2.0 authentication
#
# The SDK automatically:
# - Includes authorization header with OAuth token
# - Includes x-databricks-zerobus-table-name header
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)
# Advanced: Custom headers provider (for special use cases only)
# Uncomment to use custom headers instead of OAuth:
# custom_provider = CustomHeadersProvider(custom_token="your-custom-token")
# stream = sdk.create_stream(
# CLIENT_ID, CLIENT_SECRET, table_properties, options,
# headers_provider=custom_provider
# )
# Step 5: Ingest JSON records synchronously
logger.info(f"\nDemonstrating different ingestion methods...")
start_time = time.time()
success_count = 0
try:
# ========================================================================
# Method 1: ingest_record_offset() - RECOMMENDED for single records
# Returns offset directly without intermediate acknowledgment object
# ========================================================================
logger.info("\n1. Using ingest_record_offset() - optimized API")
for i in range(min(10, NUM_RECORDS)):
record_dict = create_sample_json_record(i)
# Returns offset directly - more efficient than ingest_record()
offset = stream.ingest_record_offset(record_dict)
logger.info(f" Record {i} ingested at offset {offset}")
success_count += 1
# ========================================================================
# Method 2: ingest_records_offset() - RECOMMENDED for batch ingestion
# Ingests multiple records and returns one offset for the whole batch
# ========================================================================
logger.info("\n2. Using ingest_records_offset() - batch API")
batch_size = 20
for batch_num in range(2):
batch = []
for i in range(batch_size):
idx = 10 + batch_num * batch_size + i
if idx >= NUM_RECORDS:
break
record_dict = create_sample_json_record(idx)
# Can mix dicts and JSON strings in the same batch
if i % 2 == 0:
batch.append(record_dict)
else:
batch.append(json.dumps(record_dict))
if batch:
# Returns one offset for the whole batch
batch_offset = stream.ingest_records_offset(batch)
logger.info(f" Batch {batch_num + 1}: {len(batch)} records, offset: {batch_offset}")
success_count += len(batch)
# ========================================================================
# Method 3: ingest_record_nowait() - Maximum throughput (fire-and-forget)
# Use when you don't need individual offsets and want maximum speed
# ========================================================================
logger.info("\n3. Using ingest_record_nowait() - fire-and-forget")
remaining = NUM_RECORDS - success_count
if remaining > 0:
for i in range(min(10, remaining)):
idx = success_count + i
record_dict = create_sample_json_record(idx)
stream.ingest_record_nowait(record_dict)
logger.info(f" Queued {min(10, remaining)} records (no wait for ack)")
success_count += min(10, remaining)
# ========================================================================
# Method 4: ingest_records_nowait() - Batch fire-and-forget
# Combines batch efficiency with fire-and-forget speed
# ========================================================================
logger.info("\n4. Using ingest_records_nowait() - batch fire-and-forget")
remaining = NUM_RECORDS - success_count
if remaining > 0:
batch = [create_sample_json_record(success_count + i) for i in range(remaining)]
stream.ingest_records_nowait(batch)
logger.info(f" Queued {len(batch)} records in batch (no wait for ack)")
success_count += len(batch)
# ========================================================================
# Legacy Method: ingest_record() - DEPRECATED
# Returns acknowledgment object that needs separate wait_for_ack() call
# Use ingest_record_offset() instead
# ========================================================================
# ack = stream.ingest_record(record_dict) # Deprecated
# offset = ack.wait_for_ack() # Extra step needed
end_time = time.time()
duration_seconds = end_time - start_time
records_per_second = NUM_RECORDS / duration_seconds
# Step 6: Flush and close the stream
logger.info("\nFlushing stream...")
stream.flush()
logger.info("✓ Stream flushed")
stream.close()
logger.info("✓ Stream closed")
# Print summary
print("\n" + "=" * 60)
print("Ingestion Summary:")
print(f" Total records: {NUM_RECORDS}")
print(f" Successful: {success_count}")
print(f" Failed: {NUM_RECORDS - success_count}")
print(f" Duration: {duration_seconds:.2f} seconds")
print(f" Throughput: {records_per_second:.2f} records/sec")
print(" Record type: JSON (explicit)")
print("=" * 60)
except Exception as e:
logger.error(f"\n✗ Error during ingestion: {e}")
stream.close()
raise
except Exception as e:
logger.error(f"\n✗ Failed to initialize stream: {e}")
raise
if __name__ == "__main__":
main()