diff --git a/src/Storages/HybridSegmentPruner.cpp b/src/Storages/HybridSegmentPruner.cpp new file mode 100644 index 000000000000..8633fcd4ec84 --- /dev/null +++ b/src/Storages/HybridSegmentPruner.cpp @@ -0,0 +1,122 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace +{ + +ASTPtr makeIdentityKeyAST(const Names & column_names) +{ + auto key_ast = make_intrusive(); + key_ast->name = "tuple"; + key_ast->arguments = make_intrusive(); + key_ast->children.push_back(key_ast->arguments); + for (const auto & name : column_names) + key_ast->arguments->children.push_back(make_intrusive(name)); + return key_ast; +} + +NamesAndTypesList filterComparable(const NamesAndTypesList & in) +{ + NamesAndTypesList out; + for (const auto & c : in) + if (c.type && c.type->isComparable()) + out.push_back(c); + return out; +} + +KeyDescription buildIdentityKey(const NamesAndTypesList & comparable_cols, ContextPtr context) +{ + Names names; + names.reserve(comparable_cols.size()); + for (const auto & c : comparable_cols) + names.push_back(c.name); + return KeyDescription::getKeyFromAST( + makeIdentityKeyAST(names), + ColumnsDescription{comparable_cols}, + context); +} + +NamesAndTypesList namesAndTypesFromKey(const KeyDescription & key) +{ + NamesAndTypesList out; + for (size_t i = 0; i < key.column_names.size(); ++i) + out.emplace_back(key.column_names[i], key.data_types[i]); + return out; +} + +} + +HybridSegmentPruner::HybridSegmentPruner( + const ActionsDAGWithInversionPushDown & filter_dag, + const NamesAndTypesList & hybrid_columns, + ContextPtr context_) + : identity_key(buildIdentityKey(filterComparable(hybrid_columns), context_)) + , user_condition(filter_dag, context_, + identity_key.column_names, identity_key.expression, + /*single_point=*/ false) + , context(std::move(context_)) +{ + useless = identity_key.column_names.empty() || user_condition.alwaysUnknownOrTrue(); +} + +bool HybridSegmentPruner::canBePruned(const ASTPtr & substituted_segment_predicate) const +try +{ + if (useless || !substituted_segment_predicate) + return false; + + auto segment_ast = substituted_segment_predicate->clone(); + auto sample = namesAndTypesFromKey(identity_key); + auto syntax_result = TreeRewriter(context).analyze(segment_ast, sample); + auto segment_dag = ExpressionAnalyzer(segment_ast, syntax_result, context).getActionsDAG(true); + ActionsDAGWithInversionPushDown segment_filter(segment_dag.getOutputs().at(0), context); + + KeyCondition segment_condition( + segment_filter, context, + identity_key.column_names, identity_key.expression, + /*single_point=*/ false); + + Hyperrectangle rect; + rect.reserve(identity_key.column_names.size()); + + for (size_t i = 0; i < identity_key.column_names.size(); ++i) + { + Ranges col_ranges; + if (!segment_condition.extractPlainRangesForColumn(i, col_ranges)) + { + rect.push_back(Range::createWholeUniverse()); + continue; + } + + if (col_ranges.empty()) + return true; + + if (col_ranges.size() != 1) + { + rect.push_back(Range::createWholeUniverse()); + continue; + } + + rect.push_back(col_ranges.front()); + } + + return !user_condition.checkInHyperrectangle(rect, identity_key.data_types).can_be_true; +} +catch (...) +{ + return false; +} + +} diff --git a/src/Storages/HybridSegmentPruner.h b/src/Storages/HybridSegmentPruner.h new file mode 100644 index 000000000000..7b6db05c9871 --- /dev/null +++ b/src/Storages/HybridSegmentPruner.h @@ -0,0 +1,47 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ + +/// Hybrid-segment pruner, modeled after PartitionPruner / Iceberg::ManifestFilesPruner / +/// Paimon::PartitionPruner. +/// +/// Build one KeyCondition over the user filter (PREWHERE+WHERE represented as an +/// ActionsDAG) using all comparable Hybrid columns as the key. For each segment, build +/// a second KeyCondition from its (already watermark-substituted) predicate AST and +/// use `KeyCondition::extractPlainRangesForColumn` to obtain a Hyperrectangle (fail-open +/// to whole-universe per column when extraction is ambiguous). Then ask +/// `KeyCondition::checkInHyperrectangle(rect, types).can_be_true`. The segment can be +/// pruned iff the answer is false. +/// +/// canBePruned() returns true only when (user_filter AND segment_predicate) is provably +/// empty. It returns false in all other cases — unsupported segment shapes, missing user +/// filter, exceptions — so the caller falls back to scanning the segment normally. +class HybridSegmentPruner +{ +public: + HybridSegmentPruner( + const ActionsDAGWithInversionPushDown & filter_dag, + const NamesAndTypesList & hybrid_columns, + ContextPtr context); + + bool canBePruned(const ASTPtr & substituted_segment_predicate) const; + + /// True if the user filter is unrecognizable / always-true on the Hybrid key columns: + /// no segment can ever be pruned, so callers can short-circuit. + bool isUseless() const { return useless; } + +private: + KeyDescription identity_key; + KeyCondition user_condition; + ContextPtr context; + bool useless = false; +}; + +} diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp index 58005311865b..da92b200aaaf 100644 --- a/src/Storages/MergeTree/KeyCondition.cpp +++ b/src/Storages/MergeTree/KeyCondition.cpp @@ -3226,12 +3226,109 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const if (key_columns.size() != 1) return false; + return extractPlainRangesForColumn(0, ranges); +} + +bool KeyCondition::extractPlainRangesForColumn(size_t column_index, Ranges & ranges) const +{ + if (column_index >= key_columns.size()) + return false; + if (hasMonotonicFunctionsChain()) return false; - /// All Ranges in rpn_stack is plain. + /// All Ranges in rpn_stack are plain. std::stack rpn_stack; + auto push_range_atom = [&](const RPNElement & element, bool negate) + { + if (element.getKeyColumn() != column_index) + rpn_stack.push(PlainRanges::makeUniverse()); + else if (negate) + rpn_stack.push(PlainRanges(element.range.invertRange())); + else + rpn_stack.push(PlainRanges(element.range)); + }; + + auto find_tuple_index_for_column = [&](const RPNElement & element) -> std::optional + { + chassert(element.set_index); + for (const auto & mapping : element.set_index->getIndexesMapping()) + if (mapping.key_index == column_index) + return mapping.tuple_index; + return std::nullopt; + }; + + auto try_extract_set_ranges = [&](const RPNElement & element, bool negate, PlainRanges & out) -> bool + { + auto tuple_index = find_tuple_index_for_column(element); + if (!tuple_index) + { + out = PlainRanges::makeUniverse(); + return true; + } + + if (element.set_index->hasMonotonicFunctionsChain()) + return false; + + if (element.set_index->size() == 0) + { + out = negate ? PlainRanges::makeUniverse() : PlainRanges::makeBlank(); + return true; + } + + const auto & values = element.set_index->getOrderedSet(); + if (*tuple_index >= values.size()) + return false; + + const auto & column_values = *values[*tuple_index]; + const size_t values_size = element.set_index->size(); + Ranges points_range; + + if (!negate) + { + /// values in set_index are ordered and no duplication + for (size_t i = 0; i < values_size; ++i) + { + FieldRef value; + column_values.get(i, value); + if (value.isNull()) + return false; + points_range.push_back({value}); + } + } + else + { + std::optional previous; + for (size_t i = 0; i < values_size; ++i) + { + FieldRef current; + column_values.get(i, current); + if (current.isNull()) + return false; + + if (previous) + { + Range between(*previous, false, current, false); + /// skip blank range + if (!(between.left > between.right || (between.left == between.right && !between.left_included && !between.right_included))) + points_range.push_back(between); + } + else + { + points_range.push_back(Range::createRightBounded(current, false)); + } + + previous = current; + } + + points_range.push_back(Range::createLeftBounded(*previous, false)); + } + + out = PlainRanges(points_range); + return true; + }; + for (const auto & element : rpn) { if (element.function == RPNElement::FUNCTION_AND) @@ -3279,76 +3376,20 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const else /// atom relational expression or constants { if (element.function == RPNElement::FUNCTION_IN_RANGE) - { - rpn_stack.push(PlainRanges(element.range)); - } + push_range_atom(element, /*negate=*/false); else if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE) { - rpn_stack.push(PlainRanges(element.range.invertRange())); - } - else if (element.function == RPNElement::FUNCTION_IN_SET) - { - if (element.set_index->hasMonotonicFunctionsChain()) - return false; - - if (element.set_index->size() == 0) - { - rpn_stack.push(PlainRanges::makeBlank()); /// skip blank range - continue; - } - - const auto & values = element.set_index->getOrderedSet(); - Ranges points_range; - - /// values in set_index are ordered and no duplication - for (size_t i = 0; i < element.set_index->size(); i++) - { - FieldRef f; - values[0]->get(i, f); - if (f.isNull()) - return false; - points_range.push_back({f}); - } - rpn_stack.push(PlainRanges(points_range)); + push_range_atom(element, /*negate=*/true); } - else if (element.function == RPNElement::FUNCTION_NOT_IN_SET) + else if (element.function == RPNElement::FUNCTION_IN_SET || element.function == RPNElement::FUNCTION_NOT_IN_SET) { - if (element.set_index->hasMonotonicFunctionsChain()) + PlainRanges set_ranges = PlainRanges::makeUniverse(); + if (!try_extract_set_ranges( + element, + /*negate=*/element.function == RPNElement::FUNCTION_NOT_IN_SET, + set_ranges)) return false; - - if (element.set_index->size() == 0) - { - rpn_stack.push(PlainRanges::makeUniverse()); - continue; - } - - const auto & values = element.set_index->getOrderedSet(); - Ranges points_range; - - std::optional pre; - for (size_t i=0; isize(); i++) - { - FieldRef cur; - values[0]->get(i, cur); - - if (cur.isNull()) - return false; - if (pre) - { - Range r(*pre, false, cur, false); - /// skip blank range - if (!(r.left > r.right || (r.left == r.right && !r.left_included && !r.right_included))) - points_range.push_back(r); - } - else - { - points_range.push_back(Range::createRightBounded(cur, false)); - } - pre = cur; - } - - points_range.push_back(Range::createLeftBounded(*pre, false)); - rpn_stack.push(PlainRanges(points_range)); + rpn_stack.push(std::move(set_ranges)); } else if (element.function == RPNElement::ALWAYS_FALSE) { @@ -3379,7 +3420,7 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const } if (rpn_stack.size() != 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::extractPlainRanges"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::extractPlainRangesForColumn"); ranges = std::move(rpn_stack.top().ranges); return true; diff --git a/src/Storages/MergeTree/KeyCondition.h b/src/Storages/MergeTree/KeyCondition.h index b733920df792..b1f9099e87bd 100644 --- a/src/Storages/MergeTree/KeyCondition.h +++ b/src/Storages/MergeTree/KeyCondition.h @@ -199,6 +199,11 @@ class KeyCondition /// TODO handle the cases when generate RPN. bool extractPlainRanges(Ranges & ranges) const; + /// Same stack algorithm as extractPlainRanges, but for a multi-column key: logical ops apply + /// as usual, while atoms that constrain other key columns become the universe for `column_index`. + /// Returns false if the RPN contains unsupported atoms for this extraction (same as extractPlainRanges). + bool extractPlainRangesForColumn(size_t column_index, Ranges & ranges) const; + /// The expression is stored as Reverse Polish Notation. struct RPNElement { diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index ff1e0025aa9b..590b3e86830b 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -102,7 +103,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -536,6 +539,26 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( } } + /// Hybrid segment pruning: mirror the per-shard pruning above, but at the segment level. + /// When a segment's predicate is provably unsatisfiable with the user query, drop it from + /// the plan. The base segment is signalled to `read()` by emptying `optimized_cluster` — + /// the same idiom `optimize_skip_unused_shards` uses for empty shard sets — and `nodes` is + /// recomputed automatically from the empty cluster. The verdict is recomputed in `read()` + /// for per-segment skipping; both calls read the watermark snapshot frozen on + /// `storage_snapshot` (see `HybridSnapshotData`), so the two verdicts agree even under a + /// concurrent `ALTER MODIFY SETTING hybrid_watermark_*`. + HybridPruningVerdict pruning_verdict; + if (!segments.empty() || base_segment_predicate) + { + pruning_verdict = computeHybridPruningVerdict(query_info, storage_snapshot, local_context); + if (pruning_verdict.base_pruned) + { + query_info.optimized_cluster = cluster->getClusterWithMultipleShards({}); + cluster = query_info.optimized_cluster; + nodes = getClusterQueriedNodes(settings, cluster); + } + } + if (settings[Setting::distributed_group_by_no_merge]) { if (settings[Setting::distributed_group_by_no_merge] == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION) @@ -560,7 +583,13 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( // TODO: check logic if (!segments.empty()) - nodes += segments.size(); + { + size_t surviving_segments = segments.size(); + for (bool is_pruned : pruning_verdict.segments_pruned) + if (is_pruned && surviving_segments > 0) + --surviving_segments; + nodes += surviving_segments; + } /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. @@ -792,6 +821,17 @@ std::optional StorageDistributed::getOptimizedQueryP StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr) const { + /// For Hybrid tables, freeze the watermark snapshot at snapshot acquisition time so + /// every later phase (`getQueryProcessingStage()`, `read()`) operates on the same + /// values. A concurrent `ALTER MODIFY SETTING hybrid_watermark_*` cannot change what + /// this query sees, which keeps the pruning verdict — and therefore the chosen + /// processing stage — consistent with the planned segment set. + if (!segments.empty() || base_segment_predicate) + { + auto data = std::make_unique(); + data->watermark_snapshot = hybrid_watermark_params.get(); + return std::make_shared(*this, metadata_snapshot, std::move(data)); + } return std::make_shared(*this, metadata_snapshot); } @@ -1179,6 +1219,117 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, } +ASTPtr StorageDistributed::substituteHybridWatermarks( + ASTPtr predicate_ast, + const MultiVersion::Version & watermarks) +{ + if (!predicate_ast) + return predicate_ast; + predicate_ast = predicate_ast->clone(); + + std::function replace_hybrid_params = [&](ASTPtr & node) + { + if (auto * func = node->as(); func && func->name == "hybridParam") + { + auto * arg_list = func->arguments ? func->arguments->as() : nullptr; + if (!arg_list || arg_list->children.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() requires exactly 2 arguments: (name, type)"); + + auto * name_lit = arg_list->children[0]->as(); + auto * type_lit = arg_list->children[1]->as(); + if (!name_lit || name_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() first argument (name) must be a string literal"); + if (!type_lit || type_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() second argument (type) must be a string literal"); + + const auto & param_name = name_lit->value.safeGet(); + const auto & type_name = type_lit->value.safeGet(); + + if (!watermarks) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto it = watermarks->find(param_name); + if (it == watermarks->end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto data_type = DataTypeFactory::instance().get(type_name); + auto col = data_type->createColumn(); + ReadBufferFromString buf(it->second); + data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); + node = make_intrusive((*col)[0]); + return; + } + + for (auto & child : node->children) + replace_hybrid_params(child); + }; + replace_hybrid_params(predicate_ast); + return predicate_ast; +} + +StorageDistributed::HybridPruningVerdict StorageDistributed::computeHybridPruningVerdict( + const SelectQueryInfo & query_info, + const StorageSnapshotPtr & storage_snapshot, + const ContextPtr & local_context) const +{ + StorageDistributed::HybridPruningVerdict verdict; + verdict.segments_pruned.assign(segments.size(), false); + + if (segments.empty() && !base_segment_predicate) + return verdict; + + /// Reuse the watermark snapshot frozen at `getStorageSnapshot()` time. Both + /// `getQueryProcessingStage()` and `read()` call this function with the same + /// `storage_snapshot`, so the verdict is identical across the two calls regardless + /// of any concurrent `ALTER MODIFY SETTING hybrid_watermark_*`. Falling back to a + /// fresh `MultiVersion::get()` only happens when `getStorageSnapshot()` did not + /// attach `HybridSnapshotData` (e.g. a code path that bypasses it); we keep the + /// fallback for defensiveness, but it is not exercised by the standard read path. + if (const auto * hybrid_data = storage_snapshot->data + ? dynamic_cast(storage_snapshot->data.get()) + : nullptr) + verdict.watermark_snapshot = hybrid_data->watermark_snapshot; + else + verdict.watermark_snapshot = hybrid_watermark_params.get(); + + /// Without a materialized user filter (legacy non-analyzer path, or a query before + /// filter actions are computed) we can't prune. Fail open — same precedent as + /// `skipUnusedShardsWithAnalyzer()`. The DAG is per-table-expression, so JOIN-side + /// predicates are already excluded; no JOIN guard needed. + if (!query_info.filter_actions_dag) + return verdict; + + NamesAndTypesList hybrid_columns = storage_snapshot->metadata->getColumns().getAll(); + ActionsDAGWithInversionPushDown inverted_dag( + query_info.filter_actions_dag->getOutputs().at(0), local_context); + HybridSegmentPruner pruner(inverted_dag, hybrid_columns, local_context); + if (pruner.isUseless()) + return verdict; + + auto check = [&](const ASTPtr & predicate_ast) -> bool + { + if (!predicate_ast) + return false; + return pruner.canBePruned( + substituteHybridWatermarks(predicate_ast, verdict.watermark_snapshot)); + }; + + if (base_segment_predicate) + verdict.base_pruned = check(base_segment_predicate); + + for (size_t i = 0; i < segments.size(); ++i) + verdict.segments_pruned[i] = check(segments[i].predicate_ast); + + return verdict; +} + void StorageDistributed::read( QueryPlan & query_plan, const Names &, @@ -1229,61 +1380,30 @@ void StorageDistributed::read( LOG_TRACE(log, "rewriteSelectQuery (target: {}) -> {}", target, ast->formatForLogging()); }; - auto watermark_snapshot = hybrid_watermark_params.get(); - - auto substitute_watermarks = [&](ASTPtr predicate_ast) -> ASTPtr - { - if (!predicate_ast) - return predicate_ast; - predicate_ast = predicate_ast->clone(); - - std::function replace_hybrid_params = [&](ASTPtr & node) - { - if (auto * func = node->as(); func && func->name == "hybridParam") - { - auto * arg_list = func->arguments ? func->arguments->as() : nullptr; - if (!arg_list || arg_list->children.size() != 2) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() requires exactly 2 arguments: (name, type)"); - - auto * name_lit = arg_list->children[0]->as(); - auto * type_lit = arg_list->children[1]->as(); - if (!name_lit || name_lit->value.getType() != Field::Types::String) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() first argument (name) must be a string literal"); - if (!type_lit || type_lit->value.getType() != Field::Types::String) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() second argument (type) must be a string literal"); - - const auto & param_name = name_lit->value.safeGet(); - const auto & type_name = type_lit->value.safeGet(); - - if (!watermark_snapshot) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", - param_name, param_name); + /// Recompute the Hybrid pruning verdict for per-segment skipping. The watermark snapshot + /// it depends on was frozen at `getStorageSnapshot()` time and is reused via + /// `HybridSnapshotData`, so this verdict matches the one `getQueryProcessingStage()` + /// produced — both the surviving-segment set and the substitution of `hybridParam(...)` + /// literals stay consistent with the chosen processing stage even under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. + HybridPruningVerdict pruning_verdict; + if (!segments.empty() || base_segment_predicate) + pruning_verdict = computeHybridPruningVerdict(query_info, storage_snapshot, local_context); - auto it = watermark_snapshot->find(param_name); - if (it == watermark_snapshot->end()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", - param_name, param_name); + auto watermark_snapshot = pruning_verdict.watermark_snapshot + ? pruning_verdict.watermark_snapshot : hybrid_watermark_params.get(); - auto data_type = DataTypeFactory::instance().get(type_name); - auto col = data_type->createColumn(); - ReadBufferFromString buf(it->second); - data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); - node = make_intrusive((*col)[0]); - return; - } - - for (auto & child : node->children) - replace_hybrid_params(child); - }; - replace_hybrid_params(predicate_ast); - return predicate_ast; + auto try_prune_additional = [&](size_t segment_idx, const String & target) -> bool + { + if (segment_idx >= pruning_verdict.segments_pruned.size() || !pruning_verdict.segments_pruned[segment_idx]) + return false; + LOG_TRACE(log, "Hybrid segment pruned (target: {})", target); + return true; }; + if (pruning_verdict.base_pruned) + LOG_TRACE(log, "Hybrid segment pruned (target: {})", base_target); + if (settings[Setting::allow_experimental_analyzer]) { StorageID remote_storage_id = StorageID::createEmpty(); @@ -1294,7 +1414,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, remote_storage_id, remote_table_function_ptr, - substitute_watermarks(base_segment_predicate)); + substituteHybridWatermarks(base_segment_predicate, watermark_snapshot)); Block block = *InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze()); /** For distributed tables we do not need constants in header, since we don't send them to remote servers. * Moreover, constants can break some functions like `hostName` that are constants only for local queries. @@ -1311,8 +1431,14 @@ void StorageDistributed::read( if (!segments.empty()) { - for (const auto & segment : segments) + for (size_t segment_idx = 0; segment_idx < segments.size(); ++segment_idx) { + const auto & segment = segments[segment_idx]; + if (try_prune_additional(segment_idx, describe_segment_target(segment))) + continue; + + ASTPtr substituted_predicate = substituteHybridWatermarks(segment.predicate_ast, watermark_snapshot); + // Create a modified query info with the segment predicate SelectQueryInfo additional_query_info = query_info; @@ -1320,7 +1446,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, segment.storage_id ? *segment.storage_id : StorageID::createEmpty(), segment.storage_id ? nullptr : segment.table_function_ast, - substitute_watermarks(segment.predicate_ast)); + substituted_predicate); additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree); additional_query_info.query_tree = std::move(additional_query_tree); @@ -1330,8 +1456,11 @@ void StorageDistributed::read( } } - // For empty shards - avoid early return if we have additional segments - if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) + /// Empty cluster + nothing else to plan: take the same path Distributed already uses + /// when `optimize_skip_unused_shards` filters every shard. For Hybrid this is the + /// "all segments pruned" case (base pruned via empty `optimized_cluster`, every + /// additional pruned via the segments loop above). + if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_query_infos.empty()) return; } else @@ -1341,16 +1470,20 @@ void StorageDistributed::read( modified_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, modified_query_info.query, remote_database, remote_table, remote_table_function_ptr, - substitute_watermarks(base_segment_predicate)); + substituteHybridWatermarks(base_segment_predicate, watermark_snapshot)); log_rewritten_query(base_target, modified_query_info.query); if (!segments.empty()) { - for (const auto & segment : segments) + for (size_t segment_idx = 0; segment_idx < segments.size(); ++segment_idx) { + const auto & segment = segments[segment_idx]; + if (try_prune_additional(segment_idx, describe_segment_target(segment))) + continue; + + ASTPtr resolved_predicate = substituteHybridWatermarks(segment.predicate_ast, watermark_snapshot); SelectQueryInfo additional_query_info = query_info; - ASTPtr resolved_predicate = substitute_watermarks(segment.predicate_ast); if (segment.storage_id) { additional_query_info.query = ClusterProxy::rewriteSelectQuery( @@ -1372,8 +1505,11 @@ void StorageDistributed::read( } } - // For empty shards - avoid early return if we have additional segments - if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) + /// Empty cluster + nothing else to plan: take the same path Distributed already uses + /// when `optimize_skip_unused_shards` filters every shard. For Hybrid this is the + /// "all segments pruned" case (base pruned via empty `optimized_cluster`, every + /// additional pruned via the segments loop above). + if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_query_infos.empty()) { Pipe pipe(std::make_shared(header)); auto read_from_pipe = std::make_unique(std::move(pipe)); @@ -1384,6 +1520,45 @@ void StorageDistributed::read( } } + /// Hybrid case 2: base pruned (cluster empty via `getQueryProcessingStage`'s empty + /// `optimized_cluster`) and at least one additional segment survives. The all-pruned + /// subcase is already handled by the existing empty-cluster early-returns above. We + /// can't call `ClusterProxy::executeQuery` with an empty cluster (its + /// `updateSettingsAndClientInfoForCluster` dereferences `getShardsAddresses().front()` + /// when `is_remote_function=true`), so build the local plans directly. The block below + /// is the same shape as the `additional_query_infos` block in `ClusterProxy::executeQuery` + /// — that block uses the original context (not `new_context`), so we don't depend on the + /// shared distributed-context setup. + if (modified_query_info.getCluster()->getShardsInfo().empty() && !additional_query_infos.empty()) + { + const Block & header_block = *header; + std::vector plans; + plans.reserve(additional_query_infos.size()); + for (const auto & additional_query_info : additional_query_infos) + { + plans.emplace_back(createLocalPlan( + additional_query_info.query, header_block, local_context, + processed_stage, /*shard_num=*/0, /*shard_count=*/1, /*has_missing_objects=*/false, "")); + } + + if (plans.size() == 1) + { + query_plan = std::move(*plans.front()); + } + else + { + SharedHeaders input_headers; + input_headers.reserve(plans.size()); + for (auto & plan : plans) + input_headers.emplace_back(plan->getCurrentHeader()); + + auto union_step = std::make_unique(std::move(input_headers)); + union_step->setStepDescription("Hybrid"); + query_plan.unitePlans(std::move(union_step), std::move(plans)); + } + return; + } + if (!modified_query_info.getCluster()->getShardsInfo().empty() || !additional_query_infos.empty()) { ClusterProxy::SelectStreamFactory select_stream_factory = diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 138a49d9c992..1dd6690650fb 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -220,6 +220,41 @@ class StorageDistributed final : public IStorage, WithContext std::vector getDirectoryQueueStatuses() const; static IColumn::Selector createSelector(ClusterPtr cluster, const ColumnWithTypeAndName & result); + + /// Substitute hybridParam(name, type) calls in `predicate_ast` with literal values from + /// `watermarks`. Returns a fresh cloned AST. Pass-through for nullptr. + static ASTPtr substituteHybridWatermarks( + ASTPtr predicate_ast, + const MultiVersion::Version & watermarks); + + /// Hybrid-specific snapshot-time state attached to `StorageSnapshot::data`. Populated + /// once in `StorageDistributed::getStorageSnapshot()` so the watermark values seen by + /// `getQueryProcessingStage()` and `read()` cannot diverge mid-query under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. Without this, two independent + /// `MultiVersion::get()` calls could observe different versions and produce inconsistent + /// pruning verdicts (e.g. a `Complete`-stage plan unioned without final merge). + struct HybridSnapshotData : public StorageSnapshot::Data + { + MultiVersion::Version watermark_snapshot; + }; + + /// Per-query Hybrid pruning verdict. Recomputed in both `getQueryProcessingStage()` + /// (to drive the stage decision and empty `optimized_cluster` when the base is pruned) + /// and `read()` (to skip planning of pruned additional segments). The verdict is + /// deterministic across both calls because the watermark snapshot it depends on is + /// taken once at `getStorageSnapshot()` time and reused via `HybridSnapshotData`. + struct HybridPruningVerdict + { + bool base_pruned = false; + std::vector segments_pruned; + MultiVersion::Version watermark_snapshot; + }; + + HybridPruningVerdict computeHybridPruningVerdict( + const SelectQueryInfo & query_info, + const StorageSnapshotPtr & storage_snapshot, + const ContextPtr & local_context) const; + /// Apply the following settings: /// - optimize_skip_unused_shards /// - force_optimize_skip_unused_shards diff --git a/src/Storages/tests/gtest_hybrid_segment_pruner.cpp b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp new file mode 100644 index 000000000000..02af38dd3865 --- /dev/null +++ b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp @@ -0,0 +1,106 @@ +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace DB; + +namespace +{ + +ASTPtr parseExpression(const std::string & text) +{ + ParserExpression parser; + return parseQuery(parser, text, 4096, 1000, 1000000); +} + +NamesAndTypesList hybridColumnsForTests() +{ + return { + {"ts", std::make_shared()}, + {"date", std::make_shared()}, + {"customerid", std::make_shared()}, + {"x", std::make_shared()}, + {"y", std::make_shared()}, + }; +} + +/// Build a HybridSegmentPruner over `where_text` and ask whether `segment_text` can be pruned. +/// The user-side ActionsDAG is built via the same `TreeRewriter + ExpressionAnalyzer` idiom +/// the planner uses to populate `query_info.filter_actions_dag`. +bool canPrune(const std::string & where_text, const std::string & segment_text) +{ + auto context = getContext().context; + auto cols = hybridColumnsForTests(); + + auto where_ast = parseExpression(where_text); + auto syntax_result = TreeRewriter(context).analyze(where_ast, cols); + /// `add_aliases=true` projects the DAG outputs to the predicate only, mirroring the shape of + /// the analyzer-built `query_info.filter_actions_dag` (one output = the filter expression). + /// With `add_aliases=false` the source columns are also kept as outputs, so `getOutputs().at(0)` + /// can point to an input column instead of the predicate. + auto dag = ExpressionAnalyzer(where_ast, syntax_result, context).getActionsDAG(true); + + ActionsDAGWithInversionPushDown inverted(dag.getOutputs().at(0), context); + HybridSegmentPruner pruner(inverted, cols, context); + + return pruner.canBePruned(parseExpression(segment_text)); +} + +class HybridSegmentPrunerTest : public ::testing::Test +{ +public: + static void SetUpTestSuite() + { + tryRegisterFunctions(); + } +}; + +} + +TEST_F(HybridSegmentPrunerTest, RangeContradictionPrunes) +{ + /// `ts > '2025-10-01'` (user) ∧ `ts <= '2025-09-01'` (segment) is unsat → prune. + EXPECT_TRUE(canPrune("ts > '2025-10-01'", "ts <= '2025-09-01'")); +} + +TEST_F(HybridSegmentPrunerTest, OverlappingRangeKeeps) +{ + /// `ts > '2025-10-01'` (user) ∧ `ts > '2025-08-01'` (segment) is satisfiable → keep. + EXPECT_FALSE(canPrune("ts > '2025-10-01'", "ts > '2025-08-01'")); +} + +TEST_F(HybridSegmentPrunerTest, BoundedDnfWithConstantFolding) +{ + /// `(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))` + /// (user) ∧ `date < '2015-01-01'` (segment): KeyCondition handles the OR by itself; the segment + /// hyperrectangle on `date` is (-∞, '2015-01-01'), which excludes both yesterday() and today(). + EXPECT_TRUE(canPrune( + "(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))", + "date < '2015-01-01'")); +} + +TEST_F(HybridSegmentPrunerTest, OrAlternativeNotMandatoryConstraint) +{ + /// `(x < 0 OR y = 1) AND x > 5` (user) ∧ `x > 0` (segment): the OR's `y = 1` branch is + /// satisfiable inside the segment hyperrectangle (e.g. x = 10, y = 1) → keep. + EXPECT_FALSE(canPrune("(x < 0 OR y = 1) AND x > 5", "x > 0")); +} + +TEST_F(HybridSegmentPrunerTest, UnsupportedAtomInOrKeeps) +{ + /// The OR contains an atom KeyCondition can't analyze (`length(toString(x)) > 10`), + /// so it conservatively keeps the segment. + EXPECT_FALSE(canPrune("(length(toString(x)) > 10 OR x = 1) AND x = 2", "x > 0")); +} diff --git a/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference b/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference new file mode 100644 index 000000000000..b9a8b083e394 --- /dev/null +++ b/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference @@ -0,0 +1,84 @@ +-- {echoOn} + +-- Test 1: Baseline (no pruning) — both segments planned, Union (Hybrid) present. +SELECT count() FROM pruning_t; +4 +SELECT count() FROM pruning_t WHERE value > 0; +4 +EXPLAIN SELECT count() FROM pruning_t WHERE value > 0; +Expression ((Project names + Projection)) + MergingAggregated + Union (Hybrid) + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_hot) + MergingAggregated + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_cold) +-- Test 2: Cold (additional) segment pruned via range contradiction — only base remains. +SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 3: Hot (base) segment pruned — only cold remains as a local plan. +SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +2 +EXPLAIN SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 4: PREWHERE participates in pruning. +SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 5: All segments pruned — getQueryProcessingStage returns FetchColumns, +-- planner inserts ReadNothing, AggregatingTransform synthesizes the default row. +SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +0 +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +Expression ((Project names + Projection)) + Aggregating + Expression (Before GROUP BY) + Filter ((WHERE + Change column names to column identifiers)) + ReadNothing (Read from NullSource) +-- {echoOn} + +-- Test 6: three segments, only hot survives. +SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 7: OR alternative is not a mandatory constraint — hot survives via the OR. +SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_hot) +-- Test 8: JOIN — pruner conservatively skips, both segments planned. Only the count is +-- asserted here because EXPLAIN's JOIN-side ordering depends on randomized settings the +-- test harness cycles through (e.g. query_plan_join_swap_table). +SELECT count() +FROM pruning_t AS t +INNER JOIN dim AS d ON t.value = d.id +WHERE d.id > 1 AND t.ts <= '2025-08-01'; +2 +-- Test 9: SELECT alias shadows a Hybrid column used by segment predicates. With default +-- prefer_column_name_to_alias=0 the WHERE's `ts` resolves to the alias expression (a +-- constant true for every row); if the pruner mistakenly treated the unresolved `ts` as +-- the Hybrid column it would prune the cold segment (`ts <= '2025-09-01'`) and silently +-- drop those rows. All 4 rows must survive. +SELECT count() FROM (SELECT toDateTime('2025-11-01') AS ts, value FROM pruning_t WHERE ts > '2025-10-01'); +4 diff --git a/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql b/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql new file mode 100644 index 000000000000..27cdee91bbab --- /dev/null +++ b/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql @@ -0,0 +1,127 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: requires remote() table function + +SET allow_experimental_hybrid_table = 1; + +-- The EXPLAIN-based assertions below print plan shapes that some randomized session +-- settings perturb. Pin them for deterministic output. None of these affect pruning logic; +-- they just stabilize how the plan is rendered. +SET prefer_localhost_replica = 1; -- avoid ReadFromRemote vs ReadFromMergeTree flips +SET query_plan_join_swap_table = 'false'; -- pin JOIN side ordering +SET use_query_condition_cache = 0; -- consistent EXPLAIN across runs +SET optimize_trivial_count_query = 1; +SET parallel_replicas_local_plan = 0; + +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC; +DROP TABLE IF EXISTS pruning_t SYNC; +DROP TABLE IF EXISTS pruning_t3 SYNC; +DROP TABLE IF EXISTS pruning_or SYNC; +DROP TABLE IF EXISTS dim SYNC; + +CREATE TABLE local_hot (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +CREATE TABLE local_cold (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +INSERT INTO local_hot VALUES ('2025-10-15', 1), ('2025-11-01', 2); +INSERT INTO local_cold VALUES ('2025-08-01', 3), ('2025-06-15', 4); + +CREATE TABLE pruning_t +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +-- {echoOn} + +-- Test 1: Baseline (no pruning) — both segments planned, Union (Hybrid) present. +SELECT count() FROM pruning_t; +SELECT count() FROM pruning_t WHERE value > 0; +EXPLAIN SELECT count() FROM pruning_t WHERE value > 0; + +-- Test 2: Cold (additional) segment pruned via range contradiction — only base remains. +SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; + +-- Test 3: Hot (base) segment pruned — only cold remains as a local plan. +SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; + +-- Test 4: PREWHERE participates in pruning. +SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; + +-- Test 5: All segments pruned — getQueryProcessingStage returns FetchColumns, +-- planner inserts ReadNothing, AggregatingTransform synthesizes the default row. +SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; + +-- {echoOff} + +-- Test 6: three-segment table; cold + middle pruned, only hot kept. +CREATE TABLE local_warm (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +INSERT INTO local_warm VALUES ('2025-09-15', 5), ('2025-09-25', 6); + +CREATE TABLE pruning_t3 +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_warm'), + ts > hybridParam('hybrid_watermark_cold', 'DateTime') + AND ts <= hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_cold', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-10-01', hybrid_watermark_cold = '2025-09-01' +AS local_hot; + +CREATE TABLE pruning_or +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +CREATE TABLE dim (id UInt64, label String) ENGINE = MergeTree ORDER BY id; +INSERT INTO dim VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'); + +-- {echoOn} + +-- Test 6: three segments, only hot survives. +SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; + +-- Test 7: OR alternative is not a mandatory constraint — hot survives via the OR. +SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; + +-- Test 8: JOIN — pruner conservatively skips, both segments planned. Only the count is +-- asserted here because EXPLAIN's JOIN-side ordering depends on randomized settings the +-- test harness cycles through (e.g. query_plan_join_swap_table). +SELECT count() +FROM pruning_t AS t +INNER JOIN dim AS d ON t.value = d.id +WHERE d.id > 1 AND t.ts <= '2025-08-01'; + +-- Test 9: SELECT alias shadows a Hybrid column used by segment predicates. With default +-- prefer_column_name_to_alias=0 the WHERE's `ts` resolves to the alias expression (a +-- constant true for every row); if the pruner mistakenly treated the unresolved `ts` as +-- the Hybrid column it would prune the cold segment (`ts <= '2025-09-01'`) and silently +-- drop those rows. All 4 rows must survive. +SELECT count() FROM (SELECT toDateTime('2025-11-01') AS ts, value FROM pruning_t WHERE ts > '2025-10-01'); + +-- {echoOff} + +DROP TABLE IF EXISTS dim SYNC; +DROP TABLE IF EXISTS pruning_or SYNC; +DROP TABLE IF EXISTS pruning_t3 SYNC; +DROP TABLE IF EXISTS pruning_t SYNC; +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC;