41 changes: 29 additions & 12 deletions cluster-autoscaler/context/autoscaling_context.go
@@ -17,6 +17,10 @@ limitations under the License.
package context

import (
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
@@ -29,7 +33,9 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
@@ -61,10 +67,19 @@ type AutoscalingContext struct {
RemainingPdbTracker pdb.RemainingPdbTracker
// ClusterStateRegistry tracks the health of the node groups and pending scale-ups and scale-downs
ClusterStateRegistry *clusterstate.ClusterStateRegistry
//ProvisionRequstScaleUpMode indicates whether ClusterAutoscaler tries to accommodate ProvisioningRequest in current scale up iteration.
// ProvisioningRequestScaleUpMode indicates whether ClusterAutoscaler tries to accommodate ProvisioningRequest in the current scale-up iteration.
ProvisioningRequestScaleUpMode bool
// DraProvider is the provider for dynamic resources allocation.
DraProvider *draprovider.Provider
// TemplateNodeInfoRegistry allows accessing template node infos.
TemplateNodeInfoRegistry TemplateNodeInfoRegistry
}

// TemplateNodeInfoRegistry is the interface for getting template node infos.
type TemplateNodeInfoRegistry interface {
// GetNodeInfo returns the cached template node info for the given node group id, if present.
GetNodeInfo(id string) (*framework.NodeInfo, bool)
// GetNodeInfos returns all cached template node infos, keyed by node group id.
GetNodeInfos() map[string]*framework.NodeInfo
// Recompute rebuilds the cached template node infos from the current cluster state.
Recompute(autoscalingCtx *AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) errors.AutoscalerError
}
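
As a reading aid, here is a minimal sketch of how a component that already holds the context could consume this interface. The helper name and the CPU lookup are illustrative assumptions, not code from this PR:

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"

	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
)

// templateCPUMillis is a hypothetical helper: it looks up the cached template for a
// node group and returns its allocatable CPU in millicores, if a template is cached.
func templateCPUMillis(autoscalingCtx *ca_context.AutoscalingContext, nodeGroupID string) (int64, bool) {
	nodeInfo, found := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfo(nodeGroupID)
	if !found {
		return 0, false
	}
	cpu := nodeInfo.Node().Status.Allocatable[apiv1.ResourceCPU]
	return cpu.MilliValue(), true
}
```

Defining the interface here in the context package, rather than next to its implementation, lets consumers such as the DRA processor below use it without pulling in a dependency on the processors package.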

// AutoscalingKubeClients contains all Kubernetes API clients,
@@ -112,19 +127,21 @@ func NewAutoscalingContext(
remainingPdbTracker pdb.RemainingPdbTracker,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
draProvider *draprovider.Provider,
templateNodeInfoRegistry TemplateNodeInfoRegistry,
) *AutoscalingContext {
return &AutoscalingContext{
AutoscalingOptions: options,
CloudProvider: cloudProvider,
AutoscalingKubeClients: *autoscalingKubeClients,
FrameworkHandle: fwHandle,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: expanderStrategy,
ProcessorCallbacks: processorCallbacks,
DebuggingSnapshotter: debuggingSnapshotter,
RemainingPdbTracker: remainingPdbTracker,
ClusterStateRegistry: clusterStateRegistry,
DraProvider: draProvider,
AutoscalingOptions: options,
CloudProvider: cloudProvider,
AutoscalingKubeClients: *autoscalingKubeClients,
FrameworkHandle: fwHandle,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: expanderStrategy,
ProcessorCallbacks: processorCallbacks,
DebuggingSnapshotter: debuggingSnapshotter,
RemainingPdbTracker: remainingPdbTracker,
ClusterStateRegistry: clusterStateRegistry,
DraProvider: draProvider,
TemplateNodeInfoRegistry: templateNodeInfoRegistry,
}
}

36 changes: 22 additions & 14 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -43,6 +43,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
@@ -154,6 +155,9 @@ func NewStaticAutoscaler(
}
clusterStateRegistry := clusterstate.NewClusterStateRegistry(cloudProvider, clusterStateConfig, autoscalingKubeClients.LogRecorder, backoff, processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker)
processorCallbacks := newStaticAutoscalerProcessorCallbacks()

templateNodeInfoRegistry := nodeinfosprovider.NewTemplateNodeInfoRegistry(processors.TemplateNodeInfoProvider)

autoscalingCtx := ca_context.NewAutoscalingContext(
opts,
fwHandle,
@@ -165,7 +169,8 @@
debuggingSnapshotter,
remainingPdbTracker,
clusterStateRegistry,
draProvider)
draProvider,
templateNodeInfoRegistry)

taintConfig := taints.NewTaintConfig(opts)
processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
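
The implementation behind NewTemplateNodeInfoRegistry is not part of the hunks shown here. As a rough mental model only, it can be thought of as a caching wrapper around the existing TemplateNodeInfoProvider; the sketch below is an assumption about its shape, with hypothetical field names, not the PR's actual code:

```go
package nodeinfosprovider

import (
	"time"

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

// templateNodeInfoRegistry is a hypothetical caching wrapper: Recompute delegates to
// the wrapped provider and stores the result, while the getters serve the stored map.
type templateNodeInfoRegistry struct {
	provider  TemplateNodeInfoProvider
	nodeInfos map[string]*framework.NodeInfo
}

func (r *templateNodeInfoRegistry) Recompute(autoscalingCtx *ca_context.AutoscalingContext, nodes []*apiv1.Node,
	daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) errors.AutoscalerError {
	nodeInfos, err := r.provider.Process(autoscalingCtx, nodes, daemonsets, taintConfig, currentTime)
	if err != nil {
		return err
	}
	r.nodeInfos = nodeInfos
	return nil
}

func (r *templateNodeInfoRegistry) GetNodeInfo(id string) (*framework.NodeInfo, bool) {
	nodeInfo, found := r.nodeInfos[id]
	return nodeInfo, found
}

func (r *templateNodeInfoRegistry) GetNodeInfos() map[string]*framework.NodeInfo {
	return r.nodeInfos
}
```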
@@ -358,15 +363,14 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
return typedErr.AddPrefix("failed to initialize RemainingPdbTracker: ")
}

nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingCtx, allNodes, daemonsets, a.taintConfig, currentTime)
if autoscalerError != nil {
klog.Errorf("Failed to get node infos for groups: %v", autoscalerError)
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
if autoscalerError := a.AutoscalingContext.TemplateNodeInfoRegistry.Recompute(a.AutoscalingContext, allNodes, daemonsets, a.taintConfig, currentTime); autoscalerError != nil {
klog.Errorf("Failed to recompute template node infos: %v", autoscalerError)
return autoscalerError.AddPrefix("failed to recompute template node infos: ")
}

a.DebuggingSnapshotter.SetTemplateNodes(nodeInfosForGroups)
a.DebuggingSnapshotter.SetTemplateNodes(autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos())

if typedErr := a.updateClusterState(allNodes, nodeInfosForGroups, currentTime); typedErr != nil {
if typedErr := a.updateClusterState(allNodes, currentTime); typedErr != nil {
klog.Errorf("Failed to update cluster state: %v", typedErr)
return typedErr
}
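
Taken together, the RunOnce changes amount to: recompute the templates once per loop via the registry, then have every later consumer pull the same cached map from the context instead of receiving nodeInfosForGroups as a parameter. A condensed sketch of that flow, with error handling simplified (an illustration, not the literal RunOnce body):

```go
package example

import (
	"time"

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
	caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

// refreshTemplates condenses the new per-iteration flow for illustration: recompute
// once, then fan the cached map out to downstream consumers.
func refreshTemplates(autoscalingCtx *ca_context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry,
	allNodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) caerrors.AutoscalerError {
	// Recompute the template node infos once for this iteration.
	if err := autoscalingCtx.TemplateNodeInfoRegistry.Recompute(autoscalingCtx, allNodes, daemonsets, taintConfig, currentTime); err != nil {
		return err.AddPrefix("failed to recompute template node infos: ")
	}
	// Later consumers (debugging snapshotter, cluster state registry, scale-up
	// orchestrator) all read the same cached map from the context.
	nodeInfos := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfos()
	if err := csr.UpdateNodes(allNodes, nodeInfos, currentTime); err != nil {
		return caerrors.ToAutoscalerError(caerrors.InternalError, err)
	}
	return nil
}
```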
@@ -458,7 +462,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
// them and not trigger another scale-up.
// The fake nodes are intentionally not added to the all nodes list, so that they are not considered as candidates for scale-down (which
// doesn't make sense as they're not real).
err = a.addUpcomingNodesToClusterSnapshot(upcomingCounts, nodeInfosForGroups)
err = a.addUpcomingNodesToClusterSnapshot(upcomingCounts)
if err != nil {
klog.Errorf("Failed adding upcoming nodes to cluster snapshot: %v", err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
@@ -563,7 +567,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
for i, nodeInfo := range allNodeInfos {
nodes[i] = nodeInfo.Node()
}
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, nodes, daemonsets, nodeInfosForGroups, false)
nodeInfos := a.AutoscalingContext.TemplateNodeInfoRegistry.GetNodeInfos()
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, nodes, daemonsets, nodeInfos, false)
postScaleUp(scaleUpStart)
}

@@ -672,7 +677,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
for i, nodeInfo := range allNodeInfos {
nodes[i] = nodeInfo.Node()
}
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfosForGroups)
nodeInfos := a.AutoscalingContext.TemplateNodeInfoRegistry.GetNodeInfos()
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
postScaleUp(scaleUpStart)
}

@@ -694,11 +700,12 @@ func (a *StaticAutoscaler) updateSoftDeletionTaints(allNodes []*apiv1.Node) {
}
}

func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[string]int, nodeInfosForGroups map[string]*framework.NodeInfo) error {
func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[string]int) error {
nodeGroups := a.nodeGroupsById()
upcomingNodeGroups := make(map[string]int)
upcomingNodesFromUpcomingNodeGroups := 0
upcomingNodeInfosPerNg, err := getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups)
nodeInfos := a.AutoscalingContext.TemplateNodeInfoRegistry.GetNodeInfos()
upcomingNodeInfosPerNg, err := getUpcomingNodeInfos(upcomingCounts, nodeInfos)
if err != nil {
return err
}
@@ -1043,8 +1050,9 @@ func filterNodesFromSelectedGroups(cp cloudprovider.CloudProvider, nodes ...*api
return filtered
}

func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*framework.NodeInfo, currentTime time.Time) caerrors.AutoscalerError {
err := a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime)
func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, currentTime time.Time) caerrors.AutoscalerError {
nodeInfos := a.AutoscalingContext.TemplateNodeInfoRegistry.GetNodeInfos()
err := a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfos, currentTime)
if err != nil {
klog.Errorf("Failed to update node registry: %v", err)
a.scaleDownPlanner.CleanUpUnneededNodes()
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler_test.go
@@ -2485,7 +2485,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, autoscalingCtx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute, MaxNodeStartupTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)

// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
actuator := actuation.NewActuator(&autoscalingCtx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&autoscalingCtx).NodeGroupConfigProcessor)
actuator := actuation.NewActuator(&autoscalingCtx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
autoscalingCtx.ScaleDownActuator = actuator

// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
21 changes: 16 additions & 5 deletions cluster-autoscaler/processors/customresources/dra_processor.go
@@ -19,6 +19,7 @@ package customresources
import (
apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@@ -57,11 +58,21 @@ func (p *DraCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(autosca
continue
}

nodeInfo, err := ng.TemplateNodeInfo()
if err != nil {
newReadyNodes = append(newReadyNodes, node)
klog.Warningf("Failed to get template node info for node group %s with error: %v", ng.Id(), err)
continue
var nodeInfo *framework.NodeInfo
if autoscalingCtx.TemplateNodeInfoRegistry != nil {
// Prefer the cached template from the registry. This template may contain enrichments (e.g.
// custom DRA slices) that are not present in the raw CloudProvider template.
if ni, found := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfo(ng.Id()); found {
nodeInfo = ni
}
}
if nodeInfo == nil {
nodeInfo, err = ng.TemplateNodeInfo()
if err != nil {
newReadyNodes = append(newReadyNodes, node)
klog.Warningf("Failed to get template node info for node group %s with error: %v", ng.Id(), err)
continue
}
}

nodeResourcesSlices, _ := draSnapshot.NodeResourceSlices(node.Name)
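The lookup order the processor now follows — prefer the cached (possibly enriched) template, fall back to the raw CloudProvider template — could be factored into a small helper. A sketch of that pattern, not code from this PR:

```go
package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// templateForNodeGroup is an illustrative helper: it returns the cached template for
// the node group when the registry is wired up and has an entry, and otherwise falls
// back to the cloud provider's template.
func templateForNodeGroup(autoscalingCtx *ca_context.AutoscalingContext, ng cloudprovider.NodeGroup) (*framework.NodeInfo, error) {
	if autoscalingCtx.TemplateNodeInfoRegistry != nil {
		if nodeInfo, found := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfo(ng.Id()); found {
			return nodeInfo, nil
		}
	}
	// Registry not wired up (e.g. in some tests) or no cached entry for this group.
	return ng.TemplateNodeInfo()
}
```

The nil check matters because the context can still be constructed without a registry in tests and other call paths, as the nil-guard in the processor above shows.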
cluster-autoscaler/processors/customresources/dra_processor_test.go
@@ -21,11 +21,14 @@ import (
"testing"
"time"

appsv1 "k8s.io/api/apps/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
@@ -35,12 +38,36 @@ import (
utils "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

type mockTemplateNodeInfoRegistry struct {
nodeInfos map[string]*framework.NodeInfo
}

func newMockTemplateNodeInfoRegistry(nodeInfos map[string]*framework.NodeInfo) *mockTemplateNodeInfoRegistry {
return &mockTemplateNodeInfoRegistry{
nodeInfos: nodeInfos,
}
}

func (m *mockTemplateNodeInfoRegistry) GetNodeInfo(id string) (*framework.NodeInfo, bool) {
nodeInfo, found := m.nodeInfos[id]
return nodeInfo, found
}

func (m *mockTemplateNodeInfoRegistry) GetNodeInfos() map[string]*framework.NodeInfo {
return m.nodeInfos
}

func (m *mockTemplateNodeInfoRegistry) Recompute(_ *ca_context.AutoscalingContext, _ []*apiv1.Node, _ []*appsv1.DaemonSet, _ taints.TaintConfig, _ time.Time) errors.AutoscalerError {
return nil
}

func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) {
testCases := map[string]struct {
nodeGroupsAllNodes map[string][]*apiv1.Node
nodeGroupsTemplatesSlices map[string][]*resourceapi.ResourceSlice
nodesSlices map[string][]*resourceapi.ResourceSlice
expectedNodesReadiness map[string]bool
registryNodeInfos map[string]*framework.NodeInfo
}{
"1 DRA node group all totally ready": {
nodeGroupsAllNodes: map[string][]*apiv1.Node{
@@ -306,6 +333,29 @@ func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) {
"node_7": true,
},
},
"Custom DRA driver retrieved via cached template node info": {
nodeGroupsAllNodes: map[string][]*apiv1.Node{
"ng1": {
buildTestNode("node_1", true),
buildTestNode("node_2", true),
},
},
nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{},
registryNodeInfos: map[string]*framework.NodeInfo{
"ng1": framework.NewNodeInfo(
buildTestNode("ng1_template", true),
createNodeResourceSlices("ng1_template", []int{1}),
),
},
nodesSlices: map[string][]*resourceapi.ResourceSlice{
"node_1": createNodeResourceSlices("node_1", []int{1}),
"node_2": {},
},
expectedNodesReadiness: map[string]bool{
"node_1": true,
"node_2": false,
},
},
}

for tcName, tc := range testCases {
@@ -336,7 +386,11 @@ func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) {
clusterSnapshotStore.SetClusterState([]*apiv1.Node{}, []*apiv1.Pod{}, draSnapshot)
clusterSnapshot, _, _ := testsnapshot.NewCustomTestSnapshotAndHandle(clusterSnapshotStore)

autoscalingCtx := &ca_context.AutoscalingContext{CloudProvider: provider, ClusterSnapshot: clusterSnapshot}
autoscalingCtx := &ca_context.AutoscalingContext{
CloudProvider: provider,
ClusterSnapshot: clusterSnapshot,
TemplateNodeInfoRegistry: newMockTemplateNodeInfoRegistry(tc.registryNodeInfos),
}
processor := DraCustomResourcesProcessor{}
newAllNodes, newReadyNodes := processor.FilterOutNodesWithUnreadyResources(autoscalingCtx, initialAllNodes, initialReadyNodes, draSnapshot)
