From 11a0c9dddbdd81843796637ad68d8c68c2e693c6 Mon Sep 17 00:00:00 2001 From: Daniil Porokhnin Date: Thu, 28 May 2026 14:34:12 +0300 Subject: [PATCH] feat: add config and metrics for compaction chore: remove TODO and FIXME --- cmd/seq-db/seq-db.go | 13 +++++ compaction/executor.go | 27 ++++++--- compaction/metrics.go | 52 +++++++++++++++++ compaction/planner.go | 50 ++++++++++------ config.example.yaml | 13 ++++- config/config.go | 14 +++++ config/validation.go | 23 ++++++++ config/validation_test.go | 119 -------------------------------------- storeapi/store.go | 7 ++- tests/setup/env.go | 13 +++++ 10 files changed, 183 insertions(+), 148 deletions(-) delete mode 100644 config/validation_test.go diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index b0a7c47e..03a9d336 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -19,6 +19,7 @@ import ( "github.com/ozontech/seq-db/asyncsearcher" "github.com/ozontech/seq-db/buildinfo" + "github.com/ozontech/seq-db/compaction" "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" @@ -323,6 +324,18 @@ func startStore( Workers: cfg.SkipMaskManager.Workers, CacheSizeLimit: uint64(cfg.SkipMaskManager.CacheSize), }, + Compaction: compaction.Config{ + MergeTrigger: cfg.Compaction.STCS.MergeTrigger, + MergeFanIn: cfg.Compaction.STCS.MergeFanIn, + MergeFanOutSize: uint64(cfg.Compaction.STCS.MergeFanOutSize), + + BucketLowerbound: cfg.Compaction.STCS.BucketLowerbound, + BucketUpperbound: cfg.Compaction.STCS.BucketUpperbound, + + Workers: cfg.Compaction.Workers, + TimeWindow: cfg.Compaction.TimeWindow, + TickInterval: cfg.Compaction.TickInterval, + }, } s3cli := initS3Client(cfg) diff --git a/compaction/executor.go b/compaction/executor.go index 13fcbeab..1d6d500b 100644 --- a/compaction/executor.go +++ b/compaction/executor.go @@ -2,6 +2,7 @@ package compaction import ( "sync" + "time" "go.uber.org/zap" @@ -12,20 +13,22 @@ import ( ) type Executor struct { + params common.SealParams + workers int wg sync.WaitGroup - p *planner + + p *planner } -// FIXME(dkharms): I need to pass here [common.SealParams]. -func NewExecutor(workers int, p *planner) *Executor { - e := Executor{workers: workers, p: p} +func NewExecutor(workers int, params common.SealParams, p *planner) *Executor { + e := Executor{workers: workers, p: p, params: params} e.init() return &e } -func (e *Executor) Close() { - e.p.close() +func (e *Executor) Stop() { + e.p.stop() e.wg.Wait() } @@ -33,7 +36,14 @@ func (e *Executor) init() { for range e.workers { e.wg.Go(func() { for t := range e.p.tasks { - t.onComplete(e.compact(t)) + compactionInflight.Inc() + + start := time.Now() + result, err := e.compact(t) + compactionDurationSeconds.Observe(time.Since(start).Seconds()) + + t.onComplete(result, err) + compactionInflight.Dec() } }) } @@ -48,6 +58,7 @@ func (e *Executor) compact(t task) (*sealed.PreloadedData, error) { for _, f := range t.snapshot.Fractions() { names = append(names, f.Info().Name()) srcs = append(srcs, frac.NewSealedSource(f)) + compactionBytesTotal.Add(float64(f.Info().IndexOnDisk)) } logger.Info( @@ -56,6 +67,6 @@ func (e *Executor) compact(t task) (*sealed.PreloadedData, error) { zap.Strings("names", names), ) - preloaded, err := Merge(t.filename, common.SealParams{}, srcs...) + preloaded, err := Merge(t.filename, e.params, srcs...) return preloaded, err } diff --git a/compaction/metrics.go b/compaction/metrics.go index d1d3cde1..eb0b9656 100644 --- a/compaction/metrics.go +++ b/compaction/metrics.go @@ -1 +1,53 @@ package compaction + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/ozontech/seq-db/metric" +) + +var ( + compactionInflight = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "inflight", + Help: "Number of running compactions", + }) + + compactionSkipped = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "skipped_total", + Help: "Tick-triggered tasks dropped because all workers were busy or no candidates were found", + }) + + compactionBinsTotal = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "bins_total", + Help: "Number of active time-bins considered for compaction", + }) + + compactionDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "duration_seconds", + Help: "Time spent executing a single compaction", + Buckets: metric.SecondsBuckets, + }) + + compactionBytesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "bytes_total", + Help: "Total index bytes merged across all compactions", + }) + + compactionResultTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "result_total", + Help: "Compaction outcomes by result (success, empty, error)", + }, []string{"result"}) +) diff --git a/compaction/planner.go b/compaction/planner.go index 487890b6..b7d0b735 100644 --- a/compaction/planner.go +++ b/compaction/planner.go @@ -9,23 +9,29 @@ import ( "go.uber.org/zap" - "github.com/alecthomas/units" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" ) +type Config struct { + MergeTrigger int + MergeFanIn int + MergeFanOutSize uint64 + + BucketLowerbound float64 + BucketUpperbound float64 + + Workers int + TimeWindow time.Duration + TickInterval time.Duration +} + type fraction interface { Info() *common.Info } -const ( - // TODO(dkharms): Move this options to config. - compactionTick = time.Second - compactionWindow = 24 * time.Hour -) - type task struct { bin time.Time filename string @@ -34,6 +40,8 @@ type task struct { } type planner struct { + cfg Config + wg sync.WaitGroup ctx context.Context done chan struct{} @@ -49,8 +57,10 @@ type planner struct { stats map[time.Time]int } -func NewPlanner(ctx context.Context, fm *fracmanager.FracManager) *planner { +func NewPlanner(ctx context.Context, fm *fracmanager.FracManager, cfg Config) *planner { p := planner{ + cfg: cfg, + ctx: ctx, done: make(chan struct{}), @@ -68,7 +78,7 @@ func NewPlanner(ctx context.Context, fm *fracmanager.FracManager) *planner { func (p *planner) init() { p.wg.Go(func() { - t := time.NewTicker(compactionTick) + t := time.NewTicker(p.cfg.TickInterval) for { select { @@ -83,6 +93,7 @@ func (p *planner) init() { case <-t.C: task, ok := p.pick() if !ok { + compactionSkipped.Inc() continue } @@ -91,13 +102,14 @@ func (p *planner) init() { case <-time.NewTimer(time.Second).C: // If all executor workers are busy for some long period of time, // we want to drop the task because it might contain stale decision. + compactionSkipped.Inc() } } } }) } -func (p *planner) close() { +func (p *planner) stop() { close(p.done) } @@ -109,7 +121,8 @@ func (p *planner) pick() (task, bool) { snapshot[i] = fractions[i] } - bins := p.distribute(compactionWindow, snapshot) + bins := p.distribute(p.cfg.TimeWindow, snapshot) + compactionBinsTotal.Set(float64(len(bins))) times := p.prioritize(bins) p.mu.Lock() @@ -123,13 +136,12 @@ func (p *planner) pick() (task, bool) { continue } - // TODO(dkharms): Move this options to config. picked := strategySTCS{ - mergeTrigger: 4, - mergeFanIn: 32, - mergeFanOutSize: 128 * uint64(units.MiB), - bucketLowerbound: 0.5, - bucketUpperbound: 1.5, + mergeTrigger: p.cfg.MergeTrigger, + mergeFanIn: p.cfg.MergeFanIn, + mergeFanOutSize: p.cfg.MergeFanOutSize, + bucketLowerbound: p.cfg.BucketLowerbound, + bucketUpperbound: p.cfg.BucketUpperbound, }.Pick(bins[t].fracs) if len(picked) == 0 { @@ -156,11 +168,14 @@ func (p *planner) pick() (task, bool) { delete(p.inflight, t) if err != nil { + compactionResultTotal.WithLabelValues("error").Inc() + logger.Error( "failed to compact fractions", zap.Error(err), zap.Any("snapshot", names(csnapshot.Fractions())), ) + return } @@ -172,6 +187,7 @@ func (p *planner) pick() (task, bool) { return } + compactionResultTotal.WithLabelValues("success").Inc() // TODO(dkharms): Is it fine to substitute and delete? // We need somehow substitute and delete atomically. p.fm.SubstituteWithSealed(s, csnapshot) diff --git a/config.example.yaml b/config.example.yaml index c5bcb24b..9658fa8f 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -10,13 +10,24 @@ storage: frac_size: 16MiB total_size: 1GiB +compaction: + workers: 4 + time_window: 24h + tick_interval: 1s + stcs: + merge_trigger: 4 + merge_fan_in: 32 + merge_fan_out_size: 512MiB + bucket_lowerbound: 0.5 + bucket_upperbound: 1.5 + # For testing or developments purposes you can run MinIO S3 compatible object storage locally. # # docker run -p 9000:9000 -p 9001:9001 \ # quay.io/minio/minio server /data --console-address ":9001" offloading: - enabled: true + enabled: false retention: 5m endpoint: http://localhost:9000/ bucket: remote-storage diff --git a/config/config.go b/config/config.go index 28a67dc0..3cc5a792 100644 --- a/config/config.go +++ b/config/config.go @@ -35,6 +35,7 @@ func Parse(path string) (Config, error) { } /* Set computed defaults if user did not override them */ + c.Compaction.Workers = cmp.Or(c.Compaction.Workers, NumCPU) c.Resources.ReaderWorkers = cmp.Or(c.Resources.ReaderWorkers, NumCPU) c.Resources.SearchWorkers = cmp.Or(c.Resources.SearchWorkers, NumCPU) @@ -202,6 +203,19 @@ type Config struct { DocBlockZstdCompressionLevel int `config:"doc_block_zstd_compression_level" default:"3"` } `config:"compression"` + Compaction struct { + STCS struct { + MergeTrigger int `config:"merge_trigger" default:"4"` + MergeFanIn int `config:"merge_fan_in" default:"32"` + MergeFanOutSize Bytes `config:"merge_fan_out_size" default:"512MiB"` + BucketLowerbound float64 `config:"bucket_lowerbound" default:"0.5"` + BucketUpperbound float64 `config:"bucket_upperbound" default:"1.5"` + } `config:"stcs"` + Workers int `config:"workers"` + TimeWindow time.Duration `config:"time_window" default:"24h"` + TickInterval time.Duration `config:"tick_interval" default:"1s"` + } `config:"compaction"` + Indexing struct { MaxTokenSize int `config:"max_token_size" default:"72"` CaseSensitive bool `config:"case_sensitive"` diff --git a/config/validation.go b/config/validation.go index 15d63c9b..9c8f730a 100644 --- a/config/validation.go +++ b/config/validation.go @@ -71,6 +71,17 @@ func (c *Config) storeValidations() []validateFn { inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent), greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck), + + greaterThan("compaction.stcs.merge_trigger", 0, c.Compaction.STCS.MergeTrigger), + greaterThan("compaction.stcs.merge_fan_out_size", 0, c.Compaction.STCS.MergeFanOutSize), + greaterOrEqualThan("compaction.stcs.merge_fan_in", c.Compaction.STCS.MergeTrigger, c.Compaction.STCS.MergeFanIn), + + greaterThan("compaction.stcs.bucket_lowerbound", 0, c.Compaction.STCS.BucketLowerbound), + greaterOrEqualThan("compaction.stcs.bucket_upperbound", c.Compaction.STCS.BucketLowerbound, c.Compaction.STCS.BucketUpperbound), + + greaterOrEqualThan("compaction.workers", 0, c.Compaction.Workers), + greaterThan("compaction.time_window", 0, c.Compaction.TimeWindow), + greaterThan("compaction.tick_interval", 0, c.Compaction.TickInterval), } if c.Offloading.Enabled { @@ -106,6 +117,18 @@ func greaterThan[T cmp.Ordered](field string, base, v T) validateFn { } } +func greaterOrEqualThan[T cmp.Ordered](field string, base, v T) validateFn { + return func() error { + if v < base { + return fmt.Errorf( + "field %q must be greater or equal than %v", + field, base, + ) + } + return nil + } +} + func inRange[T cmp.Ordered](field string, from, to, v T) validateFn { return func() error { if v < from || to < v { diff --git a/config/validation_test.go b/config/validation_test.go deleted file mode 100644 index 0a29f990..00000000 --- a/config/validation_test.go +++ /dev/null @@ -1,119 +0,0 @@ -package config - -import ( - "os" - "path" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestValidation(t *testing.T) { - base := `storage: - data_dir: /seq-db-data - frac_size: 16MiB - total_size: 10GiB - -mapping: - path: /configs/mapping.yaml - -resources: - cache_size: 2GiB - -limits: - query_rate: 1024 - search_requests: 1024 - bulk_requests: 128 - inflight_bulks: 128 - doc_size: 1MiB -` - - baseCfg := createCfgFile(t, base) - - tests := []struct { - name string - cfg string - env map[string]string - expectErr bool - }{ - { - name: "Invalid storage.sealing_queue_len 1", - cfg: baseCfg, - env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "-1"}, - expectErr: true, - }, - { - name: "Valid storage.sealing_queue_len 2", - cfg: baseCfg, - env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "0"}, - expectErr: false, - }, - { - name: "Valid storage.sealing_queue_len 3", - cfg: baseCfg, - env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "100"}, - expectErr: false, - }, - - { - name: "Invalid offloading.queue_size_percent 1", - cfg: baseCfg, - env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "-1"}, - expectErr: true, - }, - { - name: "Invalid offloading.queue_size_percent 2", - cfg: baseCfg, - env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100.1"}, - expectErr: true, - }, - { - name: "Valid offloading.queue_size_percent 3", - cfg: baseCfg, - env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "0"}, - expectErr: false, - }, - { - name: "Valid offloading.queue_size_percent 4", - cfg: baseCfg, - env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100"}, - expectErr: false, - }, - { - name: "Valid offloading.queue_size_percent 5", - cfg: baseCfg, - env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "50"}, - expectErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for k, v := range tt.env { - t.Setenv(k, v) - } - - c, err := Parse(tt.cfg) - assert.NoError(t, err) - - res := c.Validate("store") - if tt.expectErr { - assert.Error(t, res) - } else { - assert.NoError(t, res) - } - }) - } - -} - -func createCfgFile(t *testing.T, data string) string { - f := path.Join(t.TempDir(), "config.yaml") - err := os.WriteFile(f, []byte(data), 0o666) - assert.NoError(t, err) - - abs, err := filepath.Abs(f) - assert.NoError(t, err) - return abs -} diff --git a/storeapi/store.go b/storeapi/store.go index c3ee0aca..2d1d7fa9 100644 --- a/storeapi/store.go +++ b/storeapi/store.go @@ -41,6 +41,7 @@ type StoreConfig struct { API APIConfig FracManager fracmanager.Config SkipMaskManagerConfig skipmaskmanager.Config + Compaction compaction.Config } func (c *StoreConfig) setDefaults() error { @@ -73,8 +74,8 @@ func NewStore( return nil, fmt.Errorf("loading fractions error: %w", err) } - planner := compaction.NewPlanner(ctx, fracManager) - executor := compaction.NewExecutor(10, planner) + planner := compaction.NewPlanner(ctx, fracManager, c.Compaction) + executor := compaction.NewExecutor(c.Compaction.Workers, c.FracManager.SealParams, planner) skipMaskManager.Start(fracManager) @@ -112,7 +113,7 @@ func (s *Store) Stop() { s.grpcServer.Stop(ctx) s.fracManagerStop() s.SkipMaskManager.Stop() - s.Executor.Close() + s.Executor.Stop() logger.Info("store stopped") } diff --git a/tests/setup/env.go b/tests/setup/env.go index a42ba006..703e9936 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -21,6 +21,7 @@ import ( "github.com/google/uuid" "github.com/ozontech/seq-db/buildinfo" + "github.com/ozontech/seq-db/compaction" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/fracmanager" @@ -127,6 +128,18 @@ func (cfg *TestingEnvConfig) GetStoreConfig(replicaID string, cold bool) storeap SkipMaskManagerConfig: skipmaskmanager.Config{ DataDir: filepath.Join(cfg.DataDir, replicaID, "skipmasks"), }, + Compaction: compaction.Config{ + MergeTrigger: 4, + MergeFanIn: 32, + MergeFanOutSize: math.MaxUint64, + + BucketLowerbound: 0.5, + BucketUpperbound: 1.5, + + Workers: 4, + TimeWindow: time.Hour * 24, + TickInterval: time.Second, + }, } }