diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 6be303a4c00317..2f850a4cd13aa8 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -655,12 +655,15 @@ void update_streaming_job_meta(MetaServiceCode& code, std::string& msg, } new_job_info.set_scanned_rows(new_job_info.scanned_rows() + commit_attachment.scanned_rows()); + new_job_info.set_filtered_rows(new_job_info.filtered_rows() + + commit_attachment.filtered_rows()); new_job_info.set_load_bytes(new_job_info.load_bytes() + commit_attachment.load_bytes()); new_job_info.set_num_files(new_job_info.num_files() + commit_attachment.num_files()); new_job_info.set_file_bytes(new_job_info.file_bytes() + commit_attachment.file_bytes()); } else { new_job_info.set_job_id(commit_attachment.job_id()); new_job_info.set_scanned_rows(commit_attachment.scanned_rows()); + new_job_info.set_filtered_rows(commit_attachment.filtered_rows()); new_job_info.set_load_bytes(commit_attachment.load_bytes()); new_job_info.set_num_files(commit_attachment.num_files()); new_job_info.set_file_bytes(commit_attachment.file_bytes()); @@ -981,6 +984,7 @@ void MetaServiceImpl::reset_streaming_job_offset(::google::protobuf::RpcControll // Preserve existing statistics if they exist if (prev_existed) { new_job_info.set_scanned_rows(prev_job_info.scanned_rows()); + new_job_info.set_filtered_rows(prev_job_info.filtered_rows()); new_job_info.set_load_bytes(prev_job_info.load_bytes()); new_job_info.set_num_files(prev_job_info.num_files()); new_job_info.set_file_bytes(prev_job_info.file_bytes()); diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp index ddba9b4c750e69..d5c837e8711d4e 100644 --- a/cloud/test/meta_service_job_test.cpp +++ b/cloud/test/meta_service_job_test.cpp @@ -5213,6 +5213,7 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) { streaming_attach->set_job_id(1002); 
streaming_attach->set_offset("test_offset_3"); streaming_attach->set_scanned_rows(2000); + streaming_attach->set_filtered_rows(150); streaming_attach->set_load_bytes(10000); streaming_attach->set_num_files(20); streaming_attach->set_file_bytes(15000); @@ -5241,6 +5242,7 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) { EXPECT_TRUE(response.has_commit_attach()); EXPECT_EQ(response.commit_attach().job_id(), 1002); EXPECT_EQ(response.commit_attach().scanned_rows(), 2000); + EXPECT_EQ(response.commit_attach().filtered_rows(), 150); EXPECT_EQ(response.commit_attach().load_bytes(), 10000); EXPECT_EQ(response.commit_attach().num_files(), 20); EXPECT_EQ(response.commit_attach().file_bytes(), 15000); @@ -5363,6 +5365,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) { streaming_attach->set_job_id(job_id); streaming_attach->set_offset("original_offset"); streaming_attach->set_scanned_rows(1000); + streaming_attach->set_filtered_rows(50); streaming_attach->set_load_bytes(5000); streaming_attach->set_num_files(10); streaming_attach->set_file_bytes(8000); @@ -5391,6 +5394,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) { EXPECT_TRUE(response.has_commit_attach()); EXPECT_EQ(response.commit_attach().offset(), "original_offset"); EXPECT_EQ(response.commit_attach().scanned_rows(), 1000); + EXPECT_EQ(response.commit_attach().filtered_rows(), 50); EXPECT_EQ(response.commit_attach().load_bytes(), 5000); } @@ -5427,6 +5431,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) { EXPECT_EQ(response.commit_attach().offset(), "reset_offset"); // Other fields should remain unchanged EXPECT_EQ(response.commit_attach().scanned_rows(), 1000); + EXPECT_EQ(response.commit_attach().filtered_rows(), 50); EXPECT_EQ(response.commit_attach().load_bytes(), 5000); EXPECT_EQ(response.commit_attach().num_files(), 10); EXPECT_EQ(response.commit_attach().file_bytes(), 8000); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java index b947ef0649e501..187d551dbd7f41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java @@ -285,7 +285,8 @@ public static TxnCommitAttachmentPB streamingTaskTxnCommitAttachmentToPb(Streami .setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows()) .setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes()) .setNumFiles(streamingTaskTxnCommitAttachment.getNumFiles()) - .setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes()); + .setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes()) + .setFilteredRows(streamingTaskTxnCommitAttachment.getFilteredRows()); if (streamingTaskTxnCommitAttachment.getOffset() != null) { builder.setOffset(streamingTaskTxnCommitAttachment.getOffset()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 168bb1372598eb..691d6f6615085e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -720,6 +720,7 @@ private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attach this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + attachment.getLoadBytes()); this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + attachment.getNumFiles()); this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + attachment.getFileBytes()); + this.jobStatistic.setFilteredRows(this.jobStatistic.getFilteredRows() + attachment.getFilteredRows()); Offset endOffset = offsetProvider.deserializeOffset(attachment.getOffset()); offsetProvider.updateOffset(endOffset); // Sync offsetProviderPersist after 
each offset update so the checkpoint thread @@ -735,6 +736,7 @@ private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attach //update metric if (MetricRepo.isInit && !isReplay) { MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows()); + MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(attachment.getFilteredRows()); MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes()); } } @@ -747,11 +749,13 @@ private void updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment a this.jobStatistic.setLoadBytes(attachment.getLoadBytes()); this.jobStatistic.setFileNumber(attachment.getNumFiles()); this.jobStatistic.setFileSize(attachment.getFileBytes()); + this.jobStatistic.setFilteredRows(attachment.getFilteredRows()); offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset())); //update metric if (MetricRepo.isInit && !isReplay) { MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows()); + MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.update(attachment.getFilteredRows()); MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes()); } } @@ -1134,6 +1138,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti loadStatistic.getLoadBytes(), loadStatistic.getFileNumber(), loadStatistic.getTotalFileSizeB(), + loadStatistic.getFilteredRows(), offsetJson)); passCheck = true; } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java index 01fdb3e16e454d..c8bf72559e856a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java @@ -32,7 
+32,8 @@ public StreamingTaskTxnCommitAttachment() { } public StreamingTaskTxnCommitAttachment(long jobId, long taskId, - long scannedRows, long loadBytes, long numFiles, long fileBytes, String offset) { + long scannedRows, long loadBytes, long numFiles, long fileBytes, + long filteredRows, String offset) { super(TransactionState.LoadJobSourceType.STREAMING_JOB); this.jobId = jobId; this.taskId = taskId; @@ -40,6 +41,7 @@ public StreamingTaskTxnCommitAttachment(long jobId, long taskId, this.loadBytes = loadBytes; this.numFiles = numFiles; this.fileBytes = fileBytes; + this.filteredRows = filteredRows; this.offset = offset; } @@ -49,6 +51,7 @@ public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB pb) { this.loadBytes = pb.getLoadBytes(); this.numFiles = pb.getNumFiles(); this.fileBytes = pb.getFileBytes(); + this.filteredRows = pb.getFilteredRows(); this.offset = pb.getOffset(); } @@ -69,6 +72,9 @@ public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB pb) { @SerializedName(value = "fs") @Getter private long fileBytes; + @SerializedName(value = "fr") + @Getter + private long filteredRows; @SerializedName(value = "of") @Getter private String offset; @@ -80,6 +86,7 @@ public String toString() { + ", loadBytes=" + loadBytes + ", numFiles=" + numFiles + ", fileBytes=" + fileBytes + + ", filteredRows=" + filteredRows + ", offset=" + offset + "]"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java index cf9e51b1f2af85..fc88a0977bb2ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java @@ -58,6 +58,9 @@ public class LoadStatistic { public int fileNum = 0; public long totalFileSizeB = 0; + // number of rows filtered by BE (DPP_ABNORMAL_ALL), set once after coordinator finishes + private long filteredRows = 0; + // init 
the statistic of specified load task public synchronized void initLoad(TUniqueId loadId, Set fragmentIds, List relatedBackendIds) { counterTbl.rowMap().remove(loadId); @@ -133,6 +136,14 @@ public long getTotalFileSizeB() { return totalFileSizeB; } + public long getFilteredRows() { + return filteredRows; + } + + public void setFilteredRows(long filteredRows) { + this.filteredRows = filteredRows; + } + public synchronized String toJson() { long total = 0; for (long rows : counterTbl.values()) { @@ -156,6 +167,7 @@ public synchronized String toJson() { details.put("LoadBytes", totalBytes); details.put("FileNumber", fileNum); details.put("FileSize", totalFileSizeB); + details.put("FilteredRows", filteredRows); details.put("TaskNumber", counterTbl.rowMap().size()); details.put("Unfinished backends", unfinishedBackendIdsList); details.put("All backends", allBackendIdsList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 9e5e0a02930557..39a7381afcf797 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -222,6 +222,9 @@ protected final void execImpl(StmtExecutor executor) throws Exception { if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); } + if (insertLoadJob != null) { + insertLoadJob.getLoadStatistic().setFilteredRows(filteredRows); + } } private void checkStrictModeAndFilterRatio() throws Exception { diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 569bcf6e023c19..710a0b0dd796e0 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -418,6 
+418,7 @@ message StreamingTaskCommitAttachmentPB { optional int64 load_bytes = 4; optional int64 num_files = 5; optional int64 file_bytes = 6; + optional int64 filtered_rows = 7; } message TxnCommitAttachmentPB { diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy new file mode 100644 index 00000000000000..f30756f4ce9b82 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_insert_job_filtered_rows") { + def tableName = "test_streaming_insert_job_filtered_rows_tbl" + def jobName = "test_streaming_insert_job_filtered_rows_name" + + sql """drop table if exists `${tableName}` force""" + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + + // c2 INT NOT NULL forces BE to filter every row when CSV provides + // non-parseable strings in that column. 
+ sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `c1` int NULL, + `c2` int NOT NULL, + `c3` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`c1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + // insert_max_filter_ratio=1 lets the task succeed even if every row is filtered. + sql """ + CREATE JOB ${jobName} + PROPERTIES( + "s3.max_batch_files" = "1", + "session.enable_insert_strict" = "false", + "session.insert_max_filter_ratio" = "1" + ) + ON STREAMING DO INSERT INTO ${tableName} + SELECT * FROM S3 + ( + "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv", + "format" = "csv", + "provider" = "${getS3Provider()}", + "column_separator" = ",", + "s3.endpoint" = "${getS3Endpoint()}", + "s3.region" = "${getS3Region()}", + "s3.access_key" = "${getS3AK()}", + "s3.secret_key" = "${getS3SK()}" + ); + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSucceedCount: " + jobSucceedCount) + jobSucceedCount.size() == 1 && '2' <= jobSucceedCount.get(0).get(0) + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + def jobInfo = sql """ + select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfo: " + jobInfo) + def loadStat = parseJson(jobInfo.get(0).get(2)) + log.info("loadStatistic: " + jobInfo.get(0).get(2)) + + assert loadStat.scannedRows == 20 + assert loadStat.fileNumber == 2 + assert loadStat.filteredRows == 20 + + def rowCount = sql """ select count(1) from 
${tableName} """ + assert rowCount.get(0).get(0) == 0 + + sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """ + sql """drop table if exists `${tableName}` force""" +}