Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions cluster-autoscaler/context/autoscaling_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,18 @@ type AutoscalingContext struct {
RemainingPdbTracker pdb.RemainingPdbTracker
// ClusterStateRegistry tracks the health of the node groups and pending scale-ups and scale-downs
ClusterStateRegistry *clusterstate.ClusterStateRegistry
//ProvisionRequstScaleUpMode indicates whether ClusterAutoscaler tries to accommodate ProvisioningRequest in current scale up iteration.
// ProvisionRequstScaleUpMode indicates whether ClusterAutoscaler tries to accommodate ProvisioningRequest in current scale up iteration.
ProvisioningRequestScaleUpMode bool
// DraProvider is the provider for dynamic resources allocation.
DraProvider *draprovider.Provider
// TemplateNodeInfoRegistry allows accessing template node infos.
TemplateNodeInfoRegistry TemplateNodeInfoRegistry
}

// TemplateNodeInfoRegistry is the interface for getting template node infos.
type TemplateNodeInfoRegistry interface {
GetNodeInfo(id string) (*framework.NodeInfo, bool)
GetNodeInfos() map[string]*framework.NodeInfo
}

// AutoscalingKubeClients contains all Kubernetes API clients,
Expand Down Expand Up @@ -112,19 +120,21 @@ func NewAutoscalingContext(
remainingPdbTracker pdb.RemainingPdbTracker,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
draProvider *draprovider.Provider,
templateNodeInfoRegistry TemplateNodeInfoRegistry,
) *AutoscalingContext {
return &AutoscalingContext{
AutoscalingOptions: options,
CloudProvider: cloudProvider,
AutoscalingKubeClients: *autoscalingKubeClients,
FrameworkHandle: fwHandle,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: expanderStrategy,
ProcessorCallbacks: processorCallbacks,
DebuggingSnapshotter: debuggingSnapshotter,
RemainingPdbTracker: remainingPdbTracker,
ClusterStateRegistry: clusterStateRegistry,
DraProvider: draProvider,
AutoscalingOptions: options,
CloudProvider: cloudProvider,
AutoscalingKubeClients: *autoscalingKubeClients,
FrameworkHandle: fwHandle,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: expanderStrategy,
ProcessorCallbacks: processorCallbacks,
DebuggingSnapshotter: debuggingSnapshotter,
RemainingPdbTracker: remainingPdbTracker,
ClusterStateRegistry: clusterStateRegistry,
DraProvider: draProvider,
TemplateNodeInfoRegistry: templateNodeInfoRegistry,
}
}

Expand Down
8 changes: 7 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
Expand Down Expand Up @@ -152,6 +153,10 @@ func NewStaticAutoscaler(
}
clusterStateRegistry := clusterstate.NewClusterStateRegistry(cloudProvider, clusterStateConfig, autoscalingKubeClients.LogRecorder, backoff, processors.NodeGroupConfigProcessor, processors.AsyncNodeGroupStateChecker)
processorCallbacks := newStaticAutoscalerProcessorCallbacks()

templateNodeInfoRegistry := nodeinfosprovider.NewTemplateNodeInfoRegistry(processors.TemplateNodeInfoProvider)
processors.TemplateNodeInfoProvider = templateNodeInfoRegistry

autoscalingCtx := ca_context.NewAutoscalingContext(
opts,
fwHandle,
Expand All @@ -163,7 +168,8 @@ func NewStaticAutoscaler(
debuggingSnapshotter,
remainingPdbTracker,
clusterStateRegistry,
draProvider)
draProvider,
templateNodeInfoRegistry)

taintConfig := taints.NewTaintConfig(opts)
processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
Expand Down
19 changes: 14 additions & 5 deletions cluster-autoscaler/processors/customresources/dra_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package customresources
import (
apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
Expand Down Expand Up @@ -57,11 +58,19 @@ func (p *DraCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(autosca
continue
}

nodeInfo, err := ng.TemplateNodeInfo()
if err != nil {
newReadyNodes = append(newReadyNodes, node)
klog.Warningf("Failed to get template node info for node group %s with error: %v", ng.Id(), err)
continue
var nodeInfo *framework.NodeInfo
if autoscalingCtx.TemplateNodeInfoRegistry != nil {
if ni, found := autoscalingCtx.TemplateNodeInfoRegistry.GetNodeInfo(ng.Id()); found {
nodeInfo = ni
}
}
if nodeInfo == nil {
nodeInfo, err = ng.TemplateNodeInfo()
if err != nil {
newReadyNodes = append(newReadyNodes, node)
klog.Warningf("Failed to get template node info for node group %s with error: %v", ng.Id(), err)
continue
}
}

nodeResourcesSlices, _ := draSnapshot.NodeResourceSlices(node.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,32 @@ import (
utils "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

type mockTemplateNodeInfoRegistry struct {
nodeInfos map[string]*framework.NodeInfo
}

func newMockTemplateNodeInfoRegistry(nodeInfos map[string]*framework.NodeInfo) *mockTemplateNodeInfoRegistry {
return &mockTemplateNodeInfoRegistry{
nodeInfos: nodeInfos,
}
}

func (m *mockTemplateNodeInfoRegistry) GetNodeInfo(id string) (*framework.NodeInfo, bool) {
nodeInfo, found := m.nodeInfos[id]
return nodeInfo, found
}

func (m *mockTemplateNodeInfoRegistry) GetNodeInfos() map[string]*framework.NodeInfo {
return m.nodeInfos
}

func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) {
testCases := map[string]struct {
nodeGroupsAllNodes map[string][]*apiv1.Node
nodeGroupsTemplatesSlices map[string][]*resourceapi.ResourceSlice
nodesSlices map[string][]*resourceapi.ResourceSlice
expectedNodesReadiness map[string]bool
registryNodeInfos map[string]*framework.NodeInfo
}{
"1 DRA node group all totally ready": {
nodeGroupsAllNodes: map[string][]*apiv1.Node{
Expand Down Expand Up @@ -306,6 +326,29 @@ func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) {
"node_7": true,
},
},
"Custom DRA driver retrieved via cached template node info": {
nodeGroupsAllNodes: map[string][]*apiv1.Node{
"ng1": {
buildTestNode("node_1", true),
buildTestNode("node_2", true),
},
},
nodeGroupsTemplatesSlices: map[string][]*resourceapi.ResourceSlice{},
registryNodeInfos: map[string]*framework.NodeInfo{
"ng1": framework.NewNodeInfo(
buildTestNode("ng1_template", true),
createNodeResourceSlices("ng1_template", []int{1}),
),
},
nodesSlices: map[string][]*resourceapi.ResourceSlice{
"node_1": createNodeResourceSlices("node_1", []int{1}),
"node_2": {},
},
expectedNodesReadiness: map[string]bool{
"node_1": true,
"node_2": false,
},
},
}

for tcName, tc := range testCases {
Expand Down Expand Up @@ -336,7 +379,11 @@ func TestFilterOutNodesWithUnreadyDRAResources(t *testing.T) {
clusterSnapshotStore.SetClusterState([]*apiv1.Node{}, []*apiv1.Pod{}, draSnapshot)
clusterSnapshot, _, _ := testsnapshot.NewCustomTestSnapshotAndHandle(clusterSnapshotStore)

autoscalingCtx := &ca_context.AutoscalingContext{CloudProvider: provider, ClusterSnapshot: clusterSnapshot}
autoscalingCtx := &ca_context.AutoscalingContext{
CloudProvider: provider,
ClusterSnapshot: clusterSnapshot,
TemplateNodeInfoRegistry: newMockTemplateNodeInfoRegistry(tc.registryNodeInfos),
}
processor := DraCustomResourcesProcessor{}
newAllNodes, newReadyNodes := processor.FilterOutNodesWithUnreadyResources(autoscalingCtx, initialAllNodes, initialReadyNodes, draSnapshot)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeinfosprovider

import (
"maps"
"sync"
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"

ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

// TemplateNodeInfoRegistry is a component that stores and exposes template NodeInfos.
type TemplateNodeInfoRegistry struct {
processor TemplateNodeInfoProvider
nodeInfos map[string]*framework.NodeInfo
lock sync.RWMutex
}

// NewTemplateNodeInfoRegistry creates a new TemplateNodeInfoRegistry.
func NewTemplateNodeInfoRegistry(processor TemplateNodeInfoProvider) *TemplateNodeInfoRegistry {
return &TemplateNodeInfoRegistry{
processor: processor,
nodeInfos: make(map[string]*framework.NodeInfo),
}
}

// Process calls the embedded processor and updates the cache.
func (r *TemplateNodeInfoRegistry) Process(autoscalingCtx *ca_context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) (map[string]*framework.NodeInfo, errors.AutoscalerError) {
nodeInfos, err := r.processor.Process(autoscalingCtx, nodes, daemonsets, taintConfig, currentTime)
if err != nil {
return nil, err
}

r.lock.Lock()
defer r.lock.Unlock()
r.nodeInfos = nodeInfos
return nodeInfos, nil
}

// CleanUp cleans up the embedded processor.
func (r *TemplateNodeInfoRegistry) CleanUp() {
r.processor.CleanUp()
}

// GetNodeInfo returns the template NodeInfo for the given node group id.
func (r *TemplateNodeInfoRegistry) GetNodeInfo(id string) (*framework.NodeInfo, bool) {
r.lock.RLock()
defer r.lock.RUnlock()
nodeInfo, found := r.nodeInfos[id]
return nodeInfo, found
}

// GetNodeInfos returns a copy of the full map of template NodeInfos.
func (r *TemplateNodeInfoRegistry) GetNodeInfos() map[string]*framework.NodeInfo {
r.lock.RLock()
defer r.lock.RUnlock()
result := make(map[string]*framework.NodeInfo, len(r.nodeInfos))
maps.Copy(result, r.nodeInfos)
return result
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeinfosprovider

import (
"testing"
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"

"github.com/stretchr/testify/assert"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

type mockTemplateNodeInfoProvider struct {
nodeInfos map[string]*framework.NodeInfo
}

func (p *mockTemplateNodeInfoProvider) Process(autoscalingCtx *ca_context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, currentTime time.Time) (map[string]*framework.NodeInfo, errors.AutoscalerError) {
return p.nodeInfos, nil
}

func (p *mockTemplateNodeInfoProvider) CleanUp() {}

func TestTemplateNodeInfoRegistry(t *testing.T) {
mockProvider := &mockTemplateNodeInfoProvider{
nodeInfos: map[string]*framework.NodeInfo{
"ng1": {},
},
}
registry := NewTemplateNodeInfoRegistry(mockProvider)

// Test Process
_, err := registry.Process(nil, nil, nil, taints.TaintConfig{}, time.Now())
assert.NoError(t, err)

// Test GetNodeInfo
info, found := registry.GetNodeInfo("ng1")
assert.True(t, found)
assert.NotNil(t, info)

info, found = registry.GetNodeInfo("ng2")
assert.False(t, found)
assert.Nil(t, info)

// Test GetNodeInfos
infos := registry.GetNodeInfos()
assert.Len(t, infos, 1)
assert.Contains(t, infos, "ng1")

// Test Update
mockProvider.nodeInfos = map[string]*framework.NodeInfo{
"ng1": {},
"ng2": {},
}
registry.Process(nil, nil, nil, taints.TaintConfig{}, time.Now())

info, found = registry.GetNodeInfo("ng2")
assert.True(t, found)
assert.NotNil(t, info)
}