Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [FEATURE] Querier: Add experimental per-tenant cardinality API (`GET /api/v1/cardinality`) that exposes top-N metrics by series count, label names by distinct value count, and label-value pairs by series count from ingester TSDB heads (`source=head`) and compacted blocks (`source=blocks`). Gated behind `-querier.cardinality-api-enabled` (default `false`). #7384
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
Expand Down
20 changes: 20 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4286,6 +4286,26 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# zones are not available.
[query_partial_data: <boolean> | default = false]

# [Experimental] Enables the per-tenant cardinality API endpoint. When disabled,
# the endpoint returns HTTP 403.
# CLI flag: -querier.cardinality-api-enabled
[cardinality_api_enabled: <boolean> | default = false]

# [Experimental] Maximum allowed time range (end - start) for source=blocks
# cardinality queries.
# CLI flag: -querier.cardinality-max-query-range
[cardinality_max_query_range: <duration> | default = 1d]

# [Experimental] Maximum number of concurrent cardinality requests per tenant.
# Excess requests are rejected with HTTP 429.
# CLI flag: -querier.cardinality-max-concurrent-requests
[cardinality_max_concurrent_requests: <int> | default = 2]

# [Experimental] Per-request timeout for cardinality computation. On timeout,
# partial results are returned.
# CLI flag: -querier.cardinality-query-timeout
[cardinality_query_timeout: <duration> | default = 1m]

# The maximum number of rows that can be fetched when querying parquet storage.
# Each row maps to a series in a parquet file. This limit applies before
# materializing chunks. 0 to disable.
Expand Down
4 changes: 4 additions & 0 deletions docs/getting-started/cortex-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ compactor:
frontend_worker:
match_max_concurrent: true

# https://cortexmetrics.io/docs/configuration/configuration-file/#limits_config
limits:
cardinality_api_enabled: true

# https://cortexmetrics.io/docs/configuration/configuration-file/#ruler_config
ruler:
enable_api: true
Expand Down
175 changes: 175 additions & 0 deletions integration/cardinality_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
//go:build requires_docker

package integration

import (
"encoding/json"
"testing"
"time"

"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

type cardinalityAPIResponse struct {
Status string `json:"status"`
Data struct {
NumSeries uint64 `json:"numSeries"`
Approximated bool `json:"approximated"`
SeriesCountByMetricName []struct {
Name string `json:"name"`
Value uint64 `json:"value"`
} `json:"seriesCountByMetricName"`
LabelValueCountByLabelName []struct {
Name string `json:"name"`
Value uint64 `json:"value"`
} `json:"labelValueCountByLabelName"`
SeriesCountByLabelValuePair []struct {
Name string `json:"name"`
Value uint64 `json:"value"`
} `json:"seriesCountByLabelValuePair"`
} `json:"data"`
}

// TestCardinalityAPI exercises the experimental per-tenant cardinality API
// end-to-end against a single-binary Cortex backed by Minio. It pushes a few
// series for tenant "user-1", then runs five subtests: the head path, the
// default source, parameter validation, the blocks path (after head
// compaction and block shipping), and the blocks path's start/end
// requirement.
func TestCardinalityAPI(t *testing.T) {
	// Short block range so the TSDB head compacts and ships quickly enough
	// for the blocks-path subtest to complete without long waits.
	const blockRangePeriod = 5 * time.Second

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start dependencies.
	minio := e2edb.NewMinio(9000, bucketName)
	require.NoError(t, s.StartAndWaitReady(minio))

	// Configure the blocks storage to frequently compact TSDB head and ship blocks to storage.
	flags := mergeFlags(BlocksStorageFlags(), AlertmanagerLocalFlags(), map[string]string{
		"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
		"-blocks-storage.tsdb.ship-interval":       "1s",
		"-blocks-storage.bucket-store.sync-interval": "1s",
		// Retention shorter than two block ranges forces head truncation,
		// so old samples are only queryable via shipped blocks.
		"-blocks-storage.tsdb.retention-period":             ((blockRangePeriod * 2) - 1).String(),
		"-blocks-storage.bucket-store.bucket-index.enabled": "false",
		// The API under test is gated behind this experimental flag.
		"-querier.cardinality-api-enabled": "true",
		"-alertmanager.web.external-url":   "http://localhost/alertmanager",
		// Use inmemory ring to avoid needing Consul.
		"-ring.store":                        "inmemory",
		"-compactor.ring.store":              "inmemory",
		"-store-gateway.sharding-ring.store": "inmemory",
		"-store-gateway.sharding-enabled":    "true",
		"-store-gateway.sharding-ring.replication-factor": "1",
	})

	// The alertmanager requires a (possibly empty) config file to start.
	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

	cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "")
	require.NoError(t, s.StartAndWaitReady(cortex))

	c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	// Push multiple series with different metric names and labels.
	now := time.Now()
	series1, _ := generateSeries("test_metric_1", now, prompb.Label{Name: "job", Value: "api"})
	series2, _ := generateSeries("test_metric_2", now, prompb.Label{Name: "job", Value: "worker"})
	series3, _ := generateSeries("test_metric_3", now, prompb.Label{Name: "job", Value: "api"}, prompb.Label{Name: "instance", Value: "host1"})

	// NOTE(review): the loop variable `s` shadows the e2e.Scenario above;
	// harmless here since the scenario isn't used inside the loop.
	for _, s := range [][]prompb.TimeSeries{series1, series2, series3} {
		res, err := c.Push(s)
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)
	}

	// --- Test 1: Head path ---
	t.Run("head path returns cardinality data", func(t *testing.T) {
		resp, body, err := c.CardinalityRaw("head", 10, time.Time{}, time.Time{})
		require.NoError(t, err)
		require.Equal(t, 200, resp.StatusCode, "body: %s", string(body))

		var result cardinalityAPIResponse
		require.NoError(t, json.Unmarshal(body, &result))

		assert.Equal(t, "success", result.Status)
		assert.GreaterOrEqual(t, result.Data.NumSeries, uint64(3))
		assert.NotEmpty(t, result.Data.SeriesCountByMetricName, "seriesCountByMetricName should not be empty")
		assert.NotEmpty(t, result.Data.LabelValueCountByLabelName, "labelValueCountByLabelName should not be empty")
		assert.NotEmpty(t, result.Data.SeriesCountByLabelValuePair, "seriesCountByLabelValuePair should not be empty")
	})

	// --- Test 2: Default source (should be head) ---
	// Passing an empty source omits the parameter entirely, so the server's
	// default applies.
	t.Run("default source is head", func(t *testing.T) {
		resp, body, err := c.CardinalityRaw("", 10, time.Time{}, time.Time{})
		require.NoError(t, err)
		require.Equal(t, 200, resp.StatusCode, "body: %s", string(body))

		var result cardinalityAPIResponse
		require.NoError(t, json.Unmarshal(body, &result))
		assert.Equal(t, "success", result.Status)
		assert.GreaterOrEqual(t, result.Data.NumSeries, uint64(3))
	})

	// --- Test 3: Parameter validation ---
	t.Run("invalid source returns 400", func(t *testing.T) {
		resp, _, err := c.CardinalityRaw("invalid", 0, time.Time{}, time.Time{})
		require.NoError(t, err)
		assert.Equal(t, 400, resp.StatusCode)
	})

	// --- Test 4: Blocks path ---
	// Push series at timestamps spanning two block ranges to trigger head compaction and shipping.
	t.Run("blocks path returns cardinality data", func(t *testing.T) {
		// Push a series at a timestamp in a different block range to trigger compaction of the first block.
		series4, _ := generateSeries("test_metric_4", now.Add(blockRangePeriod*2),
			prompb.Label{Name: "job", Value: "scheduler"})
		res, err := c.Push(series4)
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)

		// Wait until at least one block is shipped from the ingester.
		require.NoError(t, cortex.WaitSumMetricsWithOptions(
			e2e.Greater(0),
			[]string{"cortex_ingester_shipper_uploads_total"},
			e2e.WaitMissingMetrics,
		))

		// Wait until the store gateway has loaded the shipped blocks.
		require.NoError(t, cortex.WaitSumMetricsWithOptions(
			e2e.Greater(0),
			[]string{"cortex_bucket_store_blocks_loaded"},
			e2e.WaitMissingMetrics,
		))

		// Query the blocks path with retries. The querier's block finder and
		// store gateway may need additional sync cycles before returning data.
		start := now.Add(-1 * time.Hour)
		end := now.Add(1 * time.Hour)
		deadline := time.Now().Add(30 * time.Second)

		// Each attempt must itself succeed (200 + valid JSON); we only retry
		// while the response carries no label-name data yet.
		var result cardinalityAPIResponse
		for time.Now().Before(deadline) {
			resp, body, err := c.CardinalityRaw("blocks", 10, start, end)
			require.NoError(t, err)
			require.Equal(t, 200, resp.StatusCode, "body: %s", string(body))
			require.NoError(t, json.Unmarshal(body, &result))

			if len(result.Data.LabelValueCountByLabelName) > 0 {
				break
			}
			time.Sleep(1 * time.Second)
		}

		assert.Equal(t, "success", result.Status)
		assert.NotEmpty(t, result.Data.LabelValueCountByLabelName, "labelValueCountByLabelName should not be empty after retries")
	})

	// --- Test 5: Blocks path requires start/end ---
	// Zero times suppress the start/end query parameters; the blocks source
	// must then reject the request.
	t.Run("blocks path without start/end returns 400", func(t *testing.T) {
		resp, _, err := c.CardinalityRaw("blocks", 0, time.Time{}, time.Time{})
		require.NoError(t, err)
		assert.Equal(t, 400, resp.StatusCode)
	})
}
25 changes: 25 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,31 @@ func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTi
return c.query(u.String(), headers)
}

// CardinalityRaw runs a cardinality request directly against the querier API.
// source, limit, start and end are added as query parameters only when set
// (non-empty / non-zero), so the server-side defaults apply otherwise. It
// returns the raw HTTP response and body so callers can assert on both
// success and error payloads.
func (c *Client) CardinalityRaw(source string, limit int, start, end time.Time) (*http.Response, []byte, error) {
	// Put the querier address in Host, not Path: embedding "host:port" in
	// the path only renders as a valid URL by accident of URL.String() and
	// would break percent-escaping of the first path segment.
	u := &url.URL{
		Scheme: "http",
		Host:   c.querierAddress,
		Path:   "/api/prom/api/v1/cardinality",
	}
	q := u.Query()

	if source != "" {
		q.Set("source", source)
	}
	if limit > 0 {
		q.Set("limit", strconv.Itoa(limit))
	}
	if !start.IsZero() {
		q.Set("start", FormatTime(start))
	}
	if !end.IsZero() {
		q.Set("end", FormatTime(end))
	}

	u.RawQuery = q.Encode()
	return c.query(u.String(), nil)
}

// RemoteRead runs a remote read query.
func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, step time.Duration) (*prompb.ReadResponse, error) {
startMs := start.UnixMilli()
Expand Down
3 changes: 2 additions & 1 deletion pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ type Cortex struct {

// Queryables that the querier should use to query the long
// term storage. It depends on the storage engine used.
StoreQueryables []querier.QueryableWithFilter
StoreQueryables []querier.QueryableWithFilter
BlocksStoreQueryable *querier.BlocksStoreQueryable
}

// New makes a new Cortex.
Expand Down
8 changes: 8 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"net/http"
"path"
"runtime"
"runtime/debug"

Expand Down Expand Up @@ -293,6 +294,12 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
// Register the default endpoints that are always enabled for the querier module
t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor)

// Register the cardinality endpoint directly on the external API server.
// This endpoint bypasses the query-frontend and is served directly by the querier.
cardinalityHandler := querier.CardinalityHandler(t.Distributor, t.BlocksStoreQueryable, t.OverridesConfig, prometheus.DefaultRegisterer)
t.API.RegisterRoute(path.Join(t.Cfg.API.PrometheusHTTPPrefix, "/api/v1/cardinality"), cardinalityHandler, true, "GET")
t.API.RegisterRoute(path.Join(t.Cfg.API.LegacyHTTPPrefix, "/api/v1/cardinality"), cardinalityHandler, true, "GET")

return nil, nil
}

Expand Down Expand Up @@ -448,6 +455,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
if q, err := initBlockStoreQueryable(t.Cfg, t.OverridesConfig, prometheus.DefaultRegisterer); err != nil {
return nil, fmt.Errorf("failed to initialize querier: %v", err)
} else {
t.BlocksStoreQueryable = q
queriable = q
if t.Cfg.Querier.EnableParquetQueryable {
pq, err := querier.NewParquetQueryable(t.Cfg.Querier, t.Cfg.BlocksStorage, t.OverridesConfig, q, util_log.Logger, prometheus.DefaultRegisterer)
Expand Down
Loading
Loading