26 | 26 | - Using ReadMaxRows to limit data consumption per micro-batch |
27 | 27 | - Implementing getDefaultReadLimit() for default rate limiting |
28 | 28 | - Using reportLatestOffset() for monitoring |
| 29 | +- Comparing behavior between default trigger and Trigger.AvailableNow |
29 | 30 |
|
30 | | -The data source generates sequential integers, but admission control limits |
31 | | -each micro-batch to only 2 rows, demonstrating how streaming sources can |
32 | | -throttle data consumption. |
| 31 | +The example runs TWO streaming queries to demonstrate the difference: |
| 32 | +
|
| 33 | +1. **Default Trigger**: Each micro-batch processes only 2 rows (limited by getDefaultReadLimit())
| 34 | + - Runs continuously until manually stopped |
| 35 | + - Demonstrates throttled data consumption |
| 36 | +
|
| 37 | +2. **Trigger.AvailableNow**: Processes ALL available data and then stops automatically
| 38 | + - latestOffset receives ReadAllAvailable instead of ReadMaxRows |
| 39 | + - Ignores the default read limit, per the SupportsAdmissionControl contract
| 40 | + - Useful for one-time catch-up processing |
33 | 41 |
|
34 | 42 | Usage: |
35 | 43 | bin/spark-submit examples/src/main/python/sql/streaming/\\ |
36 | 44 | python_datasource_admission_control.py |
37 | 45 |
|
38 | | -Expected output shows batches limited to 2 rows each: |
39 | | - Batch 0: 2 rows (0, 1) |
40 | | - Batch 1: 2 rows (2, 3) |
41 | | - ... |
| 46 | +Expected output: |
| 47 | + DEFAULT TRIGGER - Batches limited to 2 rows each: |
| 48 | + Batch 0: 2 rows (0, 1) |
| 49 | + Batch 1: 2 rows (2, 3) |
| 50 | + ... |
| 51 | +
|
| 52 | + TRIGGER.AVAILABLENOW - Processes all 10 rows then stops: |
| 53 | + Batch 0: 10 rows (0-9) |
| 54 | + Query stopped automatically |
42 | 55 | """ |
43 | 56 |
|
44 | 57 | import time |
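The admission-control hooks the docstring refers to live further down in AdmissionControlStreamReader, but their bodies fall outside the hunks shown in this diff. As a reading aid, here is a minimal sketch of the pattern, assuming the ReadLimit types named in the docstring (ReadMaxRows, ReadAllAvailable) and an illustrative {"offset": ...} dict shape; neither is taken verbatim from the patch:

    # Assumption: these admission-control types ship with the Python data
    # source API alongside DataSourceStreamReader (the patch's docstring
    # names them, but the import lines are outside the visible hunks).
    from pyspark.sql.datasource import (
        DataSourceStreamReader,
        ReadAllAvailable,
        ReadMaxRows,
    )

    class SketchStreamReader(DataSourceStreamReader):
        """Illustrative only -- mirrors the structure described above."""

        TOTAL_AVAILABLE = 10

        def initialOffset(self) -> dict:
            return {"offset": 0}  # hypothetical offset shape

        def getDefaultReadLimit(self):
            # Default trigger: cap every micro-batch at 2 rows.
            return ReadMaxRows(2)

        def latestOffset(self, start: dict, limit) -> dict:
            if isinstance(limit, ReadAllAvailable):
                # Trigger.AvailableNow: consume everything available now.
                return {"offset": self.TOTAL_AVAILABLE}
            # Otherwise advance by at most the 2-row cap set above.
            return {"offset": min(start["offset"] + 2, self.TOTAL_AVAILABLE)}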
@@ -66,8 +79,8 @@ class AdmissionControlStreamReader(DataSourceStreamReader): |
66 | 79 | allowing Spark to control how much data is consumed per micro-batch. |
67 | 80 | """ |
68 | 81 |
|
69 | | - # Simulated "total available" data - in practice this could be unbounded |
70 | | - TOTAL_AVAILABLE = 1000000 |
| 82 | + # Simulated "total available" data - kept small for demo purposes |
| 83 | + TOTAL_AVAILABLE = 10 |
71 | 84 |
|
72 | 85 | def initialOffset(self) -> dict: |
73 | 86 | """Return the starting offset for new queries.""" |
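The reportLatestOffset() hook from the bullet list sits in the elided region between these hunks as well. One plausible shape, purely illustrative (the offset-dict key is an assumption carried over from the sketch above):

    def reportLatestOffset(self) -> dict:
        # Tells Spark how much data exists right now, for monitoring
        # (e.g. surfacing backlog in StreamingQueryProgress); it does
        # not commit the query to reading that far.
        return {"offset": self.TOTAL_AVAILABLE}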
@@ -168,50 +181,132 @@ def streamReader(self, schema: StructType) -> DataSourceStreamReader: |
168 | 181 | return AdmissionControlStreamReader() |
169 | 182 |
|
170 | 183 |
|
171 | | -def main() -> None: |
172 | | - """Run the admission control streaming example.""" |
173 | | - spark: SparkSession = SparkSession.builder.appName("AdmissionControlExample").getOrCreate() |
| 184 | +def run_with_default_trigger(spark: SparkSession) -> None: |
| 185 | + """ |
| 186 | + Demonstrate admission control with DEFAULT trigger. |
174 | 187 |
|
175 | | - # Register the custom data source |
176 | | - spark.dataSource.register(AdmissionControlDataSource) |
| 188 | + With default trigger, getDefaultReadLimit() returns ReadMaxRows(2), |
| 189 | + so each micro-batch processes only 2 rows at a time. |
| 190 | + """ |
| 191 | + print("\n" + "=" * 70) |
| 192 | + print("EXAMPLE 1: DEFAULT TRIGGER (Continuous Processing)") |
| 193 | + print("=" * 70) |
| 194 | + print("Expected behavior: Each batch processes max 2 rows (from getDefaultReadLimit)") |
| 195 | + print() |
177 | 196 |
|
178 | | - # Create streaming DataFrame from our custom source |
179 | 197 | df = spark.readStream.format("admission_control_example").load() |
180 | 198 |
|
181 | | - # Track batch statistics |
182 | 199 | batch_stats: List[Dict[str, Any]] = [] |
183 | 200 |
|
184 | 201 | def process_batch(batch_df: DataFrame, batch_id: int) -> None: |
185 | | - """Process each micro-batch and track statistics.""" |
186 | 202 | count = batch_df.count() |
187 | 203 | rows = [row.asDict() for row in batch_df.collect()] |
188 | | - batch_stats.append({"batch_id": batch_id, "count": count, "rows": rows}) |
| 204 | + batch_stats.append({"batch_id": batch_id, "count": count}) |
189 | 205 |
|
190 | | - print(f"Batch {batch_id}: {count} rows processed") |
| 206 | + print(f" Batch {batch_id}: {count} rows") |
191 | 207 | if rows: |
192 | 208 | ids = [r["id"] for r in rows] |
193 | | - print(f" IDs: {ids}") |
| 209 | + print(f" IDs: {ids}") |
194 | 210 |
|
195 | | - # Start streaming query with foreachBatch |
| 211 | + # Use default trigger - will respect getDefaultReadLimit() |
196 | 212 | query = df.writeStream.foreachBatch(process_batch).start() |
197 | 213 |
|
198 | | - # Wait for a few batches to complete |
199 | 214 | try: |
200 | | - # Process batches for a short time to demonstrate admission control |
201 | | - time.sleep(5) |
| 215 | + # Let it run for a few batches |
| 216 | + time.sleep(3) |
202 | 217 | finally: |
203 | 218 | query.stop() |
204 | 219 | query.awaitTermination() |
205 | 220 |
|
206 | | - # Print summary |
207 | | - print("\n--- Admission Control Summary ---") |
208 | | - print(f"Total batches processed: {len(batch_stats)}") |
| 221 | + print("\n--- Default Trigger Summary ---") |
| 222 | + print(f"Total batches: {len(batch_stats)}") |
209 | 223 | for stat in batch_stats: |
210 | 224 | print(f" Batch {stat['batch_id']}: {stat['count']} rows") |
211 | 225 |
|
212 | 226 | if batch_stats: |
213 | 227 | row_counts = [s["count"] for s in batch_stats] |
214 | | - print(f"\nAll batches limited to max 2 rows: {all(c <= 2 for c in row_counts)}") |
| 228 | + all_limited = all(c <= 2 for c in row_counts) |
| 229 | + print(f"\nAll batches limited to 2 rows: {all_limited}") |
| 230 | + print("✓ Admission control working - each batch throttled to 2 rows") |
| 231 | + |
| 232 | + |
| 233 | +def run_with_trigger_available_now(spark: SparkSession) -> None: |
| 234 | + """ |
| 235 | + Demonstrate admission control with TRIGGER.AVAILABLENOW. |
| 236 | +
|
| 237 | + With Trigger.AvailableNow, latestOffset receives ReadAllAvailable, |
| 238 | + which tells the source to process ALL available data and then stop. |
| 239 | + This ignores getDefaultReadLimit() per the SupportsAdmissionControl contract. |
| 240 | + """ |
| 241 | + print("\n" + "=" * 70) |
| 242 | + print("EXAMPLE 2: TRIGGER.AVAILABLENOW (Finite Processing)") |
| 243 | + print("=" * 70) |
| 244 | + print("Expected behavior: Process ALL 10 rows in one batch, then stop") |
| 245 | + print() |
| 246 | + |
| 247 | + df = spark.readStream.format("admission_control_example").load() |
| 248 | + |
| 249 | + batch_stats: List[Dict[str, Any]] = [] |
| 250 | + |
| 251 | + def process_batch(batch_df: DataFrame, batch_id: int) -> None: |
| 252 | + count = batch_df.count() |
| 253 | + rows = [row.asDict() for row in batch_df.collect()] |
| 254 | + batch_stats.append({"batch_id": batch_id, "count": count}) |
| 255 | + |
| 256 | + print(f" Batch {batch_id}: {count} rows") |
| 257 | + if rows: |
| 258 | + ids = [r["id"] for r in rows] |
| 259 | + print(f" IDs: {ids}") |
| 260 | + |
| 261 | + # Use Trigger.AvailableNow - will receive ReadAllAvailable |
| 262 | + query = df.writeStream \ |
| 263 | + .trigger(availableNow=True) \ |
| 264 | + .foreachBatch(process_batch) \ |
| 265 | + .start() |
| 266 | + |
| 267 | + # Wait for completion - AvailableNow stops automatically |
| 268 | + query.awaitTermination() |
| 269 | + |
| 270 | + print("\n--- Trigger.AvailableNow Summary ---") |
| 271 | + print(f"Total batches: {len(batch_stats)}") |
| 272 | + for stat in batch_stats: |
| 273 | + print(f" Batch {stat['batch_id']}: {stat['count']} rows") |
| 274 | + |
| 275 | + total_rows = sum(s["count"] for s in batch_stats) |
| 276 | + print(f"\nTotal rows processed: {total_rows}") |
| 277 | + print("✓ Processed all available data (10 rows) then stopped automatically") |
| 278 | + print("✓ Ignored getDefaultReadLimit() as required by SupportsAdmissionControl") |
| 279 | + |
| 280 | + |
| 281 | +def main() -> None: |
| 282 | + """Run admission control examples with both trigger types.""" |
| 283 | + spark: SparkSession = SparkSession.builder \ |
| 284 | + .appName("AdmissionControlExample") \ |
| 285 | + .getOrCreate() |
| 286 | + |
| 287 | + # Register the custom data source |
| 288 | + spark.dataSource.register(AdmissionControlDataSource) |
| 289 | + |
| 290 | + print("\n" + "=" * 70) |
| 291 | + print("ADMISSION CONTROL AND TRIGGER.AVAILABLENOW DEMONSTRATION") |
| 292 | + print("=" * 70) |
| 293 | + print("\nThis example demonstrates how admission control interacts with") |
| 294 | + print("different trigger types in PySpark streaming data sources.") |
| 295 | + print("\nData Source: 10 sequential integers (0-9)") |
| 296 | + print("Default Read Limit: ReadMaxRows(2)") |
| 297 | + |
| 298 | + # Run both examples |
| 299 | + run_with_default_trigger(spark) |
| 300 | + run_with_trigger_available_now(spark) |
| 301 | + |
| 302 | + print("\n" + "=" * 70) |
| 303 | + print("KEY TAKEAWAYS") |
| 304 | + print("=" * 70) |
| 305 | + print("1. DEFAULT TRIGGER: Respects getDefaultReadLimit() - processes 2 rows/batch") |
| 306 | + print("2. TRIGGER.AVAILABLENOW: Receives ReadAllAvailable - processes ALL data") |
| 307 | + print("3. latestOffset(start, limit) receives different ReadLimit types") |
| 308 | + print("4. Sources can throttle continuous streams while supporting batch catch-up") |
| 309 | + print("=" * 70) |
215 | 310 |
|
216 | 311 | spark.stop() |
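One last note for anyone adapting the example: the throttling is also observable from the driver without the foreachBatch bookkeeping, via the standard StreamingQuery.lastProgress dict. A hypothetical check, not part of this patch:

    # After the default-trigger query has run for a batch or two:
    progress = query.lastProgress  # latest StreamingQueryProgress as a dict, or None
    if progress is not None:
        # With getDefaultReadLimit() == ReadMaxRows(2), this stays <= 2.
        print(progress["numInputRows"])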
217 | 312 |
|
|