Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ require (
github.com/pion/webrtc/v4 v4.1.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.64.0
github.com/prometheus/procfs v0.16.1 // indirect
github.com/prometheus/statsd_exporter v0.26.1 // indirect
Expand Down
90 changes: 90 additions & 0 deletions pkg/storer/internal_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package storer

import (
"context"
"testing"
"time"

batchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
stabilmock "github.com/ethersphere/bee/v2/pkg/stabilization/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
kademlia "github.com/ethersphere/bee/v2/pkg/topology/mock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// toFloat64 extracts the current value of a gauge or counter metric from the
// given collector. It collects in a separate goroutine so that collectors
// emitting more than one metric cannot deadlock on the channel (the previous
// synchronous Collect on a 1-buffered channel would block forever for any
// collector with >1 metric). The value of the first metric is returned;
// remaining metrics are drained so the collecting goroutine can finish.
// Unsupported metric types, write errors, and empty collectors yield 0.
func toFloat64(c prometheus.Collector) float64 {
	ch := make(chan prometheus.Metric)
	go func() {
		c.Collect(ch)
		close(ch)
	}()

	var (
		value float64
		seen  bool
	)
	for metric := range ch {
		if seen {
			continue // drain extra metrics; keep the first value
		}
		seen = true

		pb := &dto.Metric{}
		if err := metric.Write(pb); err != nil {
			continue
		}
		switch {
		case pb.Gauge != nil:
			value = pb.Gauge.GetValue()
		case pb.Counter != nil:
			value = pb.Counter.GetValue()
			// Add other types if necessary.
		}
	}
	return value
}

// TestLevelDBMetrics verifies that constructing a disk-backed storer wires up
// the LevelDB configuration metrics with the values hard-coded in initStore.
func TestLevelDBMetrics(t *testing.T) {
	// Options with LdbStats initialized so initDiskRepository starts the
	// stats-collection goroutine.
	opts := DefaultOptions()
	opts.Address = swarm.RandAddress(t)
	opts.RadiusSetter = kademlia.NewTopologyDriver()
	opts.Batchstore = batchstore.New()
	opts.ReserveWakeUpDuration = time.Second
	opts.StartupStabilizer = stabilmock.NewSubscriber(true)
	opts.LdbStats.Store(prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "test_ldb_stats",
		Help: "Test LevelDB stats",
	}, []string{"counter"}))

	// A real on-disk storer is required to exercise initDiskRepository.
	st, err := New(context.Background(), t.TempDir(), opts)
	if err != nil {
		t.Fatalf("New(...) unexpected error: %v", err)
	}
	defer st.Close()

	// The config gauges mirror the constants set in initStore.
	const (
		wantL0Trigger = 16
		wantTableSize = 8 * 1024 * 1024
	)
	if got := toFloat64(st.metrics.LevelDBConfigCompactionL0Trigger); got != wantL0Trigger {
		t.Errorf("LevelDBConfigCompactionL0Trigger = %v, want 16", got)
	}
	if got := toFloat64(st.metrics.LevelDBConfigCompactionTableSize); got != wantTableSize {
		t.Errorf("LevelDBConfigCompactionTableSize = %v, want 8388608", got)
	}
	// NOTE(review): opts.LdbWriteBufferSize is 0 in DefaultOptions, so the
	// corresponding config gauge should be 0 unless initStore substitutes a
	// default — not asserted here; confirm against initStore if needed.

	// The stats gauges are only updated by the periodic ticker inside
	// initDiskRepository (15s in production), which cannot be sped up without
	// refactoring to inject the interval. Referencing the fields is enough to
	// prove the metrics struct is wired up.
	_ = st.metrics.LevelDBBlockCacheSize
	_ = st.metrics.LevelDBIOWrite
}
153 changes: 135 additions & 18 deletions pkg/storer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,37 @@ import (

// metrics groups storer related prometheus counters.
type metrics struct {
MethodCalls *prometheus.CounterVec
MethodCallsDuration *prometheus.HistogramVec
ReserveSize prometheus.Gauge
ReserveSizeWithinRadius prometheus.Gauge
ReserveCleanup prometheus.Counter
StorageRadius prometheus.Gauge
CacheSize prometheus.Gauge
EvictedChunkCount prometheus.Counter
ExpiredChunkCount prometheus.Counter
OverCapTriggerCount prometheus.Counter
ExpiredBatchCount prometheus.Counter
LevelDBStats *prometheus.HistogramVec
ExpiryTriggersCount prometheus.Counter
ExpiryRunsCount prometheus.Counter
ReserveMissingBatch prometheus.Gauge
ReserveSampleDuration *prometheus.HistogramVec
ReserveSampleRunSummary *prometheus.GaugeVec
ReserveSampleLastRunTimestamp prometheus.Gauge
MethodCalls *prometheus.CounterVec
MethodCallsDuration *prometheus.HistogramVec
ReserveSize prometheus.Gauge
ReserveSizeWithinRadius prometheus.Gauge
ReserveCleanup prometheus.Counter
StorageRadius prometheus.Gauge
CacheSize prometheus.Gauge
EvictedChunkCount prometheus.Counter
ExpiredChunkCount prometheus.Counter
OverCapTriggerCount prometheus.Counter
ExpiredBatchCount prometheus.Counter
LevelDBStats *prometheus.HistogramVec
ExpiryTriggersCount prometheus.Counter
ExpiryRunsCount prometheus.Counter
ReserveMissingBatch prometheus.Gauge
ReserveSampleDuration *prometheus.HistogramVec
ReserveSampleRunSummary *prometheus.GaugeVec
ReserveSampleLastRunTimestamp prometheus.Gauge
LevelDBBlockCacheSize prometheus.Gauge
LevelDBAliveSnapshots prometheus.Gauge
LevelDBAliveIterators prometheus.Gauge
LevelDBIOWrite prometheus.Gauge
LevelDBIORead prometheus.Gauge
LevelDBWriteDelayCount prometheus.Counter
LevelDBWriteDelayDuration prometheus.Counter
LevelDBMemComp prometheus.Counter
LevelDBLevel0Comp prometheus.Counter
LevelDBConfigWriteBufferSize prometheus.Gauge
LevelDBConfigBlockCacheCapacity prometheus.Gauge
LevelDBConfigCompactionL0Trigger prometheus.Gauge
LevelDBConfigCompactionTableSize prometheus.Gauge
}

// newMetrics is a convenient constructor for creating new metrics.
Expand Down Expand Up @@ -192,6 +205,110 @@ func newMetrics() metrics {
Help: "Unix timestamp of the last ReserveSample run completion.",
},
),
LevelDBBlockCacheSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_block_cache_size",
Help: "LevelDB block cache size.",
},
),
LevelDBAliveSnapshots: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_alive_snapshots",
Help: "LevelDB alive snapshots.",
},
),
LevelDBAliveIterators: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_alive_iterators",
Help: "LevelDB alive iterators.",
},
),
LevelDBIOWrite: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_io_write",
Help: "LevelDB IO write.",
},
),
LevelDBIORead: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_io_read",
Help: "LevelDB IO read.",
},
),
LevelDBWriteDelayCount: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_write_delay_count",
Help: "LevelDB write delay count.",
},
),
LevelDBWriteDelayDuration: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_write_delay_duration_seconds",
Help: "LevelDB write delay duration in seconds.",
},
),
LevelDBMemComp: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_mem_comp",
Help: "LevelDB mem compaction.",
},
),
LevelDBLevel0Comp: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_level0_comp",
Help: "LevelDB level0 compaction.",
},
),
LevelDBConfigWriteBufferSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_config_write_buffer_size",
Help: "LevelDB config write buffer size.",
},
),
LevelDBConfigBlockCacheCapacity: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_config_block_cache_capacity",
Help: "LevelDB config block cache capacity.",
},
),
LevelDBConfigCompactionL0Trigger: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_config_compaction_l0_trigger",
Help: "LevelDB config compaction l0 trigger.",
},
),
LevelDBConfigCompactionTableSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "leveldb_config_compaction_table_size",
Help: "LevelDB config compaction table size.",
},
),
}
}

Expand Down
29 changes: 23 additions & 6 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ const loggerName = "storer"
// Default options for levelDB.
const (
defaultOpenFilesLimit = uint64(256)
defaultBlockCacheCapacity = uint64(32 * 1024 * 1024)
defaultWriteBufferSize = uint64(32 * 1024 * 1024)
defaultBlockCacheCapacity = uint64(256 * 1024 * 1024)
defaultWriteBufferSize = uint64(128 * 1024 * 1024)
defaultDisableSeeksCompaction = false
defaultCacheCapacity = uint64(1_000_000)
defaultBgCacheWorkers = 32
Expand All @@ -267,9 +267,10 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) {
OpenFilesCacheCapacity: int(opts.LdbOpenFilesLimit),
BlockCacheCapacity: int(opts.LdbBlockCacheCapacity),
WriteBuffer: int(opts.LdbWriteBufferSize),
DisableSeeksCompaction: opts.LdbDisableSeeksCompaction,
CompactionL0Trigger: 8,
Filter: filter.NewBloomFilter(64),
CompactionL0Trigger: 16,
CompactionTableSize: 8 * 1024 * 1024,
Filter: filter.NewBloomFilter(10),
DisableSeeksCompaction: true,
})
if err != nil {
return nil, fmt.Errorf("failed creating levelDB index store: %w", err)
Expand All @@ -282,6 +283,7 @@ func initDiskRepository(
ctx context.Context,
basePath string,
opts *Options,
metrics metrics,
) (transaction.Storage, *PinIntegrity, io.Closer, error) {
store, err := initStore(basePath, opts)
if err != nil {
Expand All @@ -293,6 +295,11 @@ func initDiskRepository(
return nil, nil, nil, errors.Join(store.Close(), fmt.Errorf("failed core migration: %w", err))
}

metrics.LevelDBConfigWriteBufferSize.Set(float64(opts.LdbWriteBufferSize))
metrics.LevelDBConfigBlockCacheCapacity.Set(float64(opts.LdbBlockCacheCapacity))
metrics.LevelDBConfigCompactionL0Trigger.Set(16)
metrics.LevelDBConfigCompactionTableSize.Set(8 * 1024 * 1024)

if opts.LdbStats.Load() != nil {
go func() {
ldbStats := opts.LdbStats.Load()
Expand Down Expand Up @@ -331,6 +338,16 @@ func initDiskRepository(
ldbStats.WithLabelValues(fmt.Sprintf("level_%d_write", i)).Observe(float64(stats.LevelWrite[i]))
ldbStats.WithLabelValues(fmt.Sprintf("level_%d_duration", i)).Observe(stats.LevelDurations[i].Seconds())
}

metrics.LevelDBWriteDelayCount.Add(float64(stats.WriteDelayCount))
metrics.LevelDBWriteDelayDuration.Add(stats.WriteDelayDuration.Seconds())
metrics.LevelDBAliveSnapshots.Set(float64(stats.AliveSnapshots))
metrics.LevelDBAliveIterators.Set(float64(stats.AliveIterators))
metrics.LevelDBIOWrite.Set(float64(stats.IOWrite))
metrics.LevelDBIORead.Set(float64(stats.IORead))
metrics.LevelDBBlockCacheSize.Set(float64(stats.BlockCacheSize))
metrics.LevelDBMemComp.Add(float64(stats.MemComp))
metrics.LevelDBLevel0Comp.Add(float64(stats.Level0Comp))
}
}
}
Expand Down Expand Up @@ -488,7 +505,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
return nil, err
}
} else {
st, pinIntegrity, dbCloser, err = initDiskRepository(ctx, dirPath, opts)
st, pinIntegrity, dbCloser, err = initDiskRepository(ctx, dirPath, opts, metrics)
if err != nil {
return nil, err
}
Expand Down
Loading