-
Notifications
You must be signed in to change notification settings - Fork 210
[AURON #2093] Introduce kafka mock source #2106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.auron.flink.table.kafka; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| import java.time.LocalDateTime; | ||
| import java.util.List; | ||
| import org.apache.flink.types.Row; | ||
| import org.apache.flink.util.CollectionUtil; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| /** | ||
| * IT case for Auron Flink Kafka Source. | ||
| */ | ||
| public class AuronKafkaSourceITCase extends AuronKafkaSourceTestBase { | ||
|
|
||
| @Test | ||
| public void testEventTimeTumbleTvfWindow() { | ||
| environment.setParallelism(1); | ||
| List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment | ||
| .executeSql( | ||
| "SELECT `name`, count(1), window_start FROM TABLE(" | ||
| + "TUMBLE(TABLE T2, DESCRIPTOR(`ts`), INTERVAL '1' MINUTE)) GROUP BY `name`, window_start, window_end") | ||
| .collect()); | ||
| assertThat(rows.size()).isEqualTo(3); | ||
| assertRowsContains( | ||
| rows, | ||
| new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:03:00")}, | ||
| new Object[] {"zm2", 1L, LocalDateTime.parse("2026-03-16T12:03:00")}, | ||
| new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:05:00")}); | ||
| } | ||
|
|
||
| @Test | ||
| public void testEventTimeTumbleGroupWindow() { | ||
| environment.setParallelism(1); | ||
| List<Row> rows = CollectionUtil.iteratorToList(tableEnvironment | ||
| .executeSql("SELECT `name`, count(1), TUMBLE_START(`ts`, INTERVAL '1' MINUTE) " | ||
| + "FROM T2 group by TUMBLE(`ts`, INTERVAL '1' MINUTE), `name`") | ||
| .collect()); | ||
| assertThat(rows.size()).isEqualTo(3); | ||
| assertRowsContains( | ||
| rows, | ||
| new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:03:00")}, | ||
| new Object[] {"zm2", 1L, LocalDateTime.parse("2026-03-16T12:03:00")}, | ||
| new Object[] {"zm1", 1L, LocalDateTime.parse("2026-03-16T12:05:00")}); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.auron.flink.table.kafka; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import org.apache.flink.api.common.RuntimeExecutionMode; | ||
| import org.apache.flink.api.common.restartstrategy.RestartStrategies; | ||
| import org.apache.flink.configuration.Configuration; | ||
| import org.apache.flink.configuration.ExecutionOptions; | ||
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | ||
| import org.apache.flink.table.api.EnvironmentSettings; | ||
| import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; | ||
| import org.apache.flink.types.Row; | ||
| import org.junit.jupiter.api.BeforeAll; | ||
| import org.junit.jupiter.api.TestInstance; | ||
|
|
||
| /** | ||
| * Base class for Auron Flink Kafka Table Tests. | ||
| */ | ||
| @TestInstance(TestInstance.Lifecycle.PER_CLASS) | ||
| public class AuronKafkaSourceTestBase { | ||
| protected StreamExecutionEnvironment environment; | ||
| protected StreamTableEnvironment tableEnvironment; | ||
|
|
||
| @BeforeAll | ||
| public void before() { | ||
| Configuration configuration = new Configuration(); | ||
| // TODO Resolving the issue where the Flink classloader is closed and CompileUtils.doCompile fails | ||
| configuration.setString("classloader.check-leaked-classloader", "false"); | ||
| // set time zone to UTC | ||
| configuration.setString("table.local-time-zone", "UTC"); | ||
| configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING); | ||
| environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration); | ||
| environment.setRestartStrategy(RestartStrategies.noRestart()); | ||
| environment.getConfig().setAutoWatermarkInterval(1); | ||
| tableEnvironment = | ||
| StreamTableEnvironment.create(environment, EnvironmentSettings.fromConfiguration(configuration)); | ||
| String jsonArray = "[" | ||
| + "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100000, " | ||
| + "\"serialized_kafka_records_timestamp\": 1773662603760, \"event_time\": 1773662603760, \"age\": 20, \"name\":\"zm1\"}," | ||
| + "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100001, " | ||
| + "\"serialized_kafka_records_timestamp\": 1773662603761, \"event_time\": 1773662633760, \"age\": 21, \"name\":\"zm2\"}," | ||
| + "{\"serialized_kafka_records_partition\": 1, \"serialized_kafka_records_offset\": 100002, " | ||
| + "\"serialized_kafka_records_timestamp\": 1773662603762, \"event_time\": 1773662703761, \"age\": 22, \"name\":\"zm1\"}" | ||
| + "]"; | ||
| tableEnvironment.executeSql(" CREATE TABLE T2 ( " | ||
| + "\n `event_time` BIGINT, " | ||
| + "\n `age` INT, " | ||
| + "\n `name` STRING," | ||
| + "\n `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(event_time / 1000))," | ||
| + "\n WATERMARK FOR `ts` AS `ts` " | ||
| + "\n ) WITH ( " | ||
| + "\n 'connector' = 'auron-kafka'," | ||
| + "\n 'kafka.mock.data' = '" + jsonArray + "'," | ||
| + "\n 'topic' = 'mock_topic'," | ||
| + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092'," | ||
| + "\n 'properties.group.id' = 'flink-test-mock'," | ||
| + "\n 'format' = 'JSON' " | ||
| + "\n )"); | ||
| } | ||
|
|
||
| protected void assertRowsContains(List<Row> actualRows, Object[]... expectedRows) { | ||
| for (Object[] expected : expectedRows) { | ||
| boolean found = actualRows.stream().anyMatch(row -> { | ||
| for (int i = 0; i < expected.length; i++) { | ||
| Object actual = row.getField(i); | ||
| if (!java.util.Objects.equals(expected[i], actual)) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| }); | ||
| assertThat(found) | ||
| .as("Expected row %s not found in actual rows: %s", Arrays.toString(expected), actualRows) | ||
| .isTrue(); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -50,6 +50,8 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.flink.metrics.MetricGroup; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.flink.runtime.state.FunctionInitializationContext; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.flink.runtime.state.FunctionSnapshotContext; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.flink.shaded.curator5.com.google.common.base.Preconditions; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -92,6 +94,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData | |||||||||||||||||||||||||||||||||||||||||||||||||
| private final Map<String, String> formatConfig; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private final int bufferSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private final String startupMode; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private String mockData; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private transient PhysicalPlanNode physicalPlanNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // Flink Checkpoint-related, compatible with Flink Kafka Legacy source | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -174,53 +177,63 @@ public void open(Configuration config) throws Exception { | |||||||||||||||||||||||||||||||||||||||||||||||||
| this.auronOperatorId + "-" + getRuntimeContext().getIndexOfThisSubtask(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| sourcePlan.setKafkaScan(scanExecNode.build()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| this.physicalPlanNode = sourcePlan.build(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // 1. Initialize Kafka Consumer for partition metadata discovery only (not for data consumption) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Properties kafkaProps = new Properties(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| kafkaProps.putAll(kafkaProperties); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // Override to ensure this consumer does not interfere with actual data consumption | ||||||||||||||||||||||||||||||||||||||||||||||||||
| kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| kafkaProps.put( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| "org.apache.kafka.common.serialization.ByteArrayDeserializer"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| kafkaProps.put( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| "org.apache.kafka.common.serialization.ByteArrayDeserializer"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| this.kafkaConsumer = new KafkaConsumer<>(kafkaProps); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // 2. Discover and assign partitions for this subtask | ||||||||||||||||||||||||||||||||||||||||||||||||||
| List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int subtaskIndex = runtimeContext.getIndexOfThisSubtask(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int numSubtasks = runtimeContext.getNumberOfParallelSubtasks(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| this.assignedPartitions = new ArrayList<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| for (PartitionInfo partitionInfo : partitionInfos) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int partitionId = partitionInfo.partition(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| assignedPartitions.add(partitionId); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Map<String, Object> auronRuntimeInfo = new HashMap<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auronRuntimeInfo.put("subtask_index", subtaskIndex); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auronRuntimeInfo.put("num_readers", numSubtasks); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auronRuntimeInfo.put("restored_offsets", restoredOffsets); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auronRuntimeInfo.put("assigned_partitions", assignedPartitions); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| JniBridge.putResource(auronOperatorIdWithSubtaskIndex, mapper.writeValueAsString(auronRuntimeInfo)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| currentOffsets = new HashMap<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pendingOffsetsToCommit = new LinkedMap(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| LOG.info( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| "Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}, " | ||||||||||||||||||||||||||||||||||||||||||||||||||
| + "subtask {} assigned partitions: {}", | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auronOperatorIdWithSubtaskIndex, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| enableCheckpoint, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| subtaskIndex, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| assignedPartitions); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (mockData != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| scanExecNode.setMockDataJsonArray(mockData); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| JsonNode mockDataJson = mapper.readTree(mockData); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| for (JsonNode data : mockDataJson) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int partition = data.get("serialized_kafka_records_partition").asInt(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+187
to
+188
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| for (JsonNode data : mockDataJson) { | |
| int partition = data.get("serialized_kafka_records_partition").asInt(); | |
| if (!mockDataJson.isArray()) { | |
| throw new IllegalArgumentException( | |
| "Mock data for auron kafka source must be a JSON array of objects, but was: " | |
| + mockDataJson.getNodeType()); | |
| } | |
| for (int i = 0; i < mockDataJson.size(); i++) { | |
| JsonNode data = mockDataJson.get(i); | |
| if (data == null || !data.isObject()) { | |
| throw new IllegalArgumentException( | |
| "Each element in mock data array must be a JSON object; invalid element at index " | |
| + i | |
| + " with type: " | |
| + (data == null ? "null" : data.getNodeType())); | |
| } | |
| JsonNode partitionNode = data.get("serialized_kafka_records_partition"); | |
| if (partitionNode == null || !partitionNode.isInt()) { | |
| throw new IllegalArgumentException( | |
| "Mock data element at index " | |
| + i | |
| + " must contain integer field 'serialized_kafka_records_partition'"); | |
| } | |
| int partition = partitionNode.intValue(); |
Copilot
AI
Mar 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log message says partition size = {} but logs the full assignedPartitions list. Either log assignedPartitions.size() or adjust the message to reflect that it prints the partition list.
| LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions); | |
| LOG.info("Use mock data for auron kafka source, partition size = {}", assignedPartitions.size()); |
Copilot
AI
Mar 21, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setMockData precondition message is grammatically incorrect: "must not null". Please change it to something like "must not be null" (or include the option key name to make failures easier to diagnose).
| Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not null"); | |
| Preconditions.checkArgument(mockData != null, "Auron kafka source mock data must not be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KAFKA_MOCK_DATAis read viatableOptions.get(KAFKA_MOCK_DATA). Because the option hasnoDefaultValue(), this makes the new option effectively required and will break existing tables that don't set it. Please usegetOptional(KAFKA_MOCK_DATA).orElse(null)(or similar) and keep the downstream code path null/empty-safe.