diff --git a/go.mod b/go.mod
index cf602914109..7836ac39b9f 100644
--- a/go.mod
+++ b/go.mod
@@ -152,7 +152,7 @@ require (
 	github.com/pion/webrtc/v4 v4.1.2 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
-	github.com/prometheus/client_model v0.6.2 // indirect
+	github.com/prometheus/client_model v0.6.2
 	github.com/prometheus/common v0.64.0
 	github.com/prometheus/procfs v0.16.1 // indirect
 	github.com/prometheus/statsd_exporter v0.26.1 // indirect
diff --git a/pkg/storer/internal_metrics_test.go b/pkg/storer/internal_metrics_test.go
new file mode 100644
index 00000000000..248d91d22fc
--- /dev/null
+++ b/pkg/storer/internal_metrics_test.go
@@ -0,0 +1,86 @@
+// Copyright 2023 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package storer
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	batchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
+	stabilmock "github.com/ethersphere/bee/v2/pkg/stabilization/mock"
+	"github.com/ethersphere/bee/v2/pkg/swarm"
+	kademlia "github.com/ethersphere/bee/v2/pkg/topology/mock"
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+)
+
+// toFloat64 extracts the current value of a single-metric collector
+// (Gauge or Counter). Collection runs in its own goroutine so a collector
+// that emits more than one metric cannot deadlock the caller.
+func toFloat64(c prometheus.Collector) float64 {
+	ch := make(chan prometheus.Metric)
+	go func() {
+		c.Collect(ch)
+		close(ch)
+	}()
+
+	metric, ok := <-ch
+	// Drain any remaining metrics so the collecting goroutine can exit.
+	for range ch {
+	}
+	if !ok {
+		return 0
+	}
+
+	pb := &dto.Metric{}
+	if err := metric.Write(pb); err != nil {
+		return 0
+	}
+	if pb.Gauge != nil {
+		return pb.Gauge.GetValue()
+	}
+	if pb.Counter != nil {
+		return pb.Counter.GetValue()
+	}
+	return 0
+}
+
+// TestLevelDBMetrics verifies that the LevelDB configuration metrics are
+// wired up and populated when a disk-backed storer is created.
+func TestLevelDBMetrics(t *testing.T) {
+	opts := DefaultOptions()
+	opts.Address = swarm.RandAddress(t)
+	opts.RadiusSetter = kademlia.NewTopologyDriver()
+	opts.Batchstore = batchstore.New()
+	opts.ReserveWakeUpDuration = time.Second
+	opts.StartupStabilizer = stabilmock.NewSubscriber(true)
+	opts.LdbStats.Store(prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Name: "test_ldb_stats",
+		Help: "Test LevelDB stats",
+	}, []string{"counter"}))
+
+	// Create a real disk storer to trigger initDiskRepository.
+	dir := t.TempDir()
+	st, err := New(context.Background(), dir, opts)
+	if err != nil {
+		t.Fatalf("New(...) unexpected error: %v", err)
+	}
+	defer st.Close()
+
+	// Verify the configuration metrics reflect the values used by initStore.
+	if v := toFloat64(st.metrics.LevelDBConfigCompactionL0Trigger); v != defaultCompactionL0Trigger {
+		t.Errorf("LevelDBConfigCompactionL0Trigger = %v, want %v", v, defaultCompactionL0Trigger)
+	}
+	if v := toFloat64(st.metrics.LevelDBConfigCompactionTableSize); v != defaultCompactionTableSize {
+		t.Errorf("LevelDBConfigCompactionTableSize = %v, want %v", v, defaultCompactionTableSize)
+	}
+
+	// The stats metrics are only updated by a 15s ticker inside
+	// initDiskRepository; asserting the config metrics above is enough to
+	// prove the metrics struct is wired up without injecting the interval.
+	_ = st.metrics.LevelDBBlockCacheSize
+	_ = st.metrics.LevelDBIOWrite
+}
diff --git a/pkg/storer/metrics.go b/pkg/storer/metrics.go
index 68be82ac039..02d605afc21 100644
--- a/pkg/storer/metrics.go
+++ b/pkg/storer/metrics.go
@@ -17,24 +17,37 @@ import (
 )
 
 // metrics groups storer related prometheus counters.
 type metrics struct {
-	MethodCalls                   *prometheus.CounterVec
-	MethodCallsDuration           *prometheus.HistogramVec
-	ReserveSize                   prometheus.Gauge
-	ReserveSizeWithinRadius       prometheus.Gauge
-	ReserveCleanup                prometheus.Counter
-	StorageRadius                 prometheus.Gauge
-	CacheSize                     prometheus.Gauge
-	EvictedChunkCount             prometheus.Counter
-	ExpiredChunkCount             prometheus.Counter
-	OverCapTriggerCount           prometheus.Counter
-	ExpiredBatchCount             prometheus.Counter
-	LevelDBStats                  *prometheus.HistogramVec
-	ExpiryTriggersCount           prometheus.Counter
-	ExpiryRunsCount               prometheus.Counter
-	ReserveMissingBatch           prometheus.Gauge
-	ReserveSampleDuration         *prometheus.HistogramVec
-	ReserveSampleRunSummary       *prometheus.GaugeVec
-	ReserveSampleLastRunTimestamp prometheus.Gauge
+	MethodCalls                      *prometheus.CounterVec
+	MethodCallsDuration              *prometheus.HistogramVec
+	ReserveSize                      prometheus.Gauge
+	ReserveSizeWithinRadius          prometheus.Gauge
+	ReserveCleanup                   prometheus.Counter
+	StorageRadius                    prometheus.Gauge
+	CacheSize                        prometheus.Gauge
+	EvictedChunkCount                prometheus.Counter
+	ExpiredChunkCount                prometheus.Counter
+	OverCapTriggerCount              prometheus.Counter
+	ExpiredBatchCount                prometheus.Counter
+	LevelDBStats                     *prometheus.HistogramVec
+	ExpiryTriggersCount              prometheus.Counter
+	ExpiryRunsCount                  prometheus.Counter
+	ReserveMissingBatch              prometheus.Gauge
+	ReserveSampleDuration            *prometheus.HistogramVec
+	ReserveSampleRunSummary          *prometheus.GaugeVec
+	ReserveSampleLastRunTimestamp    prometheus.Gauge
+	LevelDBBlockCacheSize            prometheus.Gauge
+	LevelDBAliveSnapshots            prometheus.Gauge
+	LevelDBAliveIterators            prometheus.Gauge
+	LevelDBIOWrite                   prometheus.Gauge
+	LevelDBIORead                    prometheus.Gauge
+	LevelDBWriteDelayCount           prometheus.Gauge
+	LevelDBWriteDelayDuration        prometheus.Gauge
+	LevelDBMemComp                   prometheus.Gauge
+	LevelDBLevel0Comp                prometheus.Gauge
+	LevelDBConfigWriteBufferSize     prometheus.Gauge
+	LevelDBConfigBlockCacheCapacity  prometheus.Gauge
+	LevelDBConfigCompactionL0Trigger prometheus.Gauge
+	LevelDBConfigCompactionTableSize prometheus.Gauge
 }
 
@@ -192,6 +205,110 @@ func newMetrics() metrics {
 				Help: "Unix timestamp of the last ReserveSample run completion.",
 			},
 		),
+		LevelDBBlockCacheSize: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_block_cache_size",
+				Help:      "Current size of the LevelDB block cache in bytes.",
+			},
+		),
+		LevelDBAliveSnapshots: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_alive_snapshots",
+				Help:      "Number of currently alive LevelDB snapshots.",
+			},
+		),
+		LevelDBAliveIterators: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_alive_iterators",
+				Help:      "Number of currently alive LevelDB iterators.",
+			},
+		),
+		LevelDBIOWrite: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_io_write",
+				Help:      "Cumulative LevelDB IO write as reported by DBStats.",
+			},
+		),
+		LevelDBIORead: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_io_read",
+				Help:      "Cumulative LevelDB IO read as reported by DBStats.",
+			},
+		),
+		LevelDBWriteDelayCount: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_write_delay_count",
+				Help:      "Cumulative LevelDB write delay count (gauge: mirrors an external cumulative counter).",
+			},
+		),
+		LevelDBWriteDelayDuration: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_write_delay_duration_seconds",
+				Help:      "Cumulative LevelDB write delay duration in seconds (gauge: mirrors an external cumulative counter).",
+			},
+		),
+		LevelDBMemComp: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_mem_comp",
+				Help:      "Cumulative LevelDB memory compaction count (gauge: mirrors an external cumulative counter).",
+			},
+		),
+		LevelDBLevel0Comp: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_level0_comp",
+				Help:      "Cumulative LevelDB level-0 compaction count (gauge: mirrors an external cumulative counter).",
+			},
+		),
+		LevelDBConfigWriteBufferSize: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_config_write_buffer_size",
+				Help:      "Configured LevelDB write buffer size in bytes.",
+			},
+		),
+		LevelDBConfigBlockCacheCapacity: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_config_block_cache_capacity",
+				Help:      "Configured LevelDB block cache capacity in bytes.",
+			},
+		),
+		LevelDBConfigCompactionL0Trigger: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_config_compaction_l0_trigger",
+				Help:      "Configured LevelDB compaction level-0 trigger.",
+			},
+		),
+		LevelDBConfigCompactionTableSize: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: m.Namespace,
+				Subsystem: subsystem,
+				Name:      "leveldb_config_compaction_table_size",
+				Help:      "Configured LevelDB compaction table size in bytes.",
+			},
+		),
 	}
 }
 
diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go
index e3318c9b278..7934c5e4cf2 100644
--- a/pkg/storer/storer.go
+++ b/pkg/storer/storer.go
@@ -243,8 +243,12 @@ const loggerName = "storer"
 // Default options for levelDB.
 const (
 	defaultOpenFilesLimit         = uint64(256)
-	defaultBlockCacheCapacity     = uint64(32 * 1024 * 1024)
-	defaultWriteBufferSize        = uint64(32 * 1024 * 1024)
-	defaultDisableSeeksCompaction = false
+	defaultBlockCacheCapacity     = uint64(256 * 1024 * 1024)
+	defaultWriteBufferSize        = uint64(128 * 1024 * 1024)
+	defaultCompactionL0Trigger    = 16
+	defaultCompactionTableSize    = 8 * 1024 * 1024
+	// Seeks compaction is disabled by default; the option remains
+	// honored so operators can re-enable it explicitly.
+	defaultDisableSeeksCompaction = true
 	defaultCacheCapacity          = uint64(1_000_000)
 	defaultBgCacheWorkers         = 32
@@ -267,9 +267,10 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) {
 		OpenFilesCacheCapacity: int(opts.LdbOpenFilesLimit),
 		BlockCacheCapacity:     int(opts.LdbBlockCacheCapacity),
 		WriteBuffer:            int(opts.LdbWriteBufferSize),
 		DisableSeeksCompaction: opts.LdbDisableSeeksCompaction,
-		CompactionL0Trigger:    8,
-		Filter:                 filter.NewBloomFilter(64),
+		CompactionL0Trigger:    defaultCompactionL0Trigger,
+		CompactionTableSize:    defaultCompactionTableSize,
+		Filter:                 filter.NewBloomFilter(10),
 	})
 	if err != nil {
 		return nil, fmt.Errorf("failed creating levelDB index store: %w", err)
@@ -282,6 +283,7 @@ func initDiskRepository(
 	ctx context.Context,
 	basePath string,
 	opts *Options,
+	metrics metrics,
 ) (transaction.Storage, *PinIntegrity, io.Closer, error) {
 	store, err := initStore(basePath, opts)
 	if err != nil {
@@ -293,6 +295,11 @@ func initDiskRepository(
 		return nil, nil, nil, errors.Join(store.Close(), fmt.Errorf("failed core migration: %w", err))
 	}
 
+	metrics.LevelDBConfigWriteBufferSize.Set(float64(opts.LdbWriteBufferSize))
+	metrics.LevelDBConfigBlockCacheCapacity.Set(float64(opts.LdbBlockCacheCapacity))
+	metrics.LevelDBConfigCompactionL0Trigger.Set(float64(defaultCompactionL0Trigger))
+	metrics.LevelDBConfigCompactionTableSize.Set(float64(defaultCompactionTableSize))
+
 	if opts.LdbStats.Load() != nil {
 		go func() {
 			ldbStats := opts.LdbStats.Load()
@@ -331,6 +338,16 @@ func initDiskRepository(
 					ldbStats.WithLabelValues(fmt.Sprintf("level_%d_write", i)).Observe(float64(stats.LevelWrite[i]))
 					ldbStats.WithLabelValues(fmt.Sprintf("level_%d_duration", i)).Observe(stats.LevelDurations[i].Seconds())
 				}
+
+				// DBStats values are cumulative since DB open, so mirror
+				// them with Set; Add-ing them each tick would double-count.
+				metrics.LevelDBWriteDelayCount.Set(float64(stats.WriteDelayCount))
+				metrics.LevelDBWriteDelayDuration.Set(stats.WriteDelayDuration.Seconds())
+				metrics.LevelDBAliveSnapshots.Set(float64(stats.AliveSnapshots))
+				metrics.LevelDBAliveIterators.Set(float64(stats.AliveIterators))
+				metrics.LevelDBIOWrite.Set(float64(stats.IOWrite))
+				metrics.LevelDBIORead.Set(float64(stats.IORead))
+				metrics.LevelDBBlockCacheSize.Set(float64(stats.BlockCacheSize))
+				metrics.LevelDBMemComp.Set(float64(stats.MemComp))
+				metrics.LevelDBLevel0Comp.Set(float64(stats.Level0Comp))
 			}
 		}
 	}
@@ -488,7 +505,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
 			return nil, err
 		}
 	} else {
-		st, pinIntegrity, dbCloser, err = initDiskRepository(ctx, dirPath, opts)
+		st, pinIntegrity, dbCloser, err = initDiskRepository(ctx, dirPath, opts, metrics)
 		if err != nil {
 			return nil, err
 		}