
[AURON #2093] Introduce kafka mock source #2106

Open
Tartarus0zm wants to merge 4 commits into apache:master from Tartarus0zm:flink-mock-kafka-source

Conversation

@Tartarus0zm
Contributor

Which issue does this PR close?

Closes #2093

Rationale for this change

  • Since we don’t have a Kafka environment, testing is difficult. This PR therefore introduces a mock Kafka source that lets us specify the data to be sent, achieving our testing objectives.

What changes are included in this PR?

  • add kafka_mock_scan_exec to send mock data
  • add support for specifying mock data in the Kafka Table Factory
  • add tests AuronKafkaSourceITCase and AuronKafkaSourceTestBase

Are there any user-facing changes?

  • No

How was this patch tested?

  • tested via unit tests

@Tartarus0zm Tartarus0zm requested a review from richox March 20, 2026 03:39
@Tartarus0zm Tartarus0zm requested a review from merrily01 March 20, 2026 04:58
@Tartarus0zm
Contributor Author

@richox @merrily01 hello, CI is Green, PTAL

Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a Kafka “mock source” path so Flink SQL jobs can be tested without a real Kafka cluster by injecting predefined records through table options and routing execution to a native mock scan exec.

Changes:

  • Added a native KafkaMockScanExec and planner wiring to choose it when mock_data_json_array is provided.
  • Extended the Kafka scan protobuf node to carry mock JSON payload.
  • Added Flink runtime/table-factory support for kafka.mock.data plus planner IT coverage for event-time windowing queries.
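The partition-derivation step described above (deriving partitions from the mock JSON instead of a broker) can be sketched in a few lines. This is a std-only Rust sketch; `MockRecord` and its fields are hypothetical stand-ins for the JSON objects carrying `serialized_kafka_records_partition`, not the PR's actual types:

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for one mock record; in the PR the partition id
// comes from the JSON field `serialized_kafka_records_partition`.
struct MockRecord {
    partition: i32,
    #[allow(dead_code)]
    payload: &'static str,
}

// Collect the distinct partitions referenced by the mock data, so the
// source can register them without contacting a real Kafka cluster.
fn assigned_partitions(records: &[MockRecord]) -> BTreeSet<i32> {
    records.iter().map(|r| r.partition).collect()
}

fn main() {
    let records = [
        MockRecord { partition: 0, payload: "a" },
        MockRecord { partition: 1, payload: "b" },
        MockRecord { partition: 0, payload: "c" },
    ];
    // Three records, two distinct partitions.
    assert_eq!(assigned_partitions(&records).len(), 2);
}
```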

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.

Summary per file:

  • native-engine/datafusion-ext-plans/src/flink/mod.rs: Exposes the new mock scan exec module.
  • native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs: Implements mock-record generation from JSON into Arrow arrays for DataFusion execution.
  • native-engine/auron-planner/src/planner.rs: Switches between real Kafka scan and mock scan based on mock_data_json_array.
  • native-engine/auron-planner/proto/auron.proto: Adds mock_data_json_array field to KafkaScanExecNode.
  • auron-flink-extension/auron-flink-runtime/.../AuronKafkaSourceFunction.java: Passes mock data into the plan and derives partitions from mock JSON when present.
  • auron-flink-extension/auron-flink-runtime/.../AuronKafkaDynamicTableSource.java: Propagates mockData into the runtime source function.
  • auron-flink-extension/auron-flink-runtime/.../AuronKafkaDynamicTableFactory.java: Introduces kafka.mock.data option and passes it into the table source.
  • auron-flink-extension/auron-flink-planner/.../AuronKafkaSourceTestBase.java: Adds shared test setup creating a table using kafka.mock.data.
  • auron-flink-extension/auron-flink-planner/.../AuronKafkaSourceITCase.java: Adds windowing IT tests over the mock Kafka table.
  • auron-flink-extension/auron-flink-planner/pom.xml: Adds Hadoop test dependencies needed by planner tests.


fn statistics(&self) -> Result<Statistics> {
    todo!()
}

Copilot AI Mar 21, 2026


statistics() is left as todo!(), which will panic if DataFusion requests plan statistics (e.g., for EXPLAIN, optimizations, or some execution paths). Please return an appropriate Statistics value (often “unknown”/default) instead of panicking.

Suggested change:
-     todo!()
+     Ok(Statistics::default())

Comment on lines +252 to +259
build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v
.as_str()
.map(|s| s.to_string()))
}
DataType::LargeUtf8 => {
build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v
.as_str()
.map(|s| s.to_string()))

Copilot AI Mar 21, 2026


For Utf8/LargeUtf8, the extractor currently builds an owned String (s.to_string()), but StringBuilder/LargeStringBuilder in this codebase are used with append_value(&str) (see existing usages). This is likely to fail to compile due to type mismatch. Prefer extracting Option<&str> (via as_str()) and appending that, only allocating when truly needed.

Suggested change:
-     build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v
-         .as_str()
-         .map(|s| s.to_string()))
- }
- DataType::LargeUtf8 => {
-     build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v
-         .as_str()
-         .map(|s| s.to_string()))
+     build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v.as_str())
+ }
+ DataType::LargeUtf8 => {
+     build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v.as_str())

Comment on lines +207 to +211
DataType::Int8 => {
build_typed_array!(Int8Builder, |v: &sonic_rs::Value| v
.as_i64()
.map(|n| n as i8))
}

Copilot AI Mar 21, 2026


The integer conversions use as casts (e.g., n as i8). For out-of-range JSON values this silently truncates/wraps, producing incorrect mock data without an error. Use checked conversions (e.g., i8::try_from(n).ok(), u8::try_from(n).ok()) and return a type-mismatch/range error when conversion fails.
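The failure mode described here is easy to demonstrate in isolation (a minimal std-only sketch, independent of the PR's builder macros):

```rust
// `as` casts wrap silently on overflow, while `try_from` surfaces the
// out-of-range case so the caller can report a proper error.
fn lossy_to_i8(n: i64) -> i8 {
    n as i8
}

fn checked_to_i8(n: i64) -> Option<i8> {
    i8::try_from(n).ok()
}

fn main() {
    assert_eq!(lossy_to_i8(300), 44); // 300 wraps to 44: silent corruption
    assert_eq!(checked_to_i8(300), None); // out of range is now detectable
    assert_eq!(checked_to_i8(-128), Some(-128)); // in-range values pass through
}
```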

      tableOptions.get(BUFFER_SIZE),
-     tableOptions.get(START_UP_MODE));
+     tableOptions.get(START_UP_MODE),
+     tableOptions.get(KAFKA_MOCK_DATA));

Copilot AI Mar 21, 2026


KAFKA_MOCK_DATA is read via tableOptions.get(KAFKA_MOCK_DATA). Because the option has noDefaultValue(), this makes the new option effectively required and will break existing tables that don't set it. Please use getOptional(KAFKA_MOCK_DATA).orElse(null) (or similar) and keep the downstream code path null/empty-safe.

Suggested change:
-     tableOptions.get(KAFKA_MOCK_DATA));
+     tableOptions.getOptional(KAFKA_MOCK_DATA).orElse(null));

Comment on lines +187 to +188
for (JsonNode data : mockDataJson) {
int partition = data.get("serialized_kafka_records_partition").asInt();

Copilot AI Mar 21, 2026


In mock-data mode, mapper.readTree(mockData) is assumed to be a JSON array of objects each containing serialized_kafka_records_partition. If mockData is not an array, or if an element lacks that field, data.get(...).asInt() can throw/produce incorrect defaults. Please validate mockDataJson.isArray() and that each element has the expected fields, and fail fast with a clear error message.

Suggested change:
- for (JsonNode data : mockDataJson) {
-     int partition = data.get("serialized_kafka_records_partition").asInt();
+ if (!mockDataJson.isArray()) {
+     throw new IllegalArgumentException(
+             "Mock data for auron kafka source must be a JSON array of objects, but was: "
+                     + mockDataJson.getNodeType());
+ }
+ for (int i = 0; i < mockDataJson.size(); i++) {
+     JsonNode data = mockDataJson.get(i);
+     if (data == null || !data.isObject()) {
+         throw new IllegalArgumentException(
+                 "Each element in mock data array must be a JSON object; invalid element at index "
+                         + i
+                         + " with type: "
+                         + (data == null ? "null" : data.getNodeType()));
+     }
+     JsonNode partitionNode = data.get("serialized_kafka_records_partition");
+     if (partitionNode == null || !partitionNode.isInt()) {
+         throw new IllegalArgumentException(
+                 "Mock data element at index "
+                         + i
+                         + " must contain integer field 'serialized_kafka_records_partition'");
+     }
+     int partition = partitionNode.intValue();

assignedPartitions.add(partition);
}
}
LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions);

Copilot AI Mar 21, 2026


The log message says partition size = {} but logs the full assignedPartitions list. Either log assignedPartitions.size() or adjust the message to reflect that it prints the partition list.

Suggested change:
- LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions);
+ LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions.size());

}

public void setMockData(String mockData) {
Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null");

Copilot AI Mar 21, 2026


setMockData precondition message is grammatically incorrect: "must not null". Please change it to something like "must not be null" (or include the option key name to make failures easier to diagnose).

Suggested change:
- Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null");
+ Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not be null");

Comment on lines +99 to +101
Boundedness::Unbounded {
requires_infinite_memory: false,
},

Copilot AI Mar 21, 2026


properties() marks this exec as Boundedness::Unbounded, but mock_records() emits a finite set of batches and then completes. This mismatch can affect planning and streaming semantics. Consider using Boundedness::Bounded (or otherwise matching the actual runtime behavior).

Suggested change:
- Boundedness::Unbounded {
-     requires_infinite_memory: false,
- },
+ Boundedness::Bounded,



Development

Successfully merging this pull request may close these issues.

Introduce kafka mock source
