diff --git a/api/external/cinder/messages.go b/api/external/cinder/messages.go index 260e93815..632189629 100644 --- a/api/external/cinder/messages.go +++ b/api/external/cinder/messages.go @@ -6,6 +6,7 @@ package api import ( "log/slog" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" ) @@ -30,8 +31,11 @@ type ExternalSchedulerRequest struct { Weights map[string]float64 `json:"weights"` // The name of the pipeline to execute. Pipeline string `json:"pipeline"` + // Options configure the pipeline behavior for this scheduling call. + Options scheduling.Options `json:"options,omitempty"` } +func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options } func (r ExternalSchedulerRequest) GetHosts() []string { hosts := make([]string, len(r.Hosts)) for i, host := range r.Hosts { diff --git a/api/external/ironcore/messages.go b/api/external/ironcore/messages.go index ac517f61a..25c71c0a5 100644 --- a/api/external/ironcore/messages.go +++ b/api/external/ironcore/messages.go @@ -7,14 +7,18 @@ import ( "log/slog" ironcorev1alpha1 "github.com/cobaltcore-dev/cortex/api/external/ironcore/v1alpha1" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" ) type MachinePipelineRequest struct { // The available machine pools. Pools []ironcorev1alpha1.MachinePool `json:"pools"` + // Options configure the pipeline behavior for this scheduling call. + Options scheduling.Options `json:"options,omitempty"` } +func (r MachinePipelineRequest) GetOptions() scheduling.Options { return r.Options } func (r MachinePipelineRequest) GetHosts() []string { hosts := make([]string, len(r.Pools)) for i, host := range r.Pools { diff --git a/api/external/manila/messages.go b/api/external/manila/messages.go index 5255a0d4f..ad0aef2f1 100644 --- a/api/external/manila/messages.go +++ b/api/external/manila/messages.go @@ -6,6 +6,7 @@ package api import ( "log/slog" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" ) @@ -30,8 +31,11 @@ type ExternalSchedulerRequest struct { Weights map[string]float64 `json:"weights"` // The name of the pipeline to execute. Pipeline string `json:"pipeline"` + // Options configure the pipeline behavior for this scheduling call. + Options scheduling.Options `json:"options,omitempty"` } +func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options } func (r ExternalSchedulerRequest) GetHosts() []string { hosts := make([]string, len(r.Hosts)) for i, host := range r.Hosts { diff --git a/api/external/nova/messages.go b/api/external/nova/messages.go index e82568941..e83d37ced 100644 --- a/api/external/nova/messages.go +++ b/api/external/nova/messages.go @@ -9,6 +9,7 @@ import ( "log/slog" "strings" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" ) @@ -37,8 +38,14 @@ type ExternalSchedulerRequest struct { // The name of the pipeline to execute. Pipeline string `json:"pipeline"` + + // Options configure the pipeline behavior for this scheduling call. + // Set by the caller (CR controller, failover controller, Nova). + // Nova does not set these; Cortex fills in config-derived defaults server-side. + Options scheduling.Options `json:"options,omitempty"` } +func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options } func (r ExternalSchedulerRequest) GetHosts() []string { hosts := make([]string, len(r.Hosts)) for i, host := range r.Hosts { diff --git a/api/external/pods/messages.go b/api/external/pods/messages.go index 3ec329b39..3d0930ef5 100644 --- a/api/external/pods/messages.go +++ b/api/external/pods/messages.go @@ -6,6 +6,7 @@ package pods import ( "log/slog" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" corev1 "k8s.io/api/core/v1" ) @@ -15,8 +16,11 @@ type PodPipelineRequest struct { Nodes []corev1.Node `json:"nodes"` // The pod to be scheduled. Pod corev1.Pod `json:"pod"` + // Options configure the pipeline behavior for this scheduling call. + Options scheduling.Options `json:"options,omitempty"` } +func (r PodPipelineRequest) GetOptions() scheduling.Options { return r.Options } func (r PodPipelineRequest) GetHosts() []string { hosts := make([]string, len(r.Nodes)) for i, host := range r.Nodes { diff --git a/api/scheduling/options.go b/api/scheduling/options.go new file mode 100644 index 000000000..0ad2877fc --- /dev/null +++ b/api/scheduling/options.go @@ -0,0 +1,43 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package scheduling + +import ( + "errors" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" +) + +// Options configure the behavior of a single pipeline run at call time. +// These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts), +// which are static and set when the pipeline is initialized. +type Options struct { + // ReadOnly means the pipeline run does not modify shared scheduling state (reservations, + // history, inflight records). Concurrent read-only runs are safe under a shared read lock. + ReadOnly bool `json:"read_only,omitempty"` + // LockReservations prevents reservation unlocking, i.e. considering those as unavailable resources. + LockReservations bool `json:"lock_reservations,omitempty"` + // AssumeEmptyHosts ignores running instances on hosts, considering them as empty. + AssumeEmptyHosts bool `json:"assume_empty_hosts,omitempty"` + // IgnoredReservationTypes lists reservation types whose reserved capacity the capacity filter does not block. + IgnoredReservationTypes []v1alpha1.ReservationType `json:"ignored_reservation_types,omitempty"` + // MaxCandidates limits the number of hosts returned after weighing. 0 means no limit. + MaxCandidates int `json:"max_candidates,omitempty"` + + // SkipHistory skips recording the placement decision in placement history. + SkipHistory bool `json:"skip_history,omitempty"` + // SkipInflight skips creating pessimistic blocking reservations for returned candidates. + SkipInflight bool `json:"skip_inflight,omitempty"` +} + +// Validate checks for mutually exclusive or inconsistent option combinations. +func (o Options) Validate() error { + if o.ReadOnly && !o.SkipHistory { + return errors.New("read-only runs must not write scheduling history: set SkipHistory=true") + } + if o.ReadOnly && !o.SkipInflight { + return errors.New("read-only runs cannot create inflight reservations") + } + return nil +} diff --git a/api/scheduling/options_test.go b/api/scheduling/options_test.go new file mode 100644 index 000000000..5283c3317 --- /dev/null +++ b/api/scheduling/options_test.go @@ -0,0 +1,31 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package scheduling + +import "testing" + +func TestOptions_Validate(t *testing.T) { + tests := []struct { + name string + opts Options + wantErr bool + }{ + {"zero value is valid", Options{}, false}, + {"read-only run, skipping history and inflight", Options{ReadOnly: true, SkipHistory: true, SkipInflight: true}, false}, + {"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true}, + {"ReadOnly without SkipInflight is invalid", Options{ReadOnly: true, SkipHistory: true}, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.opts.Validate() + if tt.wantErr && err == nil { + t.Error("expected error, got nil") + } + if !tt.wantErr && err != nil { + t.Errorf("expected no error, got %v", err) + } + }) + } +} diff --git a/api/v1alpha1/pipeline_types.go b/api/v1alpha1/pipeline_types.go index 180db85e5..1e5cfdca3 100644 --- a/api/v1alpha1/pipeline_types.go +++ b/api/v1alpha1/pipeline_types.go @@ -78,11 +78,6 @@ type PipelineSpec struct { // +kubebuilder:validation:Optional Description string `json:"description,omitempty"` - // If this pipeline should create history objects. - // When this is false, the pipeline will still process requests. - // +kubebuilder:default=false - CreateHistory bool `json:"createHistory,omitempty"` - // If this pipeline should ignore host preselection and gather all // available placement candidates before applying filters, instead of // relying on a pre-filtered set and weights. diff --git a/docs/apis.md b/docs/apis.md index 9160f8816..f4d44dcd4 100644 --- a/docs/apis.md +++ b/docs/apis.md @@ -75,6 +75,8 @@ Pipelines bundle scheduling steps together. Filters are mandatory, while weigher The state of the pipeline is propagated automatically through the states of its steps. Check the pipeline state object to determine if the pipeline can currently be executed or not. +Pipeline behavior has two configuration layers: static per-step params defined in the Pipeline CRD YAML (thresholds, weights, traits), and call-time `Options` set by the controller invoking the pipeline (e.g. whether to record history, lock reservations, or skip VM allocation accounting). + ### Decisions ```bash diff --git a/docs/reservations/failover-reservations.md b/docs/reservations/failover-reservations.md index 14f785ae1..34a6ccc3f 100644 --- a/docs/reservations/failover-reservations.md +++ b/docs/reservations/failover-reservations.md @@ -144,20 +144,24 @@ We use three different scheduler pipelines for failover reservations, each servi **Why:** When reusing a reservation, capacity is already reserved on the target host. We only need to verify that the VM is compatible with the host (traits, capabilities, AZ, etc.) without checking if there's enough free capacity. -### `kvm-new-failover-reservation` +Options: `ReadOnly: true, SkipHistory: true` — pure compatibility check, no state mutations. + +### `kvm-general-purpose-load-balancing` (new reservation) **Used when:** Creating a new failover reservation. **Why:** When creating a new reservation, we need to find a host that: 1. Is compatible with the VM (traits, capabilities, AZ, etc.) 2. Has enough free capacity to accommodate the VM if it needs to evacuate -This is the most restrictive pipeline since we're actually reserving new capacity. +Options: `LockReservations: true, SkipHistory: true` — capacity check must see true remaining capacity with all reservation slots locked. ### `kvm-acknowledge-failover-reservation` **Used when:** Validating that an existing reservation is still valid (watch-based reconciliation). **Why:** Periodically we need to verify that a VM could still evacuate to its reserved host. This sends an evacuation-style scheduling request with only the reservation's host as the eligible target. If the scheduler rejects it, the reservation is no longer valid and should be deleted so the periodic controller can create a new one on a valid host. +Options: `ReadOnly: true, SkipHistory: true` — validation only, no state mutations. + ## Data Model ### VM Struct diff --git a/helm/bundles/cortex-ironcore/templates/pipelines.yaml b/helm/bundles/cortex-ironcore/templates/pipelines.yaml index a77991b00..768a3912f 100644 --- a/helm/bundles/cortex-ironcore/templates/pipelines.yaml +++ b/helm/bundles/cortex-ironcore/templates/pipelines.yaml @@ -8,7 +8,6 @@ spec: description: | This pipeline is used to schedule ironcore machines onto machinepools. type: filter-weigher - createHistory: false filters: [] weighers: - name: noop diff --git a/helm/bundles/cortex-nova/templates/pipelines.yaml b/helm/bundles/cortex-nova/templates/pipelines.yaml index 0c2dcd274..e98c23c79 100644 --- a/helm/bundles/cortex-nova/templates/pipelines.yaml +++ b/helm/bundles/cortex-nova/templates/pipelines.yaml @@ -14,7 +14,6 @@ spec: Specifically, this pipeline is used for general purpose workloads. type: filter-weigher - createHistory: false filters: [] weighers: - name: vmware_binpack @@ -73,6 +72,5 @@ spec: Specifically, this pipeline is used for HANA workloads. type: filter-weigher - createHistory: false filters: [] weighers: [] diff --git a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml index 143c0488a..92a67110b 100644 --- a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml +++ b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml @@ -16,7 +16,6 @@ spec: type: filter-weigher # Fetch all placement candidates, ignoring nova's preselection. ignorePreselection: true - createHistory: true filters: - name: filter_correct_az description: | @@ -71,14 +70,8 @@ spec: - name: filter_has_enough_capacity description: | This step will filter out hosts that do not have enough available capacity - to host the requested flavor. If enabled, this step will subtract the - current reservations residing on this host from the available capacity. - params: - # If reserved space should be locked even for matching requests. - # For the reservations pipeline, we don't want to unlock - # reserved space, to avoid reservations for the same project - # and flavor to overlap. - - {key: lockReserved, boolValue: false} + to host the requested flavor. Reservation unlocking behavior is controlled + by the call-time Options.LockReservations flag, not by a step param. - name: filter_allowed_projects description: | This step filters hosts based on allowed projects defined in the @@ -155,7 +148,6 @@ spec: type: filter-weigher # Fetch all placement candidates, ignoring nova's preselection. ignorePreselection: true - createHistory: true filters: - name: filter_correct_az description: | @@ -210,14 +202,8 @@ spec: - name: filter_has_enough_capacity description: | This step will filter out hosts that do not have enough available capacity - to host the requested flavor. If enabled, this step will subtract the - current reservations residing on this host from the available capacity. - params: - # If reserved space should be locked even for matching requests. - # For the reservations pipeline, we don't want to unlock - # reserved space, to avoid reservations for the same project - # and flavor to overlap. - - {key: lockReserved, boolValue: false} + to host the requested flavor. Reservation unlocking behavior is controlled + by the call-time Options.LockReservations flag, not by a step param. - name: filter_allowed_projects description: | This step filters hosts based on allowed projects defined in the @@ -277,128 +263,6 @@ spec: --- apiVersion: cortex.cloud/v1alpha1 kind: Pipeline -metadata: - name: kvm-new-failover-reservation -spec: - schedulingDomain: nova - description: | - This pipeline is used by the failover reservation controller to find a host - for creating a new failover reservation. It validates host compatibility AND - checks capacity. - - Note: Domain filtering (filter_external_customer) is not applied for failover - reservations because domains are currently not considered in failover scheduling. - - This is the pipeline used for KVM hypervisors (qemu and cloud-hypervisor). - type: filter-weigher - createHistory: false - # Fetch all placement candidates, ignoring nova's preselection. - ignorePreselection: true - filters: - - name: filter_host_instructions - description: | - This step will consider the `ignore_hosts` and `force_hosts` instructions - from the nova scheduler request spec to filter out or exclusively allow - certain hosts. - - name: filter_has_enough_capacity - description: | - This step will filter out hosts that do not have enough available capacity - to host the requested flavor. If enabled, this step will subtract the - current reservations residing on this host from the available capacity. - params: - # If reserved space should be locked even for matching requests. - # For the reservations pipeline, we don't want to unlock - # reserved space, to avoid reservations for the same project - # and flavor to overlap. - - {key: lockReserved, boolValue: true} - - name: filter_has_requested_traits - description: | - This step filters hosts that do not have the requested traits given by the - nova flavor extra spec: "trait:": "forbidden" means the host must - not have the specified trait. "trait:": "required" means the host - must have the specified trait. - - name: filter_has_accelerators - description: | - This step will filter out hosts without the trait `COMPUTE_ACCELERATORS` if - the nova flavor extra specs request accelerators via "accel:device_profile". - - name: filter_correct_az - description: | - This step will filter out hosts whose aggregate information indicates they - are not placed in the requested availability zone. - - name: filter_status_conditions - description: | - This step will filter out hosts for which the hypervisor status conditions - do not meet the expected values, for example, that the hypervisor is ready - and not disabled. - # Note: filter_external_customer is intentionally omitted. - # Domains are currently not considered in failover reservations. - - name: filter_allowed_projects - description: | - This step filters hosts based on allowed projects defined in the - hypervisor resource. Note that hosts allowing all projects are still - accessible and will not be filtered out. In this way some hypervisors - are made accessible to some projects only. - - name: filter_capabilities - description: | - This step will filter out hosts that do not meet the compute capabilities - requested by the nova flavor extra specs, like `{"arch": "x86_64", - "maxphysaddr:bits": 46, ...}`. - - Note: currently, advanced boolean/numeric operators for the capabilities - like `>`, `!`, ... are not supported because they are not used by any of our - flavors in production. - - name: filter_instance_group_affinity - description: | - This step selects hosts in the instance group specified in the nova - scheduler request spec. - - name: filter_instance_group_anti_affinity - description: | - This step selects hosts not in the instance group specified in the nova - scheduler request spec, but only until the max_server_per_host limit is - reached (default = 1). - - name: filter_live_migratable - description: | - This step ensures that the target host of a live migration can accept - the migrating VM, by checking cpu architecture, cpu features, emulated - devices, and cpu modes. - - name: filter_requested_destination - description: | - This step filters hosts based on the `requested_destination` instruction - from the nova scheduler request spec. It supports filtering by host and - by aggregates. Aggregates use AND logic between list elements, with - comma-separated UUIDs within an element using OR logic. - weighers: - - name: kvm_prefer_smaller_hosts - params: - - {key: resourceWeights, floatMapValue: {"memory": 1.0}} - description: | - This step pulls virtual machines onto smaller hosts (by capacity). This - ensures that larger hosts are not overly fragmented with small VMs, - and can still accommodate larger VMs when they need to be scheduled. - - name: kvm_instance_group_soft_affinity - description: | - This weigher implements the "soft affinity" and "soft anti-affinity" policy - for instance groups in nova. - - It assigns a weight to each host based on how many instances of the same - instance group are already running on that host. The more instances of the - same group on a host, the lower (for soft-anti-affinity) or higher - (for soft-affinity) the weight, which makes it less likely or more likely, - respectively, for the scheduler to choose that host for new instances of - the same group. - - name: kvm_binpack - multiplier: -1.0 # inverted = balancing - params: - - {key: resourceWeights, floatMapValue: {"memory": 1.0}} - description: | - This step implements a balancing weigher for workloads on kvm hypervisors, - which is the opposite of binpacking. Instead of pulling the requested vm - into the smallest gaps possible, it spreads the load to ensure - workloads are balanced across hosts. In this pipeline, the balancing will - focus on general purpose virtual machines. ---- -apiVersion: cortex.cloud/v1alpha1 -kind: Pipeline metadata: name: kvm-descheduler spec: @@ -408,7 +272,6 @@ spec: compute hosts in order to optimize resource usage and performance. This is the pipeline used for KVM hypervisors (qemu and cloud-hypervisor). type: detector - createHistory: true detectors: - name: avoid_high_steal_pct description: | @@ -432,7 +295,6 @@ spec: Use case: When a VM needs failover protection and there's an existing reservation on a host, this pipeline validates the host is still suitable for the VM. type: filter-weigher - createHistory: false filters: - name: filter_host_instructions description: | @@ -496,7 +358,6 @@ spec: that the reservation host can still accommodate all allocated VMs. If validation fails for any VM, the reservation is deleted (nack). type: filter-weigher - createHistory: false filters: - name: filter_host_instructions description: | @@ -506,10 +367,9 @@ spec: - name: filter_has_enough_capacity description: | This step will filter out hosts that do not have enough available capacity - to host the requested flavor. Reservations are considered to ensure we - don't double-book capacity. - params: - - {key: lockReserved, boolValue: true} + to host the requested flavor. Reservations are locked via the call-time + Options.LockReservations flag (set by the ack caller) to prevent false-positive + unlocking of CR slots during evacuation validation. - name: filter_has_requested_traits description: | This step filters hosts that do not have the requested traits given by the @@ -571,7 +431,6 @@ spec: and all reservation blockings so that only raw hardware capacity is considered. type: filter-weigher - createDecisions: false # Fetch all placement candidates, ignoring nova's preselection. ignorePreselection: true filters: @@ -581,10 +440,8 @@ spec: - name: filter_has_enough_capacity description: | Filters hosts that cannot fit the flavor based on raw hardware capacity. - VM allocations and all reservation types are ignored to represent an - empty datacenter scenario. - params: - - {key: ignoredReservationTypes, stringListValue: ["CommittedResourceReservation", "FailoverReservation"]} + VM allocations and all reservation types are ignored via call-time Options + (AssumeEmptyHosts + IgnoredReservationTypes set by the capacity controller). - name: filter_has_requested_traits description: | Ensures hosts have the hardware traits required by the flavor. diff --git a/helm/bundles/cortex-pods/templates/pipelines.yaml b/helm/bundles/cortex-pods/templates/pipelines.yaml index edf59daa2..e51608d50 100644 --- a/helm/bundles/cortex-pods/templates/pipelines.yaml +++ b/helm/bundles/cortex-pods/templates/pipelines.yaml @@ -8,7 +8,6 @@ spec: description: | This pipeline is used to schedule pods onto nodes. type: filter-weigher - createHistory: false filters: - name: noop description: | diff --git a/helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml b/helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml index 80c5497aa..702703f6c 100644 --- a/helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml +++ b/helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml @@ -58,12 +58,6 @@ spec: spec: description: spec defines the desired state of Pipeline properties: - createHistory: - default: false - description: |- - If this pipeline should create history objects. - When this is false, the pipeline will still process requests. - type: boolean description: description: An optional description of the pipeline, helping understand its purpose. diff --git a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go index 52ec37306..99abfc1ba 100644 --- a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "sync" "time" @@ -74,10 +73,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. c.processMu.Lock() defer c.processMu.Unlock() - pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name] - if !ok { - return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name) - } err := c.process(ctx, decision) if err != nil { meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ @@ -94,11 +89,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. Message: "pipeline run succeeded", }) } - if pipelineConf.Spec.CreateHistory { - if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { - ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history") - } - } return err } @@ -122,6 +112,11 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision } result, err := pipeline.Run(request) + if !request.Options.SkipHistory { + if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { + log.Error(upsertErr, "failed to create/update history") + } + } if err != nil { log.Error(err, "failed to run pipeline") return err diff --git a/internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go b/internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go index e51dfec87..05d474b60 100644 --- a/internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go @@ -17,6 +17,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" api "github.com/cobaltcore-dev/cortex/api/external/cinder" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" @@ -46,6 +47,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) { }, Weights: map[string]float64{"cinder-volume-1": 1.0, "cinder-volume-2": 0.5}, Pipeline: "test-pipeline", + Options: scheduling.Options{SkipHistory: true}, } cinderRaw, err := json.Marshal(cinderRequest) @@ -281,7 +283,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainCinder, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -315,7 +316,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainCinder, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -369,14 +369,13 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainCinder, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, }, createHistory: true, expectError: true, - expectHistoryCreated: true, + expectHistoryCreated: false, expectResult: false, }, } @@ -413,6 +412,16 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) controller.Pipelines[tt.pipelineConfig.Name] = initResult.Pipeline } + if tt.decision.Spec.CinderRaw != nil { + req := cinderRequest + req.Options = scheduling.Options{SkipHistory: !tt.createHistory} + raw, marshalErr := json.Marshal(req) + if marshalErr != nil { + t.Fatalf("Failed to marshal request with options: %v", marshalErr) + } + tt.decision.Spec.CinderRaw = &runtime.RawExtension{Raw: raw} + } + err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision) if tt.expectError && err == nil { diff --git a/internal/scheduling/lib/filter_weigher_pipeline.go b/internal/scheduling/lib/filter_weigher_pipeline.go index ee769433d..d12b566ad 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline.go +++ b/internal/scheduling/lib/filter_weigher_pipeline.go @@ -19,6 +19,7 @@ import ( type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface { // Run the scheduling pipeline with the given request. + // Call-time options are read from request.GetOptions(). Run(request RequestType) (v1alpha1.DecisionResult, error) } @@ -263,6 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri // Evaluate the pipeline and return a list of hosts in order of preference. func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) { + opts := request.GetOptions() + if err := opts.Validate(); err != nil { + return v1alpha1.DecisionResult{}, err + } slogArgs := request.GetTraceLogArgs() slogArgsAny := make([]any, 0, len(slogArgs)) for _, arg := range slogArgs { @@ -297,6 +302,21 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1. hosts := p.sortHostsByWeights(outWeights) traceLog.Info("scheduler: sorted hosts", "hosts", hosts) + if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates { + traceLog.Info("scheduler: trimming candidate list", "maxCandidates", opts.MaxCandidates, "before", len(hosts)) + hosts = hosts[:opts.MaxCandidates] + // Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent. + kept := make(map[string]struct{}, len(hosts)) + for _, h := range hosts { + kept[h] = struct{}{} + } + for host := range outWeights { + if _, ok := kept[host]; !ok { + delete(outWeights, host) + } + } + } + // Collect some metrics about the pipeline execution. go p.monitor.observePipelineResult(request, hosts) diff --git a/internal/scheduling/lib/filter_weigher_pipeline_request.go b/internal/scheduling/lib/filter_weigher_pipeline_request.go index 26688c358..f66431545 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_request.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_request.go @@ -3,7 +3,11 @@ package lib -import "log/slog" +import ( + "log/slog" + + "github.com/cobaltcore-dev/cortex/api/scheduling" +) type FilterWeigherPipelineRequest interface { // Get the hosts that went in the pipeline. @@ -21,4 +25,6 @@ type FilterWeigherPipelineRequest interface { // Get logging args to be used in the step's trace log. // Usually, this will be the request context including the request ID. GetTraceLogArgs() []slog.Attr + // Get the call-time options for this pipeline run. + GetOptions() scheduling.Options } diff --git a/internal/scheduling/lib/filter_weigher_pipeline_request_test.go b/internal/scheduling/lib/filter_weigher_pipeline_request_test.go index 87ab0d786..3b2b6d246 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_request_test.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_request_test.go @@ -3,7 +3,11 @@ package lib -import "log/slog" +import ( + "log/slog" + + "github.com/cobaltcore-dev/cortex/api/scheduling" +) type mockFilterWeigherPipelineRequest struct { WeightKeys []string @@ -11,6 +15,7 @@ type mockFilterWeigherPipelineRequest struct { Hosts []string Weights map[string]float64 Pipeline string + Options scheduling.Options } func (m mockFilterWeigherPipelineRequest) GetWeightKeys() []string { return m.WeightKeys } @@ -18,6 +23,7 @@ func (m mockFilterWeigherPipelineRequest) GetTraceLogArgs() []slog.Attr { retu func (m mockFilterWeigherPipelineRequest) GetHosts() []string { return m.Hosts } func (m mockFilterWeigherPipelineRequest) GetWeights() map[string]float64 { return m.Weights } func (m mockFilterWeigherPipelineRequest) GetPipeline() string { return m.Pipeline } +func (m mockFilterWeigherPipelineRequest) GetOptions() scheduling.Options { return m.Options } func (m mockFilterWeigherPipelineRequest) Filter(hosts map[string]float64) FilterWeigherPipelineRequest { filteredHosts := make([]string, 0, len(hosts)) diff --git a/internal/scheduling/lib/filter_weigher_pipeline_step.go b/internal/scheduling/lib/filter_weigher_pipeline_step.go index 26dc5de40..54816519c 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_step.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_step.go @@ -30,6 +30,8 @@ type FilterWeigherPipelineStep[RequestType FilterWeigherPipelineRequest] interfa // // A traceLog is provided that contains the global request id and should // be used to log the step's execution. + // + // Per-call options are available via request.GetOptions(). Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) } diff --git a/internal/scheduling/lib/filter_weigher_pipeline_test.go b/internal/scheduling/lib/filter_weigher_pipeline_test.go index 0e2775944..3a2012db7 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_test.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_test.go @@ -7,8 +7,10 @@ import ( "context" "log/slog" "math" + "slices" "testing" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -372,3 +374,54 @@ func TestFilterWeigherPipelineMonitor_SubPipeline(t *testing.T) { t.Error("original monitor should not be modified") } } + +func TestPipeline_MaxCandidates(t *testing.T) { + // Pipeline that passes all 4 hosts with descending weights. + pipeline := &filterWeigherPipeline[mockFilterWeigherPipelineRequest]{ + filters: map[string]Filter[mockFilterWeigherPipelineRequest]{}, + filtersOrder: []string{}, + weighersOrder: []string{}, + weighers: map[string]Weigher[mockFilterWeigherPipelineRequest]{}, + } + request := mockFilterWeigherPipelineRequest{ + Hosts: []string{"host1", "host2", "host3", "host4"}, + Weights: map[string]float64{"host1": 4.0, "host2": 3.0, "host3": 2.0, "host4": 1.0}, + } + + tests := []struct { + name string + maxCandidates int + wantLen int + wantFirst string + }{ + {"no limit", 0, 4, "host1"}, + {"limit to 2", 2, 2, "host1"}, + {"limit to 1", 1, 1, "host1"}, + {"limit larger than hosts", 10, 4, "host1"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := request + req.Options = scheduling.Options{MaxCandidates: tt.maxCandidates} + result, err := pipeline.Run(req) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if len(result.OrderedHosts) != tt.wantLen { + t.Errorf("expected %d hosts, got %d: %v", tt.wantLen, len(result.OrderedHosts), result.OrderedHosts) + } + if len(result.OrderedHosts) > 0 && result.OrderedHosts[0] != tt.wantFirst { + t.Errorf("expected first host %s, got %s", tt.wantFirst, result.OrderedHosts[0]) + } + if tt.maxCandidates > 0 && len(result.OrderedHosts) <= tt.maxCandidates { + // AggregatedOutWeights must only contain returned hosts. + for host := range result.AggregatedOutWeights { + if !slices.Contains(result.OrderedHosts, host) { + t.Errorf("AggregatedOutWeights contains trimmed host %s", host) + } + } + } + }) + } +} diff --git a/internal/scheduling/machines/filter_weigher_pipeline_controller.go b/internal/scheduling/machines/filter_weigher_pipeline_controller.go index 35d51708a..0063010d8 100644 --- a/internal/scheduling/machines/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/machines/filter_weigher_pipeline_controller.go @@ -6,7 +6,6 @@ package machines import ( "context" "errors" - "fmt" "sync" "time" @@ -95,10 +94,6 @@ func (c *FilterWeigherPipelineController) ProcessNewMachine(ctx context.Context, }, } - pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name] - if !ok { - return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name) - } err := c.process(ctx, decision) if err != nil { meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ @@ -115,11 +110,6 @@ func (c *FilterWeigherPipelineController) ProcessNewMachine(ctx context.Context, Message: "pipeline run succeeded", }) } - if pipelineConf.Spec.CreateHistory { - if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { - ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history") - } - } return err } @@ -142,9 +132,14 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision return errors.New("no machine pools available for scheduling") } - // Execute the scheduling pipeline. + // Execute the scheduling pipeline. Options not set: machine scheduling always records history. request := ironcore.MachinePipelineRequest{Pools: pools.Items} result, err := pipeline.Run(request) + if !request.Options.SkipHistory { + if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { + log.Error(upsertErr, "failed to create/update history") + } + } if err != nil { log.V(1).Error(err, "failed to run scheduler pipeline") return errors.New("failed to run scheduler pipeline") diff --git a/internal/scheduling/machines/filter_weigher_pipeline_controller_test.go b/internal/scheduling/machines/filter_weigher_pipeline_controller_test.go index bc2e0722a..9431f79ed 100644 --- a/internal/scheduling/machines/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/machines/filter_weigher_pipeline_controller_test.go @@ -125,6 +125,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) { Pipelines: map[string]lib.FilterWeigherPipeline[ironcore.MachinePipelineRequest]{ "machines-scheduler": createMockPipeline(), }, + HistoryManager: lib.HistoryClient{Client: client}, }, Monitor: lib.FilterWeigherPipelineMonitor{}, } @@ -322,7 +323,6 @@ func TestFilterWeigherPipelineController_ProcessNewMachine(t *testing.T) { Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainMachines, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -356,14 +356,13 @@ func TestFilterWeigherPipelineController_ProcessNewMachine(t *testing.T) { Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainMachines, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, }, createHistory: false, expectError: false, - expectHistoryCreated: false, + expectHistoryCreated: true, expectMachinePoolAssigned: true, expectTargetHost: "pool1", }, @@ -403,14 +402,13 @@ func TestFilterWeigherPipelineController_ProcessNewMachine(t *testing.T) { Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainMachines, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, }, createHistory: true, expectError: true, - expectHistoryCreated: true, // Decision is created but processing fails + expectHistoryCreated: false, expectMachinePoolAssigned: false, }, } diff --git a/internal/scheduling/manila/filter_weigher_pipeline_controller.go b/internal/scheduling/manila/filter_weigher_pipeline_controller.go index 128b7d719..6e00593a9 100644 --- a/internal/scheduling/manila/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/manila/filter_weigher_pipeline_controller.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "sync" "time" @@ -74,10 +73,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. c.processMu.Lock() defer c.processMu.Unlock() - pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name] - if !ok { - return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name) - } err := c.process(ctx, decision) if err != nil { meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ @@ -94,11 +89,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. Message: "pipeline run succeeded", }) } - if pipelineConf.Spec.CreateHistory { - if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { - ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history") - } - } return err } @@ -122,6 +112,11 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision } result, err := pipeline.Run(request) + if !request.Options.SkipHistory { + if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { + log.Error(upsertErr, "failed to create/update history") + } + } if err != nil { log.Error(err, "failed to run pipeline") return err diff --git a/internal/scheduling/manila/filter_weigher_pipeline_controller_test.go b/internal/scheduling/manila/filter_weigher_pipeline_controller_test.go index 3ca4fd7f1..11292d3f5 100644 --- a/internal/scheduling/manila/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/manila/filter_weigher_pipeline_controller_test.go @@ -17,6 +17,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" api "github.com/cobaltcore-dev/cortex/api/external/manila" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/sapcc/go-bits/must" @@ -48,6 +49,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) { }, Weights: map[string]float64{"manila-share-1@backend1": 1.0, "manila-share-2@backend2": 0.5}, Pipeline: "test-pipeline", + Options: scheduling.Options{SkipHistory: true}, } manilaRaw, err := json.Marshal(manilaRequest) @@ -278,7 +280,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainManila, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -312,7 +313,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainManila, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -366,14 +366,13 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainManila, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, }, createHistory: true, expectError: true, - expectHistoryCreated: true, + expectHistoryCreated: false, expectResult: false, }, } @@ -410,11 +409,17 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) controller.Pipelines[tt.pipelineConfig.Name] = initResult.Pipeline } - err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision) - - if tt.expectError && err == nil { - t.Error("Expected error but got none") + if tt.decision.Spec.ManilaRaw != nil { + req := manilaRequest + req.Options = scheduling.Options{SkipHistory: !tt.createHistory} + raw, marshalErr := json.Marshal(req) + if marshalErr != nil { + t.Fatalf("Failed to marshal request with options: %v", marshalErr) + } + tt.decision.Spec.ManilaRaw = &runtime.RawExtension{Raw: raw} } + + err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision) if !tt.expectError && err != nil { t.Errorf("Expected no error but got: %v", err) } diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller.go b/internal/scheduling/nova/filter_weigher_pipeline_controller.go index 279ac1c3e..7be2b5dee 100644 --- a/internal/scheduling/nova/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "sync" "time" @@ -38,8 +37,9 @@ type FilterWeigherPipelineController struct { // Toolbox shared between all pipeline controllers. lib.BasePipelineController[lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]] - // Mutex to only allow one process at a time - processMu sync.Mutex + // Mutex to only allow one process at a time. + // Read-only runs (opts.ReadOnly == true) acquire a read lock; write runs acquire the full lock. + processMu sync.RWMutex // Monitor to pass down to all pipelines. Monitor lib.FilterWeigherPipelineMonitor @@ -54,13 +54,23 @@ func (c *FilterWeigherPipelineController) PipelineType() v1alpha1.PipelineType { // Callback executed when kubernetes asks to reconcile a decision resource. func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - c.processMu.Lock() - defer c.processMu.Unlock() - + // Peek at the decision before acquiring the lock so we can choose the right lock type. + // Read-only runs can proceed concurrently; write runs need the exclusive lock. decision := &v1alpha1.Decision{} if err := c.Get(ctx, req.NamespacedName, decision); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } + if c.peekReadOnly(decision) { + c.processMu.RLock() + defer c.processMu.RUnlock() + } else { + c.processMu.Lock() + defer c.processMu.Unlock() + // Re-fetch after acquiring the exclusive lock to see consistent state. + if err := c.Get(ctx, req.NamespacedName, decision); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + } old := decision.DeepCopy() if err := c.process(ctx, decision); err != nil { return ctrl.Result{}, err @@ -74,13 +84,16 @@ func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctr // Process the decision from the API. Should create and return the updated decision. func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.Context, decision *v1alpha1.Decision) error { - c.processMu.Lock() - defer c.processMu.Unlock() - - pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name] - if !ok { - return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name) + // Read-only runs share the cached decision state; no re-fetch needed because they + // don't observe writes from concurrent exclusive-lock runs. + if c.peekReadOnly(decision) { + c.processMu.RLock() + defer c.processMu.RUnlock() + } else { + c.processMu.Lock() + defer c.processMu.Unlock() } + err := c.process(ctx, decision) if err != nil { meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ @@ -97,9 +110,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. Message: "pipeline run succeeded", }) } - if pipelineConf.Spec.CreateHistory { - c.upsertHistory(ctx, decision, err) - } return err } @@ -167,6 +177,9 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision } result, err := pipeline.Run(request) + if !request.Options.SkipHistory { + c.upsertHistory(ctx, decision, err) + } if err != nil { log.Error(err, "failed to run pipeline") return err @@ -182,7 +195,19 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision return nil } -// The base controller will delegate the pipeline creation down to this method. +// peekReadOnly determines whether a decision should use a read lock instead of +// the exclusive write lock. Defaults to false (exclusive) on any parse error. +func (c *FilterWeigherPipelineController) peekReadOnly(decision *v1alpha1.Decision) bool { + if decision.Spec.NovaRaw == nil { + return false + } + var request api.ExternalSchedulerRequest + if err := json.Unmarshal(decision.Spec.NovaRaw.Raw, &request); err != nil { + return false + } + return request.Options.ReadOnly +} + func (c *FilterWeigherPipelineController) InitPipeline( ctx context.Context, p v1alpha1.Pipeline, diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go b/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go index 752725df8..52064d0c3 100644 --- a/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go @@ -20,6 +20,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" @@ -77,6 +78,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) { }, Weights: map[string]float64{"compute-1": 1.0, "compute-2": 0.5}, Pipeline: "test-pipeline", + Options: scheduling.Options{SkipHistory: true}, } novaRaw, err := json.Marshal(novaRequest) @@ -431,7 +433,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -443,7 +444,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -480,7 +480,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -492,7 +491,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -528,7 +526,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) expectResult: false, expectHistoryCreated: false, expectUpdatedStatus: false, - errorContains: "pipeline nonexistent-pipeline not configured", + errorContains: "pipeline not found or not ready", }, { name: "decision without novaRaw spec", @@ -552,7 +550,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -564,7 +561,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -573,7 +569,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "no novaRaw spec defined", }, @@ -602,7 +598,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -611,7 +606,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "pipeline not found or not ready", }, @@ -640,7 +635,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -649,7 +643,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "pipeline not found or not ready", }, @@ -697,6 +691,16 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) controller.Pipelines[tt.pipeline.Name] = initResult.Pipeline } + if tt.decision.Spec.NovaRaw != nil { + req := novaRequest + req.Options = scheduling.Options{SkipHistory: !tt.createHistory} + raw, marshalErr := json.Marshal(req) + if marshalErr != nil { + t.Fatalf("Failed to marshal request with options: %v", marshalErr) + } + tt.decision.Spec.NovaRaw = &runtime.RawExtension{Raw: raw} + } + // Call the method under test err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision) @@ -779,6 +783,7 @@ func TestFilterWeigherPipelineController_IgnorePreselection(t *testing.T) { }, Weights: map[string]float64{"original-host-1": 1.0, "original-host-2": 0.5}, Pipeline: "test-pipeline", + Options: scheduling.Options{SkipHistory: true}, } novaRaw, err := json.Marshal(novaRequest) @@ -864,7 +869,6 @@ func TestFilterWeigherPipelineController_IgnorePreselection(t *testing.T) { Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: false, IgnorePreselection: tt.ignorePreselection, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, @@ -928,3 +932,74 @@ func TestFilterWeigherPipelineController_IgnorePreselection(t *testing.T) { // Error variable for testing var errGathererFailed = errors.New("gatherer failed") + +func TestFilterWeigherPipelineController_PeekReadOnly(t *testing.T) { + makeRaw := func(readOnly bool) []byte { + r := api.ExternalSchedulerRequest{ + Spec: api.NovaObject[api.NovaSpec]{Data: api.NovaSpec{NumInstances: 1}}, + Options: scheduling.Options{ReadOnly: readOnly}, + } + raw, err := json.Marshal(r) + if err != nil { + panic(err) + } + return raw + } + + c := &FilterWeigherPipelineController{} + + tests := []struct { + name string + decision *v1alpha1.Decision + want bool + }{ + { + name: "nil NovaRaw defaults to exclusive lock", + decision: &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{ + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + }, + }, + want: false, + }, + { + name: "invalid JSON defaults to exclusive lock", + decision: &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{ + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + NovaRaw: &runtime.RawExtension{Raw: []byte("not-json")}, + }, + }, + want: false, + }, + { + name: "ReadOnly=false uses exclusive lock", + decision: &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{ + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + NovaRaw: &runtime.RawExtension{Raw: makeRaw(false)}, + }, + }, + want: false, + }, + { + name: "ReadOnly=true uses read lock", + decision: &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{ + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + NovaRaw: &runtime.RawExtension{Raw: makeRaw(true)}, + }, + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := c.peekReadOnly(tt.decision) + if got != tt.want { + t.Errorf("expected peekReadOnly = %v, got %v", tt.want, got) + } + }) + } +} diff --git a/internal/scheduling/nova/integration_test.go b/internal/scheduling/nova/integration_test.go index f4bb38a79..1b84601a7 100644 --- a/internal/scheduling/nova/integration_test.go +++ b/internal/scheduling/nova/integration_test.go @@ -252,6 +252,7 @@ func NewIntegrationTestServer(t *testing.T, pipelineConfig PipelineConfig, objec Client: k8sClient, Pipelines: make(map[string]lib.FilterWeigherPipeline[novaapi.ExternalSchedulerRequest]), PipelineConfigs: make(map[string]v1alpha1.Pipeline), + HistoryManager: lib.HistoryClient{Client: k8sClient}, }, Monitor: getSharedMonitor(), } diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go index b97d3e0e5..b379bf3e4 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go @@ -62,6 +62,7 @@ type FilterHasEnoughCapacity struct { // // Please also note that disk space is currently not considered by this filter. func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.ExternalSchedulerRequest) (*lib.FilterWeigherPipelineStepResult, error) { + opts := request.GetOptions() result := s.IncludeAllHostsFromRequest(request) // This map holds the free resources per host. @@ -85,7 +86,7 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa } // Subtract allocated resources (skip when ignoring allocations for empty-datacenter capacity queries). - if !s.Options.IgnoreAllocations { + if !s.Options.IgnoreAllocations && !opts.AssumeEmptyHosts { for resourceName, allocated := range hv.Status.Allocation { free, ok := freeResourcesByHost[hv.Name][resourceName] if !ok { @@ -112,7 +113,8 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa } // Check if this reservation type should be ignored - if slices.Contains(s.Options.IgnoredReservationTypes, reservation.Spec.Type) { + if slices.Contains(s.Options.IgnoredReservationTypes, reservation.Spec.Type) || + slices.Contains(opts.IgnoredReservationTypes, reservation.Spec.Type) { traceLog.Debug("ignoring reservation type", "type", reservation.Spec.Type, "reservation", reservation.Name) continue } @@ -128,18 +130,14 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa // Check if this is a CR reservation scheduling request. // If so, we should NOT unlock any CR reservations to prevent overbooking. // CR capacity should only be unlocked for actual VM scheduling. - intent, err := request.GetIntent() switch { - case err == nil && intent == api.ReserveForCommittedResourceIntent: - traceLog.Debug("keeping CR reservation locked for CR reservation scheduling", + case opts.LockReservations || s.Options.LockReserved: + traceLog.Debug("keeping CR reservation locked", "reservation", reservation.Name, - "intent", intent) + "lockReservations", opts.LockReservations, + "lockReserved", s.Options.LockReserved) // Don't continue - fall through to block the resources - case !s.Options.LockReserved && - // For committed resource reservations: unlock resources only if: - // 1. Project ID matches - // 2. ResourceGroup matches the flavor's hw_version - reservation.Spec.CommittedResourceReservation.ProjectID == request.Spec.Data.ProjectID && + case reservation.Spec.CommittedResourceReservation.ProjectID == request.Spec.Data.ProjectID && reservation.Spec.CommittedResourceReservation.ResourceGroup == request.Spec.Data.Flavor.Data.ExtraSpecs["hw_version"]: traceLog.Info("unlocking resources reserved by matching committed resource reservation with allocation", "reservation", reservation.Name, @@ -199,7 +197,7 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa // When ignoring allocations (empty-datacenter scenario) VM resources are not // deducted, so the confirmed-VM adjustment would under-block: always use the // full slot instead. - !s.Options.IgnoreAllocations && + !s.Options.IgnoreAllocations && !opts.AssumeEmptyHosts && // if the reservation is not being migrated, block only unused resources reservation.Spec.TargetHost == reservation.Status.Host && reservation.Spec.CommittedResourceReservation != nil && diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go index 5b026408f..1cdc800f0 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go @@ -4,6 +4,7 @@ package filters import ( + "github.com/cobaltcore-dev/cortex/api/scheduling" "log/slog" "testing" @@ -807,6 +808,44 @@ func TestFilterHasEnoughCapacity_IgnoredReservationTypes(t *testing.T) { } } +func TestFilterHasEnoughCapacity_IgnoredReservationTypes_CallTime(t *testing.T) { + scheme := buildTestScheme(t) + + // Same two-host setup as the YAML-path test: CR on host1, Failover on host2. + // Each blocks 4 CPU, leaving 4 free; request needs 8 CPU so both hosts fail without ignoring. + hypervisors := []*hv1.Hypervisor{ + newHypervisor("host1", "16", "8", "32Gi", "16Gi"), + newHypervisor("host2", "16", "8", "32Gi", "16Gi"), + } + reservations := []*v1alpha1.Reservation{ + newCommittedReservation("cr-res", "host1", "project-X", "m1.large", "gp-1", "4", "8Gi", nil, nil), + newFailoverReservation("failover-res", "host2", "4", "8Gi", map[string]string{"other-vm": "host3"}), + } + request := newNovaRequest("instance-123", "project-A", "m1.large", "gp-1", 8, "16Gi", false, []string{"host1", "host2"}) + + objects := make([]client.Object, 0, len(hypervisors)+len(reservations)) + for _, h := range hypervisors { + objects = append(objects, h.DeepCopy()) + } + for _, r := range reservations { + objects = append(objects, r.DeepCopy()) + } + + step := &FilterHasEnoughCapacity{} + step.Client = fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).Build() + step.Options = FilterHasEnoughCapacityOpts{LockReserved: true} // no YAML-level ignores + + // Call-time: ignore CR reservations → host1 passes, host2 still blocked by failover. + request.Options = scheduling.Options{ + IgnoredReservationTypes: []v1alpha1.ReservationType{v1alpha1.ReservationTypeCommittedResource}, + } + result, err := step.Run(slog.Default(), request) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + assertActivations(t, result.Activations, []string{"host1"}, []string{"host2"}) +} + func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) { scheme := buildTestScheme(t) @@ -819,6 +858,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) reservations []*v1alpha1.Reservation request api.ExternalSchedulerRequest opts FilterHasEnoughCapacityOpts + pipelineOpts scheduling.Options expectedHosts []string filteredHosts []string }{ @@ -834,8 +874,9 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) }, // Request with reserve_for_committed_resource intent (scheduling a new CR reservation) request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1", "host2"}), - opts: FilterHasEnoughCapacityOpts{LockReserved: false}, // Note: LockReserved is false, but intent overrides - expectedHosts: []string{"host2"}, // host1 blocked because existing-cr stays locked + opts: FilterHasEnoughCapacityOpts{LockReserved: false}, + pipelineOpts: scheduling.Options{LockReservations: true}, + expectedHosts: []string{"host2"}, // host1 blocked because existing-cr stays locked filteredHosts: []string{"host1"}, }, { @@ -867,6 +908,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) // Request with reserve_for_committed_resource intent request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1", "host2"}), opts: FilterHasEnoughCapacityOpts{LockReserved: false}, + pipelineOpts: scheduling.Options{LockReservations: true}, expectedHosts: []string{"host2"}, filteredHosts: []string{"host1"}, // host1 blocked by other project's reservation (would be blocked anyway) }, @@ -885,6 +927,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) // After blocking all 3 reservations (24 CPU), only 8 CPU free -> should fail request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 10, "20Gi", "reserve_for_committed_resource", false, []string{"host1"}), opts: FilterHasEnoughCapacityOpts{LockReserved: false}, + pipelineOpts: scheduling.Options{LockReservations: true}, expectedHosts: []string{}, filteredHosts: []string{"host1"}, // All reservations stay locked, not enough capacity }, @@ -916,13 +959,14 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) newCommittedReservation("existing-cr", "host1", "project-A", "m1.large", "gp-1", "8", "16Gi", nil, nil), }, // Request with reserve_for_committed_resource intent - // IgnoredReservationTypes is a safety flag that overrides everything, including intent + // IgnoredReservationTypes is a safety flag that overrides everything, including LockReservations request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1"}), opts: FilterHasEnoughCapacityOpts{ LockReserved: false, // IgnoredReservationTypes is a safety override - ignores CR even for CR scheduling IgnoredReservationTypes: []v1alpha1.ReservationType{v1alpha1.ReservationTypeCommittedResource}, }, + pipelineOpts: scheduling.Options{LockReservations: true}, expectedHosts: []string{"host1"}, // CR reservation is ignored via IgnoredReservationTypes (safety override) filteredHosts: []string{}, }, @@ -960,6 +1004,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) step := &FilterHasEnoughCapacity{} step.Client = fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).Build() step.Options = tt.opts + tt.request.Options = tt.pipelineOpts result, err := step.Run(slog.Default(), tt.request) if err != nil { diff --git a/internal/scheduling/pods/filter_weigher_pipeline_controller.go b/internal/scheduling/pods/filter_weigher_pipeline_controller.go index 0ceee6485..cfbd06315 100644 --- a/internal/scheduling/pods/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/pods/filter_weigher_pipeline_controller.go @@ -6,7 +6,6 @@ package pods import ( "context" "errors" - "fmt" "sync" "time" @@ -95,10 +94,6 @@ func (c *FilterWeigherPipelineController) ProcessNewPod(ctx context.Context, pod }, } - pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name] - if !ok { - return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name) - } err := c.process(ctx, decision) if err != nil { meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ @@ -115,11 +110,6 @@ func (c *FilterWeigherPipelineController) ProcessNewPod(ctx context.Context, pod Message: "pipeline run succeeded", }) } - if pipelineConf.Spec.CreateHistory { - if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { - ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history") - } - } return err } @@ -156,9 +146,14 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision return errors.New("no nodes available for scheduling") } - // Execute the scheduling pipeline. + // Execute the scheduling pipeline. Options not set: pod scheduling always records history. request := pods.PodPipelineRequest{Nodes: nodes.Items, Pod: *pod} result, err := pipeline.Run(request) + if !request.Options.SkipHistory { + if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { + log.Error(upsertErr, "failed to create/update history") + } + } if err != nil { log.V(1).Error(err, "failed to run scheduler pipeline") return errors.New("failed to run scheduler pipeline") diff --git a/internal/scheduling/pods/filter_weigher_pipeline_controller_test.go b/internal/scheduling/pods/filter_weigher_pipeline_controller_test.go index 143ed9f83..7a8159be6 100644 --- a/internal/scheduling/pods/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/pods/filter_weigher_pipeline_controller_test.go @@ -122,6 +122,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) { Pipelines: map[string]lib.FilterWeigherPipeline[pods.PodPipelineRequest]{ "pods-scheduler": createMockPodPipeline(), }, + HistoryManager: lib.HistoryClient{Client: client}, }, Monitor: lib.FilterWeigherPipelineMonitor{}, } @@ -300,7 +301,6 @@ func TestFilterWeigherPipelineController_ProcessNewPod(t *testing.T) { Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainPods, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -334,14 +334,13 @@ func TestFilterWeigherPipelineController_ProcessNewPod(t *testing.T) { Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainPods, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, }, createHistory: false, expectError: false, - expectHistoryCreated: false, + expectHistoryCreated: true, expectNodeAssigned: true, expectTargetHost: "node1", }, @@ -381,14 +380,13 @@ func TestFilterWeigherPipelineController_ProcessNewPod(t *testing.T) { Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainPods, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, }, createHistory: true, expectError: true, - expectHistoryCreated: true, // Decision is created but processing fails + expectHistoryCreated: false, expectNodeAssigned: false, }, } diff --git a/internal/scheduling/reservations/capacity/controller.go b/internal/scheduling/reservations/capacity/controller.go index 76a8aaf9a..6e40cd562 100644 --- a/internal/scheduling/reservations/capacity/controller.go +++ b/internal/scheduling/reservations/capacity/controller.go @@ -22,6 +22,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" schedulerapi "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" @@ -147,8 +148,13 @@ func (c *Controller) reconcileOne( cur := existingByName[flavor.Name] cur.FlavorName = flavor.Name - totalVMSlots, totalHosts, totalErr := c.probeScheduler(ctx, flavor, az, c.config.TotalPipeline, hvByName) - placeableVMs, placeableHosts, placeableErr := c.probeScheduler(ctx, flavor, az, c.config.PlaceablePipeline, hvByName) + totalVMSlots, totalHosts, totalErr := c.probeScheduler(ctx, flavor, az, c.config.TotalPipeline, hvByName, scheduling.Options{ + SkipHistory: true, + SkipInflight: true, + AssumeEmptyHosts: true, + IgnoredReservationTypes: []v1alpha1.ReservationType{v1alpha1.ReservationTypeCommittedResource, v1alpha1.ReservationTypeFailover}, + }) + placeableVMs, placeableHosts, placeableErr := c.probeScheduler(ctx, flavor, az, c.config.PlaceablePipeline, hvByName, scheduling.Options{SkipHistory: true, SkipInflight: true}) if totalErr != nil { allFresh = false @@ -246,6 +252,7 @@ func (c *Controller) probeScheduler( flavor compute.FlavorInGroup, az, pipeline string, hvByName map[string]hv1.Hypervisor, + opts scheduling.Options, ) (capacity, hosts int64, err error) { flavorBytes := int64(flavor.MemoryMB) * 1024 * 1024 //nolint:gosec @@ -271,7 +278,7 @@ func (c *Controller) probeScheduler( AvailabilityZone: az, Pipeline: pipeline, EligibleHosts: eligibleHosts, - }) + }, opts) if err != nil { return 0, 0, fmt.Errorf("scheduler call failed (pipeline=%s): %w", pipeline, err) } diff --git a/internal/scheduling/reservations/capacity/controller_test.go b/internal/scheduling/reservations/capacity/controller_test.go index 69a4e80bb..56d43d6f2 100644 --- a/internal/scheduling/reservations/capacity/controller_test.go +++ b/internal/scheduling/reservations/capacity/controller_test.go @@ -21,6 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" schedulerapi "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" @@ -429,7 +430,7 @@ func TestProbeScheduler_CapacityCalculation(t *testing.T) { } flavor := compute.FlavorInGroup{Name: "test-flavor", MemoryMB: memMB} - capacity, hosts, err := c.probeScheduler(context.Background(), flavor, "az-a", "test-pipeline", hvByName) + capacity, hosts, err := c.probeScheduler(context.Background(), flavor, "az-a", "test-pipeline", hvByName, scheduling.Options{SkipHistory: true}) if err != nil { t.Fatalf("probeScheduler failed: %v", err) } diff --git a/internal/scheduling/reservations/commitments/reservation_controller.go b/internal/scheduling/reservations/commitments/reservation_controller.go index b65842c60..04d7733bc 100644 --- a/internal/scheduling/reservations/commitments/reservation_controller.go +++ b/internal/scheduling/reservations/commitments/reservation_controller.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" "github.com/cobaltcore-dev/cortex/pkg/multicluster" @@ -286,8 +287,17 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr "_nova_check_type": string(schedulerdelegationapi.ReserveForCommittedResourceIntent), }, } + scheduleOpts := scheduling.Options{ + ReadOnly: false, // mutates state (reservation placement) + LockReservations: true, // don't unlock CR reservations; finding a slot, not placing a VM + AssumeEmptyHosts: false, + IgnoredReservationTypes: nil, + MaxCandidates: 1, + SkipHistory: true, + SkipInflight: false, // TODO pessimistic blocking needed, will be addressed in follow up ticket + } - scheduleResp, err := r.SchedulerClient.ScheduleReservation(ctx, scheduleReq) + scheduleResp, err := r.SchedulerClient.ScheduleReservation(ctx, scheduleReq, scheduleOpts) if err != nil { logger.Error(err, "failed to schedule reservation") return ctrl.Result{}, err diff --git a/internal/scheduling/reservations/failover/integration_test.go b/internal/scheduling/reservations/failover/integration_test.go index df1354be6..316633e3a 100644 --- a/internal/scheduling/reservations/failover/integration_test.go +++ b/internal/scheduling/reservations/failover/integration_test.go @@ -1119,10 +1119,10 @@ func newIntegrationTestEnv(t *testing.T, vms []VM, hypervisors []*hv1.Hypervisor Client: k8sClient, Pipelines: make(map[string]lib.FilterWeigherPipeline[novaapi.ExternalSchedulerRequest]), PipelineConfigs: make(map[string]v1alpha1.Pipeline), + HistoryManager: lib.HistoryClient{Client: k8sClient}, }, Monitor: getSharedMonitor(), } - // Register all pipelines needed for testing pipelines := []v1alpha1.Pipeline{ { @@ -1309,11 +1309,10 @@ func newIntegrationTestEnvWithTraitsFilter(t *testing.T, vms []VM, hypervisors [ Client: k8sClient, Pipelines: make(map[string]lib.FilterWeigherPipeline[novaapi.ExternalSchedulerRequest]), PipelineConfigs: make(map[string]v1alpha1.Pipeline), + HistoryManager: lib.HistoryClient{Client: k8sClient}, }, Monitor: getSharedMonitor(), } - - // Register all pipelines needed for testing (with traits filter enabled) pipelines := []v1alpha1.Pipeline{ { ObjectMeta: metav1.ObjectMeta{ diff --git a/internal/scheduling/reservations/failover/reservation_scheduling.go b/internal/scheduling/reservations/failover/reservation_scheduling.go index f482f3393..63bf5b0f5 100644 --- a/internal/scheduling/reservations/failover/reservation_scheduling.go +++ b/internal/scheduling/reservations/failover/reservation_scheduling.go @@ -10,6 +10,7 @@ import ( "sort" api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" ) @@ -22,7 +23,8 @@ const ( // PipelineNewFailoverReservation is used to find a host for creating a new reservation. // It validates host compatibility AND checks capacity. - PipelineNewFailoverReservation = "kvm-new-failover-reservation" + // Uses the general-purpose pipeline; LockReservations and SkipHistory are set via Options. + PipelineNewFailoverReservation = "kvm-general-purpose-load-balancing" // PipelineAcknowledgeFailoverReservation is used to validate that a failover reservation // is still valid for all its allocated VMs. It sends an evacuation-style scheduling request @@ -30,7 +32,7 @@ const ( PipelineAcknowledgeFailoverReservation = "kvm-acknowledge-failover-reservation" ) -func (c *FailoverReservationController) queryHypervisorsFromScheduler(ctx context.Context, vm VM, allHypervisors []string, pipeline string, resSpec resolvedReservationSpec) ([]string, error) { +func (c *FailoverReservationController) queryHypervisorsFromScheduler(ctx context.Context, vm VM, allHypervisors []string, pipeline string, resSpec resolvedReservationSpec, opts scheduling.Options) ([]string, error) { logger := LoggerFromContext(ctx) // Build list of eligible hypervisors (excluding VM's current hypervisor) @@ -91,7 +93,7 @@ func (c *FailoverReservationController) queryHypervisorsFromScheduler(ctx contex "eligibleHypervisors", len(eligibleHypervisors), "ignoreHypervisors", ignoreHypervisors) - scheduleResp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq) + scheduleResp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq, opts) if err != nil { logger.Error(err, "failed to schedule failover reservation", "vmUUID", vm.UUID, "pipeline", pipeline) return nil, fmt.Errorf("failed to schedule failover reservation: %w", err) @@ -121,7 +123,7 @@ func (c *FailoverReservationController) tryReuseExistingReservation( logger := LoggerFromContext(ctx) - validHypervisors, err := c.queryHypervisorsFromScheduler(ctx, vm, allHypervisors, PipelineReuseFailoverReservation, resSpec) + validHypervisors, err := c.queryHypervisorsFromScheduler(ctx, vm, allHypervisors, PipelineReuseFailoverReservation, resSpec, scheduling.Options{ReadOnly: true, SkipHistory: true, SkipInflight: true}) if err != nil { logger.Error(err, "failed to get potential hypervisors for VM", "vmUUID", vm.UUID) return nil @@ -222,7 +224,7 @@ func (c *FailoverReservationController) validateVMViaSchedulerEvacuation( "vmCurrentHost", vm.CurrentHypervisor, "pipeline", PipelineAcknowledgeFailoverReservation) - resp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq) + resp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq, scheduling.Options{ReadOnly: true, LockReservations: true, SkipHistory: true, SkipInflight: true}) if err != nil { logger.Error(err, "failed to validate VM for reservation host", "vmUUID", vm.UUID, "reservationHost", reservationHost) return false, fmt.Errorf("failed to validate VM for reservation host: %w", err) @@ -264,7 +266,7 @@ func (c *FailoverReservationController) scheduleAndBuildNewFailoverReservation( // Get potential hypervisors from scheduler using the reservation spec resources // (which may be sized to the LargestFlavor from the flavor group) - validHypervisors, err := c.queryHypervisorsFromScheduler(ctx, vm, allHypervisors, PipelineNewFailoverReservation, resSpec) + validHypervisors, err := c.queryHypervisorsFromScheduler(ctx, vm, allHypervisors, PipelineNewFailoverReservation, resSpec, scheduling.Options{LockReservations: true, SkipHistory: true, SkipInflight: true}) if err != nil { return nil, fmt.Errorf("failed to get potential hypervisors for VM: %w", err) } diff --git a/internal/scheduling/reservations/scheduler_client.go b/internal/scheduling/reservations/scheduler_client.go index a42172ce2..fb3ef269b 100644 --- a/internal/scheduling/reservations/scheduler_client.go +++ b/internal/scheduling/reservations/scheduler_client.go @@ -12,6 +12,7 @@ import ( "time" api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/go-logr/logr" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -89,9 +90,13 @@ type ScheduleReservationResponse struct { // ScheduleReservation calls the external scheduler API to find a host for a reservation. // The context should contain GlobalRequestID and RequestID for logging (use WithGlobalRequestID/WithRequestID). -func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleReservationRequest) (*ScheduleReservationResponse, error) { +func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleReservationRequest, opts scheduling.Options) (*ScheduleReservationResponse, error) { logger := loggerFromContext(ctx) + if err := opts.Validate(); err != nil { + return nil, fmt.Errorf("invalid scheduling options: %w", err) + } + // Build weights map (all zero for reservations) weights := make(map[string]float64, len(req.EligibleHosts)) for _, host := range req.EligibleHosts { @@ -115,6 +120,7 @@ func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleR Pipeline: req.Pipeline, Hosts: req.EligibleHosts, Weights: weights, + Options: opts, Context: api.NovaRequestContext{ RequestID: RequestIDFromContext(ctx), GlobalRequestID: globalReqID,