Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
M(StoragePoolUniPS) \
M(RegionPersisterRunMode) \
M(S3Requests) \
M(S3RandomAccessFile) \
M(GlobalStorageRunMode) \
M(GlobalThread) \
M(GlobalThreadActive) \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@
M(S3GetObjectRetry) \
M(S3PutObjectRetry) \
M(S3IORead) \
M(S3IOReadError) \
M(S3IOSeek) \
M(S3IOSeekError) \
M(S3IOSeekBackward) \
M(FileCacheHit) \
M(FileCacheMiss) \
Expand Down
54 changes: 38 additions & 16 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,22 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_uni_page_ids, {"type", "uni_page_ids"}), \
F(type_versioned_entries, {"type", "versioned_entries"})) \
M(tiflash_storage_read_tasks_count, "Total number of storage engine read tasks", Counter) \
M(tiflash_storage_place_index_count, \
"Total number of place index operations", \
Counter, \
F(type_reuse, {"type", "reuse"}), \
F(type_placed, {"type", "placed"}), \
F(type_placed_fully_indexed, {"type", "placed_fully_indexed"}), \
F(type_placed_fully_saved, {"type", "placed_fully_saved"})) \
M(tiflash_storage_place_index_stats_count, \
"Bucketed histogram of number of rows/deletes of index placement operations", \
Histogram, \
F(type_rows_newly_placed, {{"type", "rows_newly_placed"}}, ExpBuckets{1000, 2, 10}), \
F(type_deletes_newly_placed, {{"type", "deletes_newly_placed"}}, ExpBucketsWithRange{1, 2, 100}), \
F(type_rows_after_placed, {{"type", "rows_after_placed"}}, ExpBuckets{1000, 2, 10}), \
F(type_deletes_after_placed, {{"type", "deletes_after_placed"}}, ExpBucketsWithRange{1, 2, 100}), \
F(type_rows_reuse_placed, {{"type", "rows_reuse_placed"}}, ExpBuckets{1000, 2, 10}), \
F(type_deletes_reuse_placed, {{"type", "deletes_reuse_placed"}}, ExpBucketsWithRange{1, 2, 100})) \
M(tiflash_storage_command_count, \
"Total number of storage's command, such as delete range / shutdown /startup", \
Counter, \
Expand All @@ -246,18 +262,18 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_place_index_update, {"type", "place_index_update"})) \
M(tiflash_storage_subtask_duration_seconds, \
"Bucketed histogram of storage's sub task duration", \
Histogram, \
F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_manual, {{"type", "delta_merge_manual"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_compact, {{"type", "delta_compact"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_flush, {{"type", "delta_flush"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_split_bg, {{"type", "seg_split_bg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \
Histogram, /* increase the bucket from 10ms to 87 minutes */ \
F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.010, 2, 20}), \
F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.010, 2, 20}), \
F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.010, 2, 20}), \
F(type_delta_merge_manual, {{"type", "delta_merge_manual"}}, ExpBuckets{0.010, 2, 20}), \
F(type_delta_compact, {{"type", "delta_compact"}}, ExpBuckets{0.010, 2, 20}), \
F(type_delta_flush, {{"type", "delta_flush"}}, ExpBuckets{0.010, 2, 20}), \
F(type_seg_split_bg, {{"type", "seg_split_bg"}}, ExpBuckets{0.010, 2, 20}), \
F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.010, 2, 20}), \
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.010, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.010, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.010, 2, 20})) \
M(tiflash_storage_subtask_throughput_bytes, \
"Calculate the throughput of (maybe foreground) tasks of storage in bytes", \
Counter, /**/ \
Expand Down Expand Up @@ -677,12 +693,17 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
M(tiflash_storage_read_thread_gauge, \
"The gauge of storage read thread", \
Gauge, \
F(type_read_task_pool, {"type", "read_task_pool"}), \
F(type_read_task, {"type", "read_task"}), \
F(type_read_task_active, {"type", "read_task_active"}), \
F(type_merged_task, {"type", "merged_task"}), \
F(type_merged_task_active, {"type", "merged_task_active"})) \
F(type_merged_task_units, /* num of merged task segments */ {"type", "merged_task_units"}), \
F(type_merged_task_active, /* num of merged task actively reading by SegmentReader */ \
{"type", "merged_task_active"})) \
M(tiflash_storage_read_thread_seconds, \
"Bucketed histogram of read thread", \
Histogram, \
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \
Histogram, /* increase the bucket from 10ms to 87 minutes */ \
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.010, 2, 20})) \
M(tiflash_mpp_task_manager, \
"The gauge of mpp task manager", \
Gauge, \
Expand Down Expand Up @@ -767,7 +788,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_read_stream, {{"type", "read_stream"}}, ExpBuckets{0.0001, 2, 20})) \
F(type_read_stream, {{"type", "read_stream"}}, ExpBuckets{0.0001, 2, 20}), \
F(type_read_stream_err, {{"type", "read_stream_err"}}, ExpBuckets{0.0001, 2, 20})) \
M(tiflash_storage_s3_http_request_seconds, \
"S3 request duration breakdown in seconds", \
Histogram, \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Storages/DeltaMerge/File/ColumnStream.h>
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/Page/PageUtil.h>
#include <Storages/S3/S3RandomAccessFile.h>

namespace DB::DM
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/FormatVersion.h>
#include <Storages/S3/S3Filename.h>
#include <Storages/S3/S3RandomAccessFile.h>
#include <TiDB/Schema/TiDB.h>
#include <common/logger_useful.h>

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/DeltaMerge/Index/RSResult.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/S3/S3RandomAccessFile.h>

namespace DB::DM
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ class MergedTask
{
passive_merged_segments.fetch_add(units.size() - 1, std::memory_order_relaxed);
GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Increment();
GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task_units).Increment(units.size());
}
~MergedTask()
{
passive_merged_segments.fetch_sub(units.size() - 1, std::memory_order_relaxed);
GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task_units).Decrement(units.size());
GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Decrement();
GET_METRIC(tiflash_storage_read_thread_seconds, type_merged_task).Observe(sw.elapsedSeconds());
}
Expand Down
24 changes: 23 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,7 @@ Segment::ReadInfo Segment::getReadInfo(
bool ok = segment_snap->delta->getSharedDeltaIndex()->updateIfAdvanced(*my_delta_index);
if (ok)
{
GET_METRIC(tiflash_storage_place_index_count, type_placed_fully_saved).Increment();
LOG_DEBUG(
segment_snap->log,
"Segment updated delta index, my_delta_index={} {}",
Expand Down Expand Up @@ -2796,6 +2797,9 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
start_ts))
{
// We can reuse the shared-delta-index
GET_METRIC(tiflash_storage_place_index_count, type_reuse).Increment();
GET_METRIC(tiflash_storage_place_index_stats_count, type_rows_reuse_placed).Observe(my_placed_rows);
GET_METRIC(tiflash_storage_place_index_stats_count, type_deletes_reuse_placed).Observe(my_placed_deletes);
return {my_delta_index, false};
}

Expand All @@ -2815,6 +2819,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
delta_snap->getDeletes());

bool fully_indexed = true;
size_t new_placed_rows = 0;
size_t new_placed_deletes = 0;
for (auto & v : items)
{
if (v.isBlock())
Expand Down Expand Up @@ -2851,6 +2857,7 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
relevant_place);

my_placed_rows += rows;
new_placed_rows += rows;
}
else
{
Expand All @@ -2874,6 +2881,7 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
relevant_place);

++my_placed_deletes;
++new_placed_deletes;
}
}

Expand All @@ -2887,11 +2895,25 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(

my_delta_index->update(my_delta_tree, my_placed_rows, my_placed_deletes);

GET_METRIC(tiflash_storage_place_index_count, type_placed).Increment();
GET_METRIC(tiflash_storage_place_index_stats_count, type_rows_newly_placed).Observe(new_placed_rows);
GET_METRIC(tiflash_storage_place_index_stats_count, type_deletes_newly_placed).Observe(new_placed_deletes);
GET_METRIC(tiflash_storage_place_index_stats_count, type_rows_after_placed).Observe(my_placed_rows);
GET_METRIC(tiflash_storage_place_index_stats_count, type_deletes_after_placed).Observe(my_placed_deletes);
if (fully_indexed)
GET_METRIC(tiflash_storage_place_index_count, type_placed_fully_indexed).Increment();
LOG_DEBUG(
segment_snap->log,
"Finish segment ensurePlace, read_ranges={} placed_items={} shared_delta_index={} my_delta_index={} {}",
"Finish segment ensurePlace, read_ranges={} placed_items={} "
"new_placed_rows={} new_placed_deletes={} my_placed_rows={} my_placed_deletes={} fully_indexed={} "
"shared_delta_index={} my_delta_index={} {}",
read_ranges,
items.size(),
new_placed_rows,
new_placed_deletes,
my_placed_rows,
my_placed_deletes,
fully_indexed,
delta_snap->getSharedDeltaIndex()->toString(),
my_delta_index->toString(),
simpleInfo());
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ SegmentReadTask::SegmentReadTask(
, dm_context(dm_context_)
, ranges(ranges_)
{
CurrentMetrics::add(CurrentMetrics::DT_SegmentReadTasks);
CurrentMetrics::add(CurrentMetrics::DT_SegmentReadTasks); // keep for compatibility.
GET_METRIC(tiflash_storage_read_thread_gauge, type_read_task).Increment();
}

SegmentReadTask::SegmentReadTask(
Expand All @@ -74,7 +75,8 @@ SegmentReadTask::SegmentReadTask(
size_t establish_disagg_task_resp_size)
: store_id(store_id_)
{
CurrentMetrics::add(CurrentMetrics::DT_SegmentReadTasks);
CurrentMetrics::add(CurrentMetrics::DT_SegmentReadTasks); // keep for compatibility.
GET_METRIC(tiflash_storage_read_thread_gauge, type_read_task).Increment();
auto tracing_id = fmt::format(
"{} segment_id={} epoch={} delta_epoch={}",
log->identifier(),
Expand Down Expand Up @@ -181,7 +183,8 @@ SegmentReadTask::SegmentReadTask(

SegmentReadTask::~SegmentReadTask()
{
CurrentMetrics::sub(CurrentMetrics::DT_SegmentReadTasks);
CurrentMetrics::sub(CurrentMetrics::DT_SegmentReadTasks); // keep for compatibility.
GET_METRIC(tiflash_storage_read_thread_gauge, type_read_task).Decrement();
}

void SegmentReadTask::addRange(const RowKeyRange & range)
Expand Down
15 changes: 14 additions & 1 deletion dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ namespace DB::FailPoints
extern const char pause_when_reading_from_dt_stream[];
} // namespace DB::FailPoints

namespace CurrentMetrics
{
extern const Metric DT_SegmentReadTasks;
} // namespace CurrentMetrics
namespace DB::DM
{
SegmentReadTasksWrapper::SegmentReadTasksWrapper(bool enable_read_thread_, SegmentReadTasks && ordered_tasks_)
Expand Down Expand Up @@ -151,6 +155,7 @@ SegmentReadTaskPool::SegmentReadTaskPool(
, keyspace_id(keyspace_id_)
, res_group_name(res_group_name_)
{
GET_METRIC(tiflash_storage_read_thread_gauge, type_read_task_pool).Increment();
if (tasks_wrapper.empty())
{
q.finish();
Expand All @@ -159,6 +164,7 @@ SegmentReadTaskPool::SegmentReadTaskPool(

SegmentReadTaskPool::~SegmentReadTaskPool()
{
GET_METRIC(tiflash_storage_read_thread_gauge, type_read_task_pool).Decrement();
auto [pop_times, pop_empty_times, peak_blocks_in_queue] = q.getStat();
auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0;
auto total_count = blk_stat.totalCount();
Expand Down Expand Up @@ -198,7 +204,8 @@ void SegmentReadTaskPool::finishSegment(const SegmentReadTaskPtr & seg)
active_segment_ids.erase(seg->getGlobalSegmentID());
pool_finished = active_segment_ids.empty() && tasks_wrapper.empty();
}
LOG_DEBUG(log, "finishSegment pool_id={} segment={} pool_finished={}", pool_id, seg, pool_finished);
GET_METRIC(tiflash_storage_read_thread_gauge, type_read_task_active).Decrement();
LOG_INFO(log, "finishSegment pool_id={} segment={} pool_finished={}", pool_id, seg, pool_finished);
if (pool_finished)
{
q.finish();
Expand All @@ -217,8 +224,14 @@ SegmentReadTaskPtr SegmentReadTaskPool::getTask(const GlobalSegmentID & seg_id)
std::lock_guard lock(mutex);
auto t = tasks_wrapper.getTask(seg_id);
RUNTIME_CHECK(t != nullptr, pool_id, seg_id);
auto no_task_left = tasks_wrapper.empty();
active_segment_ids.insert(seg_id);
GET_METRIC(tiflash_storage_read_thread_gauge, type_read_task_active).Increment();
peak_active_segments = std::max(peak_active_segments, active_segment_ids.size());
if (no_task_left)
{
LOG_INFO(log, "pool_id={} all tasks scheduled, active_segment_size={}", pool_id, active_segment_ids.size());
}
return t;
}

Expand Down
Loading