Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions auron-flink-extension/auron-flink-planner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,19 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoopVersion}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<version>${hadoopVersion}</version>
<scope>test</scope>
</dependency>

<dependency>
<!-- For using the filesystem connector in tests -->
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.table.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.LocalDateTime;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.Test;

/**
 * IT case for Auron Flink Kafka Source.
 *
 * <p>Both tests aggregate the mocked Kafka records registered by {@link AuronKafkaSourceTestBase}
 * into one-minute event-time tumbling windows (once via the windowing TVF, once via the legacy
 * group-window syntax) and expect the same three result rows, so the verification is shared in
 * {@link #assertExpectedWindowCounts(List)}.
 */
public class AuronKafkaSourceITCase extends AuronKafkaSourceTestBase {

    @Test
    public void testEventTimeTumbleTvfWindow() {
        // A single subtask keeps watermark progression (and thus window firing) deterministic.
        environment.setParallelism(1);
        List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
                .executeSql(
                        "SELECT `name`, count(1), window_start FROM TABLE("
                                + "TUMBLE(TABLE T2, DESCRIPTOR(`ts`), INTERVAL '1' MINUTE)) GROUP BY `name`, window_start, window_end")
                .collect());
        assertExpectedWindowCounts(rows);
    }

    @Test
    public void testEventTimeTumbleGroupWindow() {
        // A single subtask keeps watermark progression (and thus window firing) deterministic.
        environment.setParallelism(1);
        List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment
                .executeSql("SELECT `name`, count(1), TUMBLE_START(`ts`, INTERVAL '1' MINUTE) "
                        + "FROM T2 group by TUMBLE(`ts`, INTERVAL '1' MINUTE), `name`")
                .collect());
        assertExpectedWindowCounts(rows);
    }

    /**
     * Asserts that {@code rows} holds exactly the three windowed counts expected for the mock
     * data: one "zm1" and one "zm2" record in the 12:03 window, and one "zm1" record in 12:05.
     */
    private void assertExpectedWindowCounts(List<Row> rows) {
        assertThat(rows.size()).isEqualTo(3);
        assertRowsContains(
                rows,
                new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:03:00")},
                new Object[] {"zm2", 1L, LocalDateTime.parse("2026-03-16T12:03:00")},
                new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:05:00")});
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.table.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;

/**
 * Base class for Auron Flink Kafka Table Tests.
 *
 * <p>Creates a streaming {@link StreamExecutionEnvironment} / {@link StreamTableEnvironment} pair
 * and registers table {@code T2} backed by the {@code auron-kafka} connector fed with inline mock
 * records, so subclasses can run SQL against Kafka semantics without a real broker.
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class AuronKafkaSourceTestBase {
    // Shared with subclasses: the execution and table environments built in before().
    protected StreamExecutionEnvironment environment;
    protected StreamTableEnvironment tableEnvironment;

    @BeforeAll
    public void before() {
        Configuration config = new Configuration();
        // TODO: resolve the issue where the Flink classloader is closed and CompileUtils.doCompile fails
        config.setString("classloader.check-leaked-classloader", "false");
        // Evaluate session time functions in UTC so window boundaries are deterministic.
        config.setString("table.local-time-zone", "UTC");
        config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);

        environment = StreamExecutionEnvironment.getExecutionEnvironment(config);
        environment.setRestartStrategy(RestartStrategies.noRestart());
        // Emit watermarks as often as possible so event-time windows fire promptly in tests.
        environment.getConfig().setAutoWatermarkInterval(1);
        tableEnvironment = StreamTableEnvironment.create(environment, EnvironmentSettings.fromConfiguration(config));

        tableEnvironment.executeSql(mockTableDdl(mockRecordsJson()));
    }

    /**
     * Builds the JSON array of mock Kafka records. The first three fields of every record are the
     * connector-reserved partition/offset/timestamp columns; the rest is the payload.
     */
    private static String mockRecordsJson() {
        String first = "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100000, "
                + "\"serialized_kafka_records_timestamp\": 1773662603760, \"event_time\": 1773662603760, \"age\": 20, \"name\":\"zm1\"}";
        String second = "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100001, "
                + "\"serialized_kafka_records_timestamp\": 1773662603761, \"event_time\": 1773662633760, \"age\": 21, \"name\":\"zm2\"}";
        String third = "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100002, "
                + "\"serialized_kafka_records_timestamp\": 1773662603762, \"event_time\": 1773662703761, \"age\": 22, \"name\":\"zm1\"}";
        return "[" + String.join(",", first, second, third) + "]";
    }

    /**
     * Returns the DDL registering table {@code T2} on the {@code auron-kafka} connector, with an
     * event-time attribute {@code ts} derived from the epoch-millis {@code event_time} column.
     */
    private static String mockTableDdl(String mockRecords) {
        return " CREATE TABLE T2 ( "
                + "\n `event_time` BIGINT, "
                + "\n `age` INT, "
                + "\n `name` STRING,"
                + "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000)),"
                + "\n WATERMARK FOR `ts` AS `ts` "
                + "\n ) WITH ( "
                + "\n 'connector' = 'auron-kafka',"
                + "\n 'kafka.mock.data' = '" + mockRecords + "',"
                + "\n 'topic' = 'mock_topic',"
                + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092',"
                + "\n 'properties.group.id' = 'flink-test-mock',"
                + "\n 'format' = 'JSON' "
                + "\n )";
    }

    /**
     * Asserts that every expected field tuple matches the leading fields of at least one row in
     * {@code actualRows}; fails with both the missing tuple and the full actual row list.
     */
    protected void assertRowsContains(List<Row> actualRows, Object[]... expectedRows) {
        for (Object[] expected : expectedRows) {
            boolean found = false;
            for (Row candidate : actualRows) {
                if (rowStartsWith(candidate, expected)) {
                    found = true;
                    break;
                }
            }
            assertThat(found)
                    .as("Expected row %s not found in actual rows: %s", Arrays.toString(expected), actualRows)
                    .isTrue();
        }
    }

    /** Returns true when the first {@code expected.length} fields of {@code row} equal {@code expected} element-wise. */
    private static boolean rowStartsWith(Row row, Object[] expected) {
        for (int i = 0; i < expected.length; i++) {
            if (!java.util.Objects.equals(expected[i], row.getField(i))) {
                return false;
            }
        }
        return true;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public class AuronKafkaDynamicTableFactory implements DynamicTableSourceFactory
.withDescription(
"offset mode for kafka source, support GROUP_OFFSET, LATEST, EARLIEST, TIMESTAMP will be supported.");

public static final ConfigOption<String> KAFKA_MOCK_DATA = ConfigOptions.key("kafka.mock.data")
.stringType()
.noDefaultValue()
.withDescription(
"When mock data generated, remember that the first three columns of each row are serialized_kafka_records_partition, serialized_kafka_records_offset, and serialized_kafka_records_timestamp.");

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
Expand All @@ -107,7 +113,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
format,
formatConfig,
tableOptions.get(BUFFER_SIZE),
tableOptions.get(START_UP_MODE));
tableOptions.get(START_UP_MODE),
tableOptions.get(KAFKA_MOCK_DATA));
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KAFKA_MOCK_DATA is read via tableOptions.get(KAFKA_MOCK_DATA). Because the option has noDefaultValue(), this makes the new option effectively required and will break existing tables that don't set it. Please use getOptional(KAFKA_MOCK_DATA).orElse(null) (or similar) and keep the downstream code path null/empty-safe.

Suggested change
tableOptions.get(KAFKA_MOCK_DATA));
tableOptions.getOptional(KAFKA_MOCK_DATA).orElse(null));

Copilot uses AI. Check for mistakes.
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create Auron Kafka dynamic table source", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWa
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
private final String mockData;
/** Watermark strategy that is used to generate per-partition watermark. */
protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;

Expand All @@ -57,7 +58,8 @@ public AuronKafkaDynamicTableSource(
String format,
Map<String, String> formatConfig,
int bufferSize,
String startupMode) {
String startupMode,
String mockData) {
final LogicalType physicalType = physicalDataType.getLogicalType();
Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
this.physicalDataType = physicalDataType;
Expand All @@ -67,6 +69,7 @@ public AuronKafkaDynamicTableSource(
this.formatConfig = formatConfig;
this.bufferSize = bufferSize;
this.startupMode = startupMode;
this.mockData = mockData;
}

@Override
Expand All @@ -91,6 +94,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
sourceFunction.setWatermarkStrategy(watermarkStrategy);
}

if (mockData != null) {
sourceFunction.setMockData(mockData);
}

return new DataStreamScanProvider() {

@Override
Expand All @@ -109,7 +116,7 @@ public boolean isBounded() {
@Override
public DynamicTableSource copy() {
return new AuronKafkaDynamicTableSource(
physicalDataType, kafkaTopic, kafkaProperties, format, formatConfig, bufferSize, startupMode);
physicalDataType, kafkaTopic, kafkaProperties, format, formatConfig, bufferSize, startupMode, mockData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.curator5.com.google.common.base.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
Expand Down Expand Up @@ -92,6 +94,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
private final Map<String, String> formatConfig;
private final int bufferSize;
private final String startupMode;
private String mockData;
private transient PhysicalPlanNode physicalPlanNode;

// Flink Checkpoint-related, compatible with Flink Kafka Legacy source
Expand Down Expand Up @@ -174,53 +177,63 @@ public void open(Configuration config) throws Exception {
this.auronOperatorId + "-" + getRuntimeContext().getIndexOfThisSubtask();
scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex);
scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
sourcePlan.setKafkaScan(scanExecNode.build());
this.physicalPlanNode = sourcePlan.build();

// 1. Initialize Kafka Consumer for partition metadata discovery only (not for data consumption)
Properties kafkaProps = new Properties();
kafkaProps.putAll(kafkaProperties);
// Override to ensure this consumer does not interfere with actual data consumption
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta");
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaProps.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);

StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
// 2. Discover and assign partitions for this subtask
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();

this.assignedPartitions = new ArrayList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
int partitionId = partitionInfo.partition();
if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) {
assignedPartitions.add(partitionId);
}
}
boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
Map<String, Object> auronRuntimeInfo = new HashMap<>();
auronRuntimeInfo.put("subtask_index", subtaskIndex);
auronRuntimeInfo.put("num_readers", numSubtasks);
auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
auronRuntimeInfo.put("restored_offsets", restoredOffsets);
auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo));
currentOffsets = new HashMap<>();
pendingOffsetsToCommit = new LinkedMap();
LOG.info(
"Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}, "
+ "subtask {} assigned partitions: {}",
auronOperatorIdWithSubtaskIndex,
enableCheckpoint,
subtaskIndex,
assignedPartitions);
if (mockData != null) {
scanExecNode.setMockDataJsonArray(mockData);
JsonNode mockDataJson = mapper.readTree(mockData);
for (JsonNode data : mockDataJson) {
int partition = data.get("serialized_kafka_records_partition").asInt();
Comment on lines +187 to +188
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In mock-data mode, mapper.readTree(mockData) is assumed to be a JSON array of objects each containing serialized_kafka_records_partition. If mockData is not an array, or if an element lacks that field, data.get(...).asInt() can throw/produce incorrect defaults. Please validate mockDataJson.isArray() and that each element has the expected fields, and fail fast with a clear error message.

Suggested change
for (JsonNode data : mockDataJson) {
int partition = data.get("serialized_kafka_records_partition").asInt();
if (!mockDataJson.isArray()) {
throw new IllegalArgumentException(
"Mock data for auron kafka source must be a JSON array of objects, but was: "
+ mockDataJson.getNodeType());
}
for (int i = 0; i < mockDataJson.size(); i++) {
JsonNode data = mockDataJson.get(i);
if (data == null || !data.isObject()) {
throw new IllegalArgumentException(
"Each element in mock data array must be a JSON object; invalid element at index "
+ i
+ " with type: "
+ (data == null ? "null" : data.getNodeType()));
}
JsonNode partitionNode = data.get("serialized_kafka_records_partition");
if (partitionNode == null || !partitionNode.isInt()) {
throw new IllegalArgumentException(
"Mock data element at index "
+ i
+ " must contain integer field 'serialized_kafka_records_partition'");
}
int partition = partitionNode.intValue();

Copilot uses AI. Check for mistakes.
if (!assignedPartitions.contains(partition)) {
assignedPartitions.add(partition);
}
}
LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions);
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message says partition size = {} but logs the full assignedPartitions list. Either log assignedPartitions.size() or adjust the message to reflect that it prints the partition list.

Suggested change
LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions);
LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions.size());

Copilot uses AI. Check for mistakes.
} else {
// 1. Initialize Kafka Consumer for partition metadata discovery only (not for data consumption)
Properties kafkaProps = new Properties();
kafkaProps.putAll(kafkaProperties);
// Override to ensure this consumer does not interfere with actual data consumption
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta");
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaProps.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);

// 2. Discover and assign partitions for this subtask
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
for (PartitionInfo partitionInfo : partitionInfos) {
int partitionId = partitionInfo.partition();
if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) {
assignedPartitions.add(partitionId);
}
}
boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
Map<String, Object> auronRuntimeInfo = new HashMap<>();
auronRuntimeInfo.put("subtask_index", subtaskIndex);
auronRuntimeInfo.put("num_readers", numSubtasks);
auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
auronRuntimeInfo.put("restored_offsets", restoredOffsets);
auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo));
LOG.info(
"Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}, "
+ "subtask {} assigned partitions: {}",
auronOperatorIdWithSubtaskIndex,
enableCheckpoint,
subtaskIndex,
assignedPartitions);
}
sourcePlan.setKafkaScan(scanExecNode.build());
this.physicalPlanNode = sourcePlan.build();

// 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy is set
if (watermarkStrategy != null) {
Expand Down Expand Up @@ -458,4 +471,9 @@ public void initializeState(FunctionInitializationContext context) throws Except
public void setWatermarkStrategy(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}

public void setMockData(String mockData) {
Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null");
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setMockData precondition message is grammatically incorrect: "must not null". Please change it to something like "must not be null" (or include the option key name to make failures easier to diagnose).

Suggested change
Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null");
Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not be null");

Copilot uses AI. Check for mistakes.
this.mockData = mockData;
}
}
1 change: 1 addition & 0 deletions native-engine/auron-planner/proto/auron.proto
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ message KafkaScanExecNode {
string auron_operator_id = 6;
KafkaFormat data_format = 7;
string format_config_json = 8;
string mock_data_json_array = 9;
}

enum KafkaFormat {
Expand Down
Loading
Loading