Skip to content
Open
14 changes: 14 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,20 @@ querier:
# mixed block types (parquet and non-parquet) and not querying ingesters.
# CLI flag: -querier.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

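# If true, classify query timeouts as 4XX (user error) or 5XX (system error)
# based on how long the query spent in each phase (queue wait, storage fetch,
# evaluation). Requires query stats to be enabled
# (-frontend.query-stats-enabled).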
# CLI flag: -querier.timeout-classification-enabled
[timeout_classification_enabled: <boolean> | default = false]

# The total time budget, including time already spent waiting in the scheduler
# queue, after which the querier proactively cancels a query so that the
# timeout can be classified.
# CLI flag: -querier.timeout-classification-deadline
[timeout_classification_deadline: <duration> | default = 59s]

# Eval time threshold above which a timeout is classified as user error (4XX).
# CLI flag: -querier.timeout-classification-eval-threshold
[timeout_classification_eval_threshold: <duration> | default = 40s]
```

### `blocks_storage_config`
Expand Down
14 changes: 14 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4959,6 +4959,20 @@ thanos_engine:
# types (parquet and non-parquet) and not querying ingesters.
# CLI flag: -querier.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

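# If true, classify query timeouts as 4XX (user error) or 5XX (system error)
# based on how long the query spent in each phase (queue wait, storage fetch,
# evaluation). Requires query stats to be enabled
# (-frontend.query-stats-enabled).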
# CLI flag: -querier.timeout-classification-enabled
[timeout_classification_enabled: <boolean> | default = false]

# The total time budget, including time already spent waiting in the scheduler
# queue, after which the querier proactively cancels a query so that the
# timeout can be classified.
# CLI flag: -querier.timeout-classification-deadline
[timeout_classification_deadline: <duration> | default = 59s]

# Eval time threshold above which a timeout is classified as user error (4XX).
# CLI flag: -querier.timeout-classification-eval-threshold
[timeout_classification_eval_threshold: <duration> | default = 40s]
```

### `query_frontend_config`
Expand Down
6 changes: 5 additions & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,11 @@ func NewQuerierHandler(
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
api.Register(legacyPromRouter)

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, stats.PhaseTrackerConfig{
TotalTimeout: querierCfg.TimeoutClassificationDeadline,
EvalTimeThreshold: querierCfg.TimeoutClassificationEvalThreshold,
Enabled: querierCfg.TimeoutClassificationEnabled,
})

requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
var apiHandler http.Handler
Expand Down
145 changes: 131 additions & 14 deletions pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ import (
"github.com/cortexproject/cortex/pkg/distributed_execution"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/api"
"github.com/cortexproject/cortex/pkg/util/requestmeta"
)

type QueryAPI struct {
queryable storage.SampleAndChunkQueryable
queryEngine engine.QueryEngine
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
codecs []v1.Codec
CORSOrigin *regexp.Regexp
queryable storage.SampleAndChunkQueryable
queryEngine engine.QueryEngine
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
codecs []v1.Codec
CORSOrigin *regexp.Regexp
timeoutClassification stats.PhaseTrackerConfig
}

func NewQueryAPI(
Expand All @@ -42,15 +45,17 @@ func NewQueryAPI(
logger log.Logger,
codecs []v1.Codec,
CORSOrigin *regexp.Regexp,
timeoutClassification stats.PhaseTrackerConfig,
) *QueryAPI {
return &QueryAPI{
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
timeoutClassification: timeoutClassification,
}
}

Expand Down Expand Up @@ -84,6 +89,11 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx := r.Context()

// Always record query start time for phase tracking, regardless of feature flag.
queryStats := stats.FromContext(ctx)
queryStats.SetQueryStart(time.Now())

if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
Expand All @@ -95,6 +105,15 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
defer cancel()
}

cfg := q.timeoutClassification
ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg)
if cancel != nil {
defer cancel()
}
if earlyResult != nil {
return *earlyResult
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
Expand Down Expand Up @@ -138,6 +157,13 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {

res := qry.Exec(ctx)
if res.Err != nil {
// If the context was cancelled/timed out, apply timeout classification.
if ctx.Err() != nil {
if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil {
return *classified
}
}

return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

Expand All @@ -159,6 +185,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx := r.Context()

// Always record query start time for phase tracking, regardless of feature flag.
queryStats := stats.FromContext(ctx)
queryStats.SetQueryStart(time.Now())

if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
Expand All @@ -170,6 +201,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
defer cancel()
}

cfg := q.timeoutClassification
ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg)
if cancel != nil {
defer cancel()
}
if earlyResult != nil {
return *earlyResult
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
Expand Down Expand Up @@ -211,6 +251,13 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {

res := qry.Exec(ctx)
if res.Err != nil {
// If the context was cancelled/timed out, apply timeout classification.
if ctx.Err() != nil {
if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil {
return *classified
}
}

return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

Expand Down Expand Up @@ -281,6 +328,76 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data any, w
}
}

// applyTimeoutClassification bounds ctx with the timeout-classification budget.
//
// When cfg.Enabled is false the context is returned untouched with a nil cancel
// func and a nil result. Otherwise the budget is cfg.TotalTimeout minus the
// time the query spent queued (queue leave time minus queue join time, counted
// only when both timestamps are set):
//   - if budget remains, ctx is wrapped with context.WithTimeout and the
//     matching cancel func is returned for the caller to defer;
//   - if the budget is already exhausted, the original ctx is returned along
//     with an early-exit result carrying a 503 timeout error, and no cancel
//     func.
//
// The proactive deadline is meant to fire before the PromQL engine's own
// timeout so that the handler can classify the failure itself.
func applyTimeoutClassification(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig) (context.Context, context.CancelFunc, *apiFuncResult) {
	if !cfg.Enabled {
		return ctx, nil, nil
	}

	// Start from the full budget and deduct whatever the queue already consumed.
	remaining := cfg.TotalTimeout
	if joined, left := queryStats.LoadQueueJoinTime(), queryStats.LoadQueueLeaveTime(); !(joined.IsZero() || left.IsZero()) {
		remaining -= left.Sub(joined)
	}

	if remaining > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, remaining)
		return timeoutCtx, cancel, nil
	}

	// Nothing left to spend on execution: fail fast without running the query.
	queueErr := httpgrpc.Errorf(http.StatusServiceUnavailable,
		"query timed out: query spent too long in scheduler queue")
	return ctx, nil, &apiFuncResult{nil, &apiError{errorTimeout, queueErr}, nil, nil}
}

// classifyTimeout inspects phase timings after the request context has expired
// and returns the apiFuncResult that should replace the engine's error.
//
// It always records the query end time on queryStats. Classification is only
// performed when ctx ended because its deadline passed
// (context.DeadlineExceeded); a plain cancellation is not a timeout, so nil is
// returned and the caller keeps its default error path.
//
// For a deadline expiry the timing breakdown is logged at warn level, then:
//   - if cfg.Enabled is false, nil is returned (log only, no conversion);
//   - if the decision is stats.UserError4XX, a 422 execution error is returned;
//   - otherwise a 504 timeout error is returned.
//
// warnings and closer are passed through unchanged into the returned result.
func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig, warnings annotations.Annotations, closer func()) *apiFuncResult {
	// Record query end time so that ComputeAndStoreTimingBreakdown (called
	// later in scheduler_processor.runRequest) reuses the same timestamp,
	// producing identical numbers in querier and query-frontend logs.
	queryStats.SetQueryEnd(time.Now())

	// Callers invoke this whenever ctx.Err() != nil, which also covers
	// context.Canceled (e.g. the client went away). Reporting that as a
	// timeout would both log a misleading message and rewrite the response
	// status, so only a passed deadline is classified.
	if ctx.Err() != context.DeadlineExceeded {
		return nil
	}

	decision := stats.DecideTimeoutResponse(queryStats, cfg)

	queryStart := queryStats.LoadQueryStart()
	queryEnd := queryStats.LoadQueryEnd()
	fetchTime := queryStats.LoadQueryStorageWallTime()
	totalTime := queryEnd.Sub(queryStart)
	// Whatever was not spent fetching from storage is attributed to evaluation.
	evalTime := totalTime - fetchTime

	// Queue wait is only known when both queue timestamps were recorded.
	var queueWaitTime time.Duration
	queueJoin := queryStats.LoadQueueJoinTime()
	queueLeave := queryStats.LoadQueueLeaveTime()
	if !queueJoin.IsZero() && !queueLeave.IsZero() {
		queueWaitTime = queueLeave.Sub(queueJoin)
	}

	level.Warn(q.logger).Log(
		"msg", "query timed out with classification",
		"request_id", requestmeta.RequestIdFromContext(ctx),
		"query_start", queryStart,
		"query_end", queryEnd,
		"queue_wait_time", queueWaitTime,
		"fetch_time", fetchTime,
		"eval_time", evalTime,
		"total_time", totalTime,
		"decision", decision,
		"conversion_enabled", cfg.Enabled,
	)

	// With the feature disabled the breakdown is logged but the response is
	// left to the caller's default error handling.
	if !cfg.Enabled {
		return nil
	}

	if decision == stats.UserError4XX {
		return &apiFuncResult{nil, &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity,
			"query timed out: query spent too long in evaluation - consider simplifying your query")}, warnings, closer}
	}

	return &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusGatewayTimeout,
		"%s", ErrUpstreamRequestTimeout)}, warnings, closer}
}

func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
for _, codec := range q.codecs {
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/queryapi/query_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func Test_CustomAPI(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})

router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
Expand Down Expand Up @@ -244,7 +244,7 @@ func Test_InvalidCodec(t *testing.T) {
},
}

queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"))
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})
router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))

Expand Down Expand Up @@ -285,7 +285,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) {
},
}

queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})

router := mux.NewRouter()
router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
Expand Down Expand Up @@ -441,7 +441,7 @@ func Test_Logicalplan_Requests(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), stats.PhaseTrackerConfig{})
router := mux.NewRouter()
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler))
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/queryapi/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

var (
ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time")
ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer")
ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time")
ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer")
ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
ErrUpstreamRequestTimeout = "upstream request timeout"
)

func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ import (
)

var (
errInvalidHTTPPrefix = errors.New("HTTP prefix should be empty or start with /")
errInvalidHTTPPrefix = errors.New("HTTP prefix should be empty or start with /")
errTimeoutClassificationRequiresQueryStats = errors.New("timeout classification requires query stats to be enabled (frontend.query-stats-enabled)")
)

// The design pattern for Cortex is a series of config objects, which are
Expand Down Expand Up @@ -228,6 +229,9 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.Querier.Validate(); err != nil {
return errors.Wrap(err, "invalid querier config")
}
if c.Querier.TimeoutClassificationEnabled && !c.Frontend.Handler.QueryStatsEnabled {
return errTimeoutClassificationRequiresQueryStats
}
if err := c.IngesterClient.Validate(log); err != nil {
return errors.Wrap(err, "invalid ingester_client config")
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/cortex/cortex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,40 @@ func TestConfigValidation(t *testing.T) {
},
expectedError: fmt.Errorf("unsupported name validation scheme: unset"),
},
{
name: "should fail when timeout classification is enabled but query stats is disabled",
getTestConfig: func() *Config {
configuration := newDefaultConfig()
configuration.Querier.TimeoutClassificationEnabled = true
configuration.Querier.TimeoutClassificationDeadline = 59 * time.Second
configuration.Querier.TimeoutClassificationEvalThreshold = 40 * time.Second
configuration.Frontend.Handler.QueryStatsEnabled = false
return configuration
},
expectedError: errTimeoutClassificationRequiresQueryStats,
},
{
name: "should pass when timeout classification is enabled and query stats is enabled",
getTestConfig: func() *Config {
configuration := newDefaultConfig()
configuration.Querier.TimeoutClassificationEnabled = true
configuration.Querier.TimeoutClassificationDeadline = 59 * time.Second
configuration.Querier.TimeoutClassificationEvalThreshold = 40 * time.Second
configuration.Frontend.Handler.QueryStatsEnabled = true
return configuration
},
expectedError: nil,
},
{
name: "should pass when timeout classification is disabled and query stats is disabled",
getTestConfig: func() *Config {
configuration := newDefaultConfig()
configuration.Querier.TimeoutClassificationEnabled = false
configuration.Frontend.Handler.QueryStatsEnabled = false
return configuration
},
expectedError: nil,
},
} {
t.Run(tc.name, func(t *testing.T) {
err := tc.getTestConfig().Validate(nil)
Expand Down
Loading
Loading