[AURON #2093] Introduce kafka mock source #2106
Tartarus0zm wants to merge 4 commits into apache:master
Conversation
@richox @merrily01 hello, CI is green, PTAL
Pull request overview
This PR introduces a Kafka “mock source” path so Flink SQL jobs can be tested without a real Kafka cluster by injecting predefined records through table options and routing execution to a native mock scan exec.
Changes:
- Added a native `KafkaMockScanExec` and planner wiring to choose it when `mock_data_json_array` is provided.
- Extended the Kafka scan protobuf node to carry the mock JSON payload.
- Added Flink runtime/table-factory support for `kafka.mock.data`, plus planner IT coverage for event-time windowing queries.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| native-engine/datafusion-ext-plans/src/flink/mod.rs | Exposes the new mock scan exec module. |
| native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs | Implements mock-record generation from JSON into Arrow arrays for DataFusion execution. |
| native-engine/auron-planner/src/planner.rs | Switches between real Kafka scan and mock scan based on mock_data_json_array. |
| native-engine/auron-planner/proto/auron.proto | Adds mock_data_json_array field to KafkaScanExecNode. |
| auron-flink-extension/auron-flink-runtime/.../AuronKafkaSourceFunction.java | Passes mock data into the plan and derives partitions from mock JSON when present. |
| auron-flink-extension/auron-flink-runtime/.../AuronKafkaDynamicTableSource.java | Propagates mockData into the runtime source function. |
| auron-flink-extension/auron-flink-runtime/.../AuronKafkaDynamicTableFactory.java | Introduces kafka.mock.data option and passes it into the table source. |
| auron-flink-extension/auron-flink-planner/.../AuronKafkaSourceTestBase.java | Adds shared test setup creating a table using kafka.mock.data. |
| auron-flink-extension/auron-flink-planner/.../AuronKafkaSourceITCase.java | Adds windowing IT tests over the mock Kafka table. |
| auron-flink-extension/auron-flink-planner/pom.xml | Adds Hadoop test dependencies needed by planner tests. |
```rust
fn statistics(&self) -> Result<Statistics> {
    todo!()
}
```

`statistics()` is left as `todo!()`, which will panic if DataFusion requests plan statistics (e.g., for EXPLAIN, optimizations, or some execution paths). Please return an appropriate `Statistics` value (often "unknown"/default) instead of panicking.

Suggested change:

```diff
-    todo!()
+    Ok(Statistics::default())
```
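To illustrate the failure mode, here is a minimal std-only sketch using a hypothetical stand-in struct (not DataFusion's actual `Statistics` type, whose fields differ): `todo!()` aborts the calling thread with a panic the moment the planner asks for statistics, while a `Default` value degrades gracefully to "unknown".

```rust
// Hypothetical stand-in for DataFusion's Statistics; field names are illustrative only.
#[derive(Default, Debug)]
struct Statistics {
    num_rows: Option<usize>,
    total_byte_size: Option<usize>,
}

fn statistics_todo() -> Statistics {
    todo!() // panics at runtime as soon as anything calls this
}

fn statistics_default() -> Statistics {
    Statistics::default() // all fields None: "unknown", but no panic
}

fn main() {
    // The default-based version is safe to call.
    let s = statistics_default();
    assert!(s.num_rows.is_none() && s.total_byte_size.is_none());

    // Calling statistics_todo() panics; observe it via catch_unwind.
    let res = std::panic::catch_unwind(|| statistics_todo());
    assert!(res.is_err());
}
```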
```rust
                build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v
                    .as_str()
                    .map(|s| s.to_string()))
            }
            DataType::LargeUtf8 => {
                build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v
                    .as_str()
                    .map(|s| s.to_string()))
```

For Utf8/LargeUtf8, the extractor currently builds an owned String (`s.to_string()`), but `StringBuilder`/`LargeStringBuilder` in this codebase are used with `append_value(&str)` (see existing usages). This is likely to fail to compile due to a type mismatch. Prefer extracting `Option<&str>` (via `as_str()`) and appending that, only allocating when truly needed.

Suggested change:

```diff
-                build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v
-                    .as_str()
-                    .map(|s| s.to_string()))
-            }
-            DataType::LargeUtf8 => {
-                build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v
-                    .as_str()
-                    .map(|s| s.to_string()))
+                build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v.as_str())
+            }
+            DataType::LargeUtf8 => {
+                build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v.as_str())
```
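The borrow-vs-allocate distinction can be sketched without arrow or sonic_rs (the `Value` type below is a hypothetical stand-in): a closure returning `Option<&str>` lets the builder copy bytes once from the borrowed slice, whereas `.map(|s| s.to_string())` allocates an intermediate `String` per row.

```rust
// Hypothetical stand-in for a JSON value that may or may not hold a string.
enum Value {
    Str(String),
    Int(i64),
}

impl Value {
    // Mirrors the shape of sonic_rs::Value::as_str(): borrow, no allocation.
    fn as_str(&self) -> Option<&str> {
        match self {
            Value::Str(s) => Some(s.as_str()),
            _ => None,
        }
    }
}

fn main() {
    let rows = vec![Value::Str("a".into()), Value::Int(7), Value::Str("b".into())];

    // Borrowed extraction: one pass, no per-row String allocation.
    let extracted: Vec<Option<&str>> = rows.iter().map(|v| v.as_str()).collect();
    assert_eq!(extracted, vec![Some("a"), None, Some("b")]);
}
```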
```rust
            DataType::Int8 => {
                build_typed_array!(Int8Builder, |v: &sonic_rs::Value| v
                    .as_i64()
                    .map(|n| n as i8))
            }
```

The integer conversions use `as` casts (e.g., `n as i8`). For out-of-range JSON values this silently truncates/wraps, producing incorrect mock data without an error. Use checked conversions (e.g., `i8::try_from(n).ok()`, `u8::try_from(n).ok()`) and return a type-mismatch/range error when conversion fails.
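The difference is easy to demonstrate with plain `std` (the values below are illustrative, not taken from the PR): an `as` cast wraps modulo 256, while `try_from` reports the out-of-range value.

```rust
fn main() {
    let n: i64 = 300; // out of i8 range (-128..=127)

    // `as` cast silently wraps: 300 as i8 == 44 (300 - 256).
    let wrapped = n as i8;
    assert_eq!(wrapped, 44);

    // Checked conversion surfaces the problem instead of guessing.
    let checked: Option<i8> = i8::try_from(n).ok();
    assert_eq!(checked, None);

    // In-range values convert losslessly either way.
    assert_eq!(i8::try_from(100i64).ok(), Some(100i8));
}
```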
```diff
     tableOptions.get(BUFFER_SIZE),
-    tableOptions.get(START_UP_MODE));
+    tableOptions.get(START_UP_MODE),
+    tableOptions.get(KAFKA_MOCK_DATA));
```

`KAFKA_MOCK_DATA` is read via `tableOptions.get(KAFKA_MOCK_DATA)`. Because the option has `noDefaultValue()`, this makes the new option effectively required and will break existing tables that don't set it. Please use `getOptional(KAFKA_MOCK_DATA).orElse(null)` (or similar) and keep the downstream code path null/empty-safe.

Suggested change:

```diff
-    tableOptions.get(KAFKA_MOCK_DATA));
+    tableOptions.getOptional(KAFKA_MOCK_DATA).orElse(null));
```
```java
for (JsonNode data : mockDataJson) {
    int partition = data.get("serialized_kafka_records_partition").asInt();
```

In mock-data mode, `mapper.readTree(mockData)` is assumed to be a JSON array of objects each containing `serialized_kafka_records_partition`. If `mockData` is not an array, or if an element lacks that field, `data.get(...).asInt()` can throw/produce incorrect defaults. Please validate `mockDataJson.isArray()` and that each element has the expected fields, and fail fast with a clear error message.

Suggested change:

```diff
-for (JsonNode data : mockDataJson) {
-    int partition = data.get("serialized_kafka_records_partition").asInt();
+if (!mockDataJson.isArray()) {
+    throw new IllegalArgumentException(
+        "Mock data for auron kafka source must be a JSON array of objects, but was: "
+            + mockDataJson.getNodeType());
+}
+for (int i = 0; i < mockDataJson.size(); i++) {
+    JsonNode data = mockDataJson.get(i);
+    if (data == null || !data.isObject()) {
+        throw new IllegalArgumentException(
+            "Each element in mock data array must be a JSON object; invalid element at index "
+                + i
+                + " with type: "
+                + (data == null ? "null" : data.getNodeType()));
+    }
+    JsonNode partitionNode = data.get("serialized_kafka_records_partition");
+    if (partitionNode == null || !partitionNode.isInt()) {
+        throw new IllegalArgumentException(
+            "Mock data element at index "
+                + i
+                + " must contain integer field 'serialized_kafka_records_partition'");
+    }
+    int partition = partitionNode.intValue();
```
```java
        assignedPartitions.add(partition);
    }
}
LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions);
```

The log message says `partition size = {}` but logs the full `assignedPartitions` list. Either log `assignedPartitions.size()` or adjust the message to reflect that it prints the partition list.

Suggested change:

```diff
-LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions);
+LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions.size());
```
```java
}

public void setMockData(String mockData) {
    Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null");
```

The `setMockData` precondition message is grammatically incorrect: "must not null". Please change it to something like "must not be null" (or include the option key name to make failures easier to diagnose).

Suggested change:

```diff
-    Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null");
+    Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not be null");
```
```rust
Boundedness::Unbounded {
    requires_infinite_memory: false,
},
```

`properties()` marks this exec as `Boundedness::Unbounded`, but `mock_records()` emits a finite set of batches and then completes. This mismatch can affect planning and streaming semantics. Consider using `Boundedness::Bounded` (or otherwise matching the actual runtime behavior).

Suggested change:

```diff
-Boundedness::Unbounded {
-    requires_infinite_memory: false,
-},
+Boundedness::Bounded,
```
Which issue does this PR close?
Closes #2093
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?