Commit 1f58deb
[fix](routine load) Fix routine load partial update (apache#59209)

1. Fix `"partial_columns"="true"` not taking effect in routine load.
2. Add a `"partial_update_new_key_behavior"="APPEND"/"ERROR"` property for routine load.

Doc: apache/doris-website#3211
1 parent aff7b4d commit 1f58deb
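
For context, a minimal statement combining both properties from this patch, modeled on the regression test added below, might look like this (database, table, topic, and job names are illustrative):

    CREATE ROUTINE LOAD example_db.partial_update_job ON example_tbl
    COLUMNS TERMINATED BY ",",
    COLUMNS (id, score)
    PROPERTIES
    (
        "partial_columns" = "true",
        "partial_update_new_key_behavior" = "ERROR"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092",
        "kafka_topic" = "example_topic",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );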

7 files changed: +487 −5 lines

fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java

Lines changed: 29 additions & 0 deletions
@@ -35,6 +35,7 @@
 import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;

 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;

@@ -111,6 +112,7 @@ public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParse
     public static final String FUZZY_PARSE = "fuzzy_parse";

     public static final String PARTIAL_COLUMNS = "partial_columns";
+    public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";

     public static final String WORKLOAD_GROUP = "workload_group";

@@ -141,6 +143,7 @@ public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParse
             .add(SEND_BATCH_PARALLELISM)
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
+            .add(PARTIAL_UPDATE_NEW_KEY_POLICY)
             .add(WORKLOAD_GROUP)
             .add(LoadStmt.KEY_ENCLOSE)
             .add(LoadStmt.KEY_ESCAPE)

@@ -178,6 +181,7 @@ public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParse
      */
     @Getter
     private boolean isPartialUpdate = false;
+    private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;

     private String comment = "";

@@ -211,6 +215,15 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNo
                 .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
         this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+        if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+            String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
+            if ("APPEND".equals(policyStr)) {
+                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+            } else if ("ERROR".equals(policyStr)) {
+                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+            }
+            // validation will be done in checkJobProperties()
+        }
         if (comment != null) {
             this.comment = comment;
         }

@@ -282,6 +295,10 @@ public String getTimezone() {
         return timezone;
     }

+    public TPartialUpdateNewRowPolicy getPartialUpdateNewKeyPolicy() {
+        return partialUpdateNewKeyPolicy;
+    }
+
     public LoadTask.MergeType getMergeType() {
         return mergeType;
     }

@@ -489,6 +506,18 @@ private void checkJobProperties() throws UserException {
         }
         timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE, timezone));

+        // check partial_update_new_key_behavior
+        if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+            if (!isPartialUpdate) {
+                throw new AnalysisException(
+                        PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true");
+            }
+            String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
+            if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) {
+                throw new AnalysisException(
+                        PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of {'APPEND', 'ERROR'}, but found " + policy);
+            }
+        }
         fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
     }
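
Given the checkJobProperties() validation above, a statement that sets the new property without enabling partial columns is rejected at analysis time; a sketch (names illustrative, FROM clause elided):

    CREATE ROUTINE LOAD example_db.bad_job ON example_tbl
    PROPERTIES
    (
        "partial_update_new_key_behavior" = "ERROR"
    )
    FROM KAFKA (...);
    -- AnalysisException: partial_update_new_key_behavior can only be set when partial_columns is true

Values other than APPEND or ERROR (case-insensitive) fail the same check.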

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java

Lines changed: 9 additions & 0 deletions
@@ -44,6 +44,7 @@
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;

@@ -799,6 +800,14 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
             if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
                 this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS));
             }
+            if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+                String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY);
+                if ("ERROR".equalsIgnoreCase(policy)) {
+                    this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+                } else {
+                    this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+                }
+            }
         }
         LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
                 this.id, jobProperties, dataSourceProperties);
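
Since modifyPropertiesInternal() is the handler behind ALTER ROUTINE LOAD, the policy can presumably also be changed on an existing (paused) job; a hedged sketch with an illustrative job name:

    PAUSE ROUTINE LOAD FOR example_db.partial_update_job;
    ALTER ROUTINE LOAD FOR example_db.partial_update_job
    PROPERTIES
    (
        "partial_update_new_key_behavior" = "ERROR"
    );
    RESUME ROUTINE LOAD FOR example_db.partial_update_job;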

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java

Lines changed: 20 additions & 5 deletions
@@ -67,6 +67,7 @@
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;

@@ -223,6 +224,7 @@ public boolean isFinalState() {
     protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;

     protected boolean isPartialUpdate = false;
+    protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;

     protected String sequenceCol;

@@ -394,6 +396,9 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
         jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, stmt.isPartialUpdate() ? "true" : "false");
         if (stmt.isPartialUpdate()) {
             this.isPartialUpdate = true;
+            this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy();
+            jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY,
+                    this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
         }
         jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));

@@ -1830,11 +1835,15 @@ public String jobPropertiesToJsonString() {

         // job properties defined in CreateRoutineLoadStmt
         jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
-        jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
-        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
-        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));
-        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, String.valueOf(maxBatchSizeBytes));
-        jobProperties.put(CreateRoutineLoadStmt.CURRENT_CONCURRENT_NUMBER_PROPERTY,
+        if (isPartialUpdate) {
+            jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY,
+                    partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
+        }
+        jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
+        jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
+        jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));
+        jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY, String.valueOf(maxBatchSizeBytes));
+        jobProperties.put(CreateRoutineLoadInfo.CURRENT_CONCURRENT_NUMBER_PROPERTY,
                 String.valueOf(currentTaskConcurrentNum));
         jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY,
                 String.valueOf(desireTaskConcurrentNum));

@@ -1896,6 +1905,12 @@ public void gsonPostProcess() throws IOException {
         jobProperties.forEach((k, v) -> {
             if (k.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
                 isPartialUpdate = Boolean.parseBoolean(v);
+            } else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+                if ("ERROR".equalsIgnoreCase(v)) {
+                    partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+                } else {
+                    partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+                }
             }
         });
         SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
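
Because jobPropertiesToJsonString() backs the JobProperties field of SHOW ROUTINE LOAD and gsonPostProcess() restores the value after an FE restart, a partial-update job's persisted properties should now carry the new key. A sketch of the expected output (abbreviated, values illustrative):

    SHOW ROUTINE LOAD FOR example_db.partial_update_job\G
    -- JobProperties: { ..., "partial_columns": "true", "partial_update_new_key_behavior": "APPEND", ... }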
(new file: expected output for the partial-update regression test; original path not shown in this view)

Lines changed: 12 additions & 0 deletions

-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_initial --
1	alice	100	20
2	bob	90	21
3	charlie	80	22

-- !select_after_partial_update --
1	alice	150	20
2	bob	95	21
3	charlie	80	22
100	\N	100	\N
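
Reading the expected output above: rows 1 and 2 have only score changed while name and age are preserved, and the new key 100 arrives with \N in the columns the load did not supply, which is the default APPEND behavior for new keys under partial update.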
(new file: expected output for the new-key-behavior regression test; original path not shown in this view)

Lines changed: 27 additions & 0 deletions

-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_initial --
1	1	1	1
2	2	2	2
3	3	3	3

-- !select_after_append --
1	10	1	1
2	20	2	2
3	3	3	3
4	40	\N	\N
5	50	\N	\N

-- !select_after_error_mode --
1	1	100	1
2	2	200	2
3	3	3	3
4	4	40	4
5	5	50	5

-- !select_after_error_rejected --
1	1	100	1
2	2	200	2
3	3	3	3
4	4	40	4
5	5	50	5
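
In this second expected file, select_after_append shows new keys 4 and 5 landing with \N in the uncovered columns (APPEND behavior); select_after_error_mode shows an ERROR-behavior partial update succeeding when all keys already exist; and select_after_error_rejected is identical to it, indicating that a subsequent batch containing an unknown key was rejected and left the table unchanged.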
(new file: Groovy regression test suite test_routine_load_partial_update; original path not shown in this view)

Lines changed: 108 additions & 0 deletions

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.util.RoutineLoadTestUtils
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

suite("test_routine_load_partial_update", "nonConcurrent") {
    def kafkaCsvTopic = "test_routine_load_partial_update"

    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
        def runSql = { String q -> sql q }
        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)

        def tableName = "test_routine_load_partial_update"
        def job = "test_partial_update_job"

        sql """ DROP TABLE IF EXISTS ${tableName} force;"""
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                `id` int NULL,
                `name` varchar(65533) NULL,
                `score` int NULL,
                `age` int NULL
            ) ENGINE=OLAP
            UNIQUE KEY(`id`)
            COMMENT 'test partial update'
            DISTRIBUTED BY HASH(`id`) BUCKETS 3
            PROPERTIES (
                "replication_allocation" = "tag.location.default: 1",
                "enable_unique_key_merge_on_write" = "true"
            );
        """

        // insert initial data
        sql """
            INSERT INTO ${tableName} VALUES
            (1, 'alice', 100, 20),
            (2, 'bob', 90, 21),
            (3, 'charlie', 80, 22)
        """

        qt_select_initial "SELECT * FROM ${tableName} ORDER BY id"

        try {
            // create routine load with partial_columns=true
            // only update id and score columns
            sql """
                CREATE ROUTINE LOAD ${job} ON ${tableName}
                COLUMNS TERMINATED BY ",",
                COLUMNS (id, score)
                PROPERTIES
                (
                    "max_batch_interval" = "10",
                    "partial_columns" = "true"
                )
                FROM KAFKA
                (
                    "kafka_broker_list" = "${kafka_broker}",
                    "kafka_topic" = "${kafkaCsvTopic}",
                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                );
            """

            // send partial update data to kafka
            // update score for id=1 from 100 to 150
            // update score for id=2 from 90 to 95
            def data = [
                "1,150",
                "2,95",
                "100,100"
            ]

            data.each { line ->
                logger.info("Sending to Kafka: ${line}")
                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
                producer.send(record).get()
            }
            producer.flush()

            // wait for routine load task to finish
            RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 3)

            // verify partial update: score should be updated, name and age should remain unchanged
            qt_select_after_partial_update "SELECT * FROM ${tableName} ORDER BY id"
        } catch (Exception e) {
            logger.error("Error during test: " + e.getMessage())
            throw e
        } finally {
            sql "STOP ROUTINE LOAD FOR ${job}"
        }
    }
}
