Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ DEFINE_mInt32(pk_index_page_cache_stale_sweep_time_sec, "600");
DEFINE_mBool(enable_low_cardinality_optimize, "true");
DEFINE_Bool(enable_low_cardinality_cache_code, "true");

DEFINE_mBool(enable_adaptive_batch_size, "true");

// be policy
// whether check compaction checksum
DEFINE_mBool(enable_compaction_checksum, "false");
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ DECLARE_mInt32(pk_index_page_cache_stale_sweep_time_sec);
DECLARE_mBool(enable_low_cardinality_optimize);
DECLARE_Bool(enable_low_cardinality_cache_code);

// Adaptive batch size: dynamically adjust SegmentIterator chunk row count using EWMA
// so that each output block stays close to preferred_block_size_bytes.
// When false, the fixed batch_size row behaviour is preserved.
DECLARE_mBool(enable_adaptive_batch_size);

// be policy
// whether check compaction checksum
DECLARE_mBool(enable_compaction_checksum);
Expand Down
20 changes: 18 additions & 2 deletions be/src/core/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,17 @@ class Block {

Status check_type_and_column() const;

/// Approximate number of bytes in memory - for profiling and limits.
/// Approximate number of bytes used by column data in memory.
/// This reflects the actual data footprint (e.g. string contents, numeric arrays)
/// and is the metric used by adaptive batch size byte budgets.
size_t bytes() const;

/// Returns per-column byte sizes as a comma-separated string (for debugging).
std::string columns_bytes() const;

/// Approximate number of allocated bytes in memory - for profiling and limits.
/// Approximate number of allocated (reserved) bytes in memory.
/// This may be larger than bytes() due to pre-allocated capacity in vectors/arenas.
/// Used for memory tracking and profiling.
MOCK_FUNCTION size_t allocated_bytes() const;

/** Get a list of column names separated by commas. */
Expand Down Expand Up @@ -355,6 +360,17 @@ class Block {
void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
bool need_keep_first);

// Helper: sum byte_size() of all mutable columns.
// Unlike Block::bytes() which operates on immutable ColumnPtr,
// this works on MutableColumns during block construction (e.g. in BlockReader).
static inline size_t columns_byte_size(const MutableColumns& cols) {
size_t total = 0;
for (const auto& col : cols) {
total += col->byte_size();
}
return total;
}

private:
void erase_impl(size_t position);
};
Expand Down
27 changes: 26 additions & 1 deletion be/src/exec/operator/mock_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include <list>

#include "exec/operator/scan_operator.h"

#ifdef BE_TEST
Expand Down Expand Up @@ -80,6 +82,29 @@ class MockScanOperatorX final : public ScanOperatorX<MockScanLocalState> {
public:
friend class OlapScanLocalState;
MockScanOperatorX() = default;

void set_output_block(Block block) {
_output_blocks.clear();
_output_blocks.push_back(std::move(block));
}

Status get_block(RuntimeState* state, Block* block, bool* eos) override {
if (_output_blocks.empty()) {
*eos = true;
return Status::OK();
}

*eos = false;
block->swap(_output_blocks.front());
_output_blocks.pop_front();
if (_output_blocks.empty()) {
*eos = true;
}
return Status::OK();
}

private:
std::list<Block> _output_blocks;
};
} // namespace doris
#endif
#endif
5 changes: 5 additions & 0 deletions be/src/exec/operator/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,11 @@ Status OlapScanLocalState::_init_profile() {
ADD_COUNTER(_segment_profile, "ConditionCacheSegmentHit", TUnit::UNIT);
_condition_cache_filtered_rows_counter =
ADD_COUNTER(_segment_profile, "ConditionCacheFilteredRows", TUnit::UNIT);
_adaptive_batch_predict_min_rows_counter =
ADD_COUNTER(_segment_profile, "AdaptiveBatchPredictMinRows", TUnit::UNIT);
_adaptive_batch_predict_max_rows_counter =
ADD_COUNTER(_segment_profile, "AdaptiveBatchPredictMaxRows", TUnit::UNIT);

return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/operator/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
// Variant subtree: times selecting doc snapshot all iterator (merge doc snapshot into root)
RuntimeProfile::Counter* _variant_doc_value_column_iter_count = nullptr;

RuntimeProfile::Counter* _adaptive_batch_predict_min_rows_counter = nullptr;
RuntimeProfile::Counter* _adaptive_batch_predict_max_rows_counter = nullptr;

std::vector<TabletWithVersion> _tablets;
std::vector<TabletReadSource> _read_sources;

Expand Down
17 changes: 12 additions & 5 deletions be/src/exec/operator/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,7 @@ Status OperatorXBase::get_block_after_projects(RuntimeState* state, Block* block
auto* local_state = state->get_local_state(operator_id());
Defer defer([&]() {
if (status.ok()) {
if (auto rows = block->rows()) {
COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
}
local_state->update_output_block_counters(*block);
}
});
if (_output_row_descriptor) {
Expand Down Expand Up @@ -505,7 +502,11 @@ PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
: _parent(parent), _state(state) {}

PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: _num_rows_returned(0), _rows_returned_counter(nullptr), _parent(parent), _state(state) {}
: _num_rows_returned(0),
_rows_returned_counter(nullptr),
_parent(parent),
_state(state),
_budget(state->batch_size(), state->preferred_block_size_bytes()) {}

template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalStateInfo& info) {
Expand Down Expand Up @@ -559,6 +560,12 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_open_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "OpenTime", 2);
_close_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "CloseTime", 2);
_exec_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "ExecTime", 1);
_output_block_bytes_counter =
ADD_COUNTER_WITH_LEVEL(_common_profile, "OutputBlockBytes", TUnit::BYTES, 1);
_max_output_block_bytes_counter =
ADD_COUNTER_WITH_LEVEL(_common_profile, "MaxOutputBlockBytes", TUnit::BYTES, 1);
_min_output_block_bytes_counter =
ADD_COUNTER_WITH_LEVEL(_common_profile, "MinOutputBlockBytes", TUnit::BYTES, 1);
_memory_used_counter =
_common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
_common_profile->add_info_string("IsColocate",
Expand Down
26 changes: 26 additions & 0 deletions be/src/exec/operator/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <atomic>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <string>
#include <utility>
Expand All @@ -43,6 +44,7 @@
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/block_budget.h"

namespace doris {
#include "common/compile_check_begin.h"
Expand Down Expand Up @@ -245,11 +247,28 @@ class PipelineXLocalStateBase {
RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
OperatorXBase* parent() { return _parent; }
RuntimeState* state() { return _state; }
[[nodiscard]] const BlockBudget& block_budget() const { return _budget; }
VExprContextSPtrs& conjuncts() { return _conjuncts; }
VExprContextSPtrs& projections() { return _projections; }
[[nodiscard]] int64_t num_rows_returned() const { return _num_rows_returned; }
void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; }
void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
void update_output_block_counters(const Block& block) {
if (auto rows = block.rows()) {
COUNTER_UPDATE(_rows_returned_counter, rows);
COUNTER_UPDATE(_blocks_returned_counter, 1);
auto block_bytes = static_cast<int64_t>(block.bytes());
COUNTER_UPDATE(_output_block_bytes_counter, block_bytes);
if (block_bytes > _max_output_block_bytes) {
_max_output_block_bytes = block_bytes;
COUNTER_SET(_max_output_block_bytes_counter, block_bytes);
}
if (block_bytes < _min_output_block_bytes) {
_min_output_block_bytes = block_bytes;
COUNTER_SET(_min_output_block_bytes_counter, block_bytes);
}
}
}

[[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0;
[[nodiscard]] virtual bool is_blockable() const;
Expand Down Expand Up @@ -305,6 +324,11 @@ class PipelineXLocalStateBase {

RuntimeProfile::Counter* _rows_returned_counter = nullptr;
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _output_block_bytes_counter = nullptr;
RuntimeProfile::Counter* _max_output_block_bytes_counter = nullptr;
RuntimeProfile::Counter* _min_output_block_bytes_counter = nullptr;
int64_t _max_output_block_bytes = 0;
int64_t _min_output_block_bytes = std::numeric_limits<int64_t>::max();
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
// Account for current memory and peak memory used by this node
RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr;
Expand All @@ -316,6 +340,8 @@ class PipelineXLocalStateBase {

OperatorXBase* _parent = nullptr;
RuntimeState* _state = nullptr;
// Execution-scoped row/byte budget derived from the session batch settings.
const BlockBudget _budget;
VExprContextSPtrs _conjuncts;
VExprContextSPtrs _projections;
std::shared_ptr<ScoreRuntime> _score_runtime;
Expand Down
6 changes: 1 addition & 5 deletions be/src/exec/operator/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,7 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override {
Status status = get_block(state, block, eos);
if (status.ok()) {
if (auto rows = block->rows()) {
auto* local_state = state->get_local_state(operator_id());
COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
}
state->get_local_state(operator_id())->update_output_block_counters(*block);
}
return status;
}
Expand Down
8 changes: 7 additions & 1 deletion be/src/exec/operator/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class RuntimeState;

namespace doris {

SchemaScanLocalState::SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent),
_data_dependency(std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_DEPENDENCY", true)) {
}

Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));

Expand Down Expand Up @@ -243,7 +249,7 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, Block* block, bool* e
break;
}

if (src_block.rows() >= state->batch_size()) {
if (local_state.block_budget().exceeded(src_block.rows(), src_block.bytes())) {
break;
}
}
Expand Down
8 changes: 2 additions & 6 deletions be/src/exec/operator/schema_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ class SchemaScanLocalState final : public PipelineXLocalState<> {
public:
ENABLE_FACTORY_CREATOR(SchemaScanLocalState);

SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent) {
_data_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_DEPENDENCY", true);
}
SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent);
~SchemaScanLocalState() override = default;

Status init(RuntimeState* state, LocalStateInfo& info) override;
Expand Down Expand Up @@ -93,4 +89,4 @@ class SchemaScanOperatorX final : public OperatorX<SchemaScanLocalState> {
};

#include "common/compile_check_end.h"
} // namespace doris
} // namespace doris
Loading
Loading