@@ -103,6 +103,7 @@ func (m ClusterStateFeederFactory) Make() *clusterStateFeeder {
103103 vpaCheckpointClient : m .VpaCheckpointClient ,
104104 vpaCheckpointLister : m .VpaCheckpointLister ,
105105 vpaLister : m .VpaLister ,
106+ podLister : m .PodLister ,
106107 clusterState : m .ClusterState ,
107108 specClient : spec .NewSpecClient (m .PodLister ),
108109 selectorFetcher : m .SelectorFetcher ,
@@ -211,6 +212,7 @@ type clusterStateFeeder struct {
211212 vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
212213 vpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister
213214 vpaLister vpa_lister.VerticalPodAutoscalerLister
215+ podLister v1lister.PodLister
214216 clusterState model.ClusterState
215217 selectorFetcher target.VpaTargetSelectorFetcher
216218 memorySaveMode bool
@@ -220,55 +222,6 @@ type clusterStateFeeder struct {
220222 vpaObjectNamespace string
221223}
222224
223- func (feeder * clusterStateFeeder ) InitFromHistoryProvider (historyProvider history.HistoryProvider ) {
224- klog .V (3 ).InfoS ("Initializing VPA from history provider" )
225- clusterHistory , err := historyProvider .GetClusterHistory ()
226- if err != nil {
227- klog .ErrorS (err , "Cannot get cluster history" )
228- }
229- for podID , podHistory := range clusterHistory {
230- klog .V (4 ).InfoS ("Adding pod with labels" , "pod" , podID , "labels" , podHistory .LastLabels )
231- feeder .clusterState .AddOrUpdatePod (podID , podHistory .LastLabels , apiv1 .PodUnknown )
232- for containerName , sampleList := range podHistory .Samples {
233- containerID := model.ContainerID {
234- PodID : podID ,
235- ContainerName : containerName ,
236- }
237- klog .V (0 ).InfoS ("Adding" , "container" , containerID )
238- // TODO @jklaw90: pass the container type here
239- if err = feeder .clusterState .AddOrUpdateContainer (containerID , nil , model .ContainerTypeStandard ); err != nil {
240- klog .V (0 ).InfoS ("Failed to add container" , "container" , containerID , "error" , err )
241- }
242- klog .V (4 ).InfoS ("Adding samples for container" , "sampleCount" , len (sampleList ), "container" , containerID )
243- for _ , sample := range sampleList {
244- if err := feeder .clusterState .AddSample (
245- & model.ContainerUsageSampleWithKey {
246- ContainerUsageSample : sample ,
247- Container : containerID ,
248- }); err != nil {
249- klog .V (0 ).InfoS ("Failed to add sample" , "sample" , sample , "error" , err )
250- }
251- }
252- }
253- }
254- }
255-
256- func (feeder * clusterStateFeeder ) setVpaCheckpoint (checkpoint * vpa_types.VerticalPodAutoscalerCheckpoint ) error {
257- vpaID := model.VpaID {Namespace : checkpoint .Namespace , VpaName : checkpoint .Spec .VPAObjectName }
258- vpa , exists := feeder .clusterState .VPAs ()[vpaID ]
259- if ! exists {
260- return fmt .Errorf ("cannot load checkpoint to missing VPA object %s/%s" , vpaID .Namespace , vpaID .VpaName )
261- }
262-
263- cs := model .NewAggregateContainerState ()
264- err := cs .LoadFromCheckpoint (& checkpoint .Status )
265- if err != nil {
266- return fmt .Errorf ("cannot load checkpoint for VPA %s/%s. Reason: %v" , vpaID .Namespace , vpaID .VpaName , err )
267- }
268- vpa .ContainersInitialAggregateState [checkpoint .Spec .ContainerName ] = cs
269- return nil
270- }
271-
272225func (feeder * clusterStateFeeder ) InitFromCheckpoints (ctx context.Context ) {
273226 klog .V (3 ).InfoS ("Initializing VPA from checkpoints" )
274227 feeder .LoadVPAs (ctx )
@@ -300,6 +253,49 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints(ctx context.Context) {
300253 }
301254}
302255
256+ func (feeder * clusterStateFeeder ) InitFromHistoryProvider (historyProvider history.HistoryProvider ) {
257+ pods := feeder .podSpecLookup ()
258+ klog .V (3 ).InfoS ("Initializing VPA from history provider" )
259+ clusterHistory , err := historyProvider .GetClusterHistory ()
260+ if err != nil {
261+ klog .ErrorS (err , "Cannot get cluster history" )
262+ }
263+ for podID , podHistory := range clusterHistory {
264+ // no need to load history if the pod no longer exists
265+ podSpec , ok := pods [podID ]
266+ if ! ok {
267+ continue
268+ }
269+ klog .V (4 ).InfoS ("Adding pod with labels" , "pod" , podID , "labels" , podHistory .LastLabels )
270+ feeder .clusterState .AddOrUpdatePod (podID , podHistory .LastLabels , podSpec .Phase )
271+ for containerName , sampleList := range podHistory .Samples {
272+ containerID := model.ContainerID {
273+ PodID : podID ,
274+ ContainerName : containerName ,
275+ }
276+ klog .V (0 ).InfoS ("Adding" , "container" , containerID )
277+
278+ containerSpec := podSpec .GetContainerSpec (containerName )
279+ if containerSpec == nil {
280+ continue
281+ }
282+ if err = feeder .clusterState .AddOrUpdateContainer (containerID , nil , containerSpec .ContainerType ); err != nil {
283+ klog .V (0 ).InfoS ("Failed to add container" , "container" , containerID , "error" , err )
284+ }
285+ klog .V (4 ).InfoS ("Adding samples for container" , "sampleCount" , len (sampleList ), "container" , containerID )
286+ for _ , sample := range sampleList {
287+ if err := feeder .clusterState .AddSample (
288+ & model.ContainerUsageSampleWithKey {
289+ ContainerUsageSample : sample ,
290+ Container : containerID ,
291+ }); err != nil {
292+ klog .V (0 ).InfoS ("Failed to add sample" , "sample" , sample , "error" , err )
293+ }
294+ }
295+ }
296+ }
297+ }
298+
303299func (feeder * clusterStateFeeder ) GarbageCollectCheckpoints (ctx context.Context ) {
304300 klog .V (3 ).InfoS ("Starting garbage collection of checkpoints" )
305301
@@ -341,82 +337,6 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints(ctx context.Context)
341337 }
342338}
343339
344- func (feeder * clusterStateFeeder ) shouldIgnoreNamespace (namespace string ) bool {
345- // 1. `vpaObjectNamespace` is set but doesn't match the current namespace.
346- if feeder .vpaObjectNamespace != "" && namespace != feeder .vpaObjectNamespace {
347- return true
348- }
349- // 2. `ignoredNamespaces` is set, and the current namespace is in the list.
350- if len (feeder .ignoredNamespaces ) > 0 && slices .Contains (feeder .ignoredNamespaces , namespace ) {
351- return true
352- }
353- return false
354- }
355-
356- func (feeder * clusterStateFeeder ) cleanupCheckpointsForNamespace (ctx context.Context , namespace string , allVPAKeys map [model.VpaID ]bool ) error {
357- var err error
358- checkpointList , err := feeder .vpaCheckpointLister .VerticalPodAutoscalerCheckpoints (namespace ).List (labels .Everything ())
359-
360- if err != nil {
361- return err
362- }
363- for _ , checkpoint := range checkpointList {
364- vpaID := model.VpaID {Namespace : checkpoint .Namespace , VpaName : checkpoint .Spec .VPAObjectName }
365- if ! allVPAKeys [vpaID ] {
366- if errFeeder := feeder .vpaCheckpointClient .VerticalPodAutoscalerCheckpoints (namespace ).Delete (ctx , checkpoint .Name , metav1.DeleteOptions {}); errFeeder != nil {
367- err = fmt .Errorf ("failed to delete orphaned checkpoint %s: %w" , klog .KRef (namespace , checkpoint .Name ), err )
368- continue
369- }
370- klog .V (3 ).InfoS ("Orphaned VPA checkpoint cleanup - deleting" , "checkpoint" , klog .KRef (namespace , checkpoint .Name ))
371- }
372- }
373- return err
374- }
375-
376- func implicitDefaultRecommender (selectors []* vpa_types.VerticalPodAutoscalerRecommenderSelector ) bool {
377- return len (selectors ) == 0
378- }
379-
380- func selectsRecommender (selectors []* vpa_types.VerticalPodAutoscalerRecommenderSelector , name * string ) bool {
381- for _ , s := range selectors {
382- if s .Name == * name {
383- return true
384- }
385- }
386- return false
387- }
388-
389- // Filter VPA objects whose specified recommender names are not default
390- func filterVPAs (feeder * clusterStateFeeder , allVpaCRDs []* vpa_types.VerticalPodAutoscaler ) []* vpa_types.VerticalPodAutoscaler {
391- klog .V (3 ).InfoS ("Start selecting the vpaCRDs." )
392- var vpaCRDs []* vpa_types.VerticalPodAutoscaler
393- for _ , vpaCRD := range allVpaCRDs {
394- if feeder .recommenderName == DefaultRecommenderName {
395- if ! implicitDefaultRecommender (vpaCRD .Spec .Recommenders ) && ! selectsRecommender (vpaCRD .Spec .Recommenders , & feeder .recommenderName ) {
396- klog .V (6 ).InfoS ("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName )
397- continue
398- }
399- } else {
400- if implicitDefaultRecommender (vpaCRD .Spec .Recommenders ) {
401- klog .V (6 ).InfoS ("Ignoring vpaCRD as recommender doesn't process CRDs implicitly destined to default recommender" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName , "defaultRecommenderName" , DefaultRecommenderName )
402- continue
403- }
404- if ! selectsRecommender (vpaCRD .Spec .Recommenders , & feeder .recommenderName ) {
405- klog .V (6 ).InfoS ("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName )
406- continue
407- }
408- }
409-
410- if feeder .shouldIgnoreNamespace (vpaCRD .Namespace ) {
411- klog .V (6 ).InfoS ("Ignoring vpaCRD as this namespace is ignored" , "vpaCRD" , klog .KObj (vpaCRD ))
412- continue
413- }
414-
415- vpaCRDs = append (vpaCRDs , vpaCRD )
416- }
417- return vpaCRDs
418- }
419-
420340// LoadVPAs fetches VPA objects and loads them into the cluster state.
421341func (feeder * clusterStateFeeder ) LoadVPAs (ctx context.Context ) {
422342 // List VPA API objects.
@@ -468,14 +388,7 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
468388
469389// LoadPods loads pod into the cluster state.
470390func (feeder * clusterStateFeeder ) LoadPods () {
471- podSpecs , err := feeder .specClient .GetPodSpecs ()
472- if err != nil {
473- klog .ErrorS (err , "Cannot get SimplePodSpecs" )
474- }
475- pods := make (map [model.PodID ]* spec.BasicPodSpec )
476- for _ , spec := range podSpecs {
477- pods [spec .ID ] = spec
478- }
391+ pods := feeder .podSpecLookup ()
479392 for key := range feeder .clusterState .Pods () {
480393 if _ , exists := pods [key ]; ! exists {
481394 klog .V (3 ).InfoS ("Deleting Pod" , "pod" , klog .KRef (key .Namespace , key .PodName ))
@@ -488,13 +401,13 @@ func (feeder *clusterStateFeeder) LoadPods() {
488401 }
489402 feeder .clusterState .AddOrUpdatePod (pod .ID , pod .PodLabels , pod .Phase )
490403 for _ , container := range pod .Containers {
491- if err = feeder .clusterState .AddOrUpdateContainer (container .ID , container .Request , container .ContainerType ); err != nil {
404+ if err : = feeder .clusterState .AddOrUpdateContainer (container .ID , container .Request , container .ContainerType ); err != nil {
492405 klog .V (0 ).InfoS ("Failed to add container" , "container" , container .ID , "error" , err )
493406 }
494407 }
495408 for _ , initContainer := range pod .InitContainers {
496409 if features .Enabled (features .NativeSidecar ) && initContainer .ContainerType == model .ContainerTypeInitSidecar {
497- if err = feeder .clusterState .AddOrUpdateContainer (initContainer .ID , initContainer .Request , initContainer .ContainerType ); err != nil {
410+ if err : = feeder .clusterState .AddOrUpdateContainer (initContainer .ID , initContainer .Request , initContainer .ContainerType ); err != nil {
498411 klog .V (0 ).InfoS ("Failed to add initContainer" , "container" , initContainer .ID , "error" , err )
499412 }
500413 } else {
@@ -551,6 +464,109 @@ Loop:
551464 metrics_recommender .RecordAggregateContainerStatesCount (feeder .clusterState .StateMapSize ())
552465}
553466
467+ func (feeder * clusterStateFeeder ) podSpecLookup () map [model.PodID ]* spec.BasicPodSpec {
468+ podSpecs , err := feeder .specClient .GetPodSpecs ()
469+ if err != nil {
470+ klog .ErrorS (err , "Cannot get SimplePodSpecs" )
471+ }
472+ pods := make (map [model.PodID ]* spec.BasicPodSpec )
473+ for _ , spec := range podSpecs {
474+ pods [spec .ID ] = spec
475+ }
476+ return pods
477+ }
478+
479+ func (feeder * clusterStateFeeder ) setVpaCheckpoint (checkpoint * vpa_types.VerticalPodAutoscalerCheckpoint ) error {
480+ vpaID := model.VpaID {Namespace : checkpoint .Namespace , VpaName : checkpoint .Spec .VPAObjectName }
481+ vpa , exists := feeder .clusterState .VPAs ()[vpaID ]
482+ if ! exists {
483+ return fmt .Errorf ("cannot load checkpoint to missing VPA object %s/%s" , vpaID .Namespace , vpaID .VpaName )
484+ }
485+
486+ cs := model .NewAggregateContainerState ()
487+ err := cs .LoadFromCheckpoint (& checkpoint .Status )
488+ if err != nil {
489+ return fmt .Errorf ("cannot load checkpoint for VPA %s/%s. Reason: %v" , vpaID .Namespace , vpaID .VpaName , err )
490+ }
491+ vpa .ContainersInitialAggregateState [checkpoint .Spec .ContainerName ] = cs
492+ return nil
493+ }
494+ func (feeder * clusterStateFeeder ) shouldIgnoreNamespace (namespace string ) bool {
495+ // 1. `vpaObjectNamespace` is set but doesn't match the current namespace.
496+ if feeder .vpaObjectNamespace != "" && namespace != feeder .vpaObjectNamespace {
497+ return true
498+ }
499+ // 2. `ignoredNamespaces` is set, and the current namespace is in the list.
500+ if len (feeder .ignoredNamespaces ) > 0 && slices .Contains (feeder .ignoredNamespaces , namespace ) {
501+ return true
502+ }
503+ return false
504+ }
505+
506+ func (feeder * clusterStateFeeder ) cleanupCheckpointsForNamespace (ctx context.Context , namespace string , allVPAKeys map [model.VpaID ]bool ) error {
507+ var err error
508+ checkpointList , err := feeder .vpaCheckpointLister .VerticalPodAutoscalerCheckpoints (namespace ).List (labels .Everything ())
509+
510+ if err != nil {
511+ return err
512+ }
513+ for _ , checkpoint := range checkpointList {
514+ vpaID := model.VpaID {Namespace : checkpoint .Namespace , VpaName : checkpoint .Spec .VPAObjectName }
515+ if ! allVPAKeys [vpaID ] {
516+ if errFeeder := feeder .vpaCheckpointClient .VerticalPodAutoscalerCheckpoints (namespace ).Delete (ctx , checkpoint .Name , metav1.DeleteOptions {}); errFeeder != nil {
517+ err = fmt .Errorf ("failed to delete orphaned checkpoint %s: %w" , klog .KRef (namespace , checkpoint .Name ), err )
518+ continue
519+ }
520+ klog .V (3 ).InfoS ("Orphaned VPA checkpoint cleanup - deleting" , "checkpoint" , klog .KRef (namespace , checkpoint .Name ))
521+ }
522+ }
523+ return err
524+ }
525+
526+ func implicitDefaultRecommender (selectors []* vpa_types.VerticalPodAutoscalerRecommenderSelector ) bool {
527+ return len (selectors ) == 0
528+ }
529+
530+ func selectsRecommender (selectors []* vpa_types.VerticalPodAutoscalerRecommenderSelector , name * string ) bool {
531+ for _ , s := range selectors {
532+ if s .Name == * name {
533+ return true
534+ }
535+ }
536+ return false
537+ }
538+
539+ // Filter VPA objects whose specified recommender names are not default
540+ func filterVPAs (feeder * clusterStateFeeder , allVpaCRDs []* vpa_types.VerticalPodAutoscaler ) []* vpa_types.VerticalPodAutoscaler {
541+ klog .V (3 ).InfoS ("Start selecting the vpaCRDs." )
542+ var vpaCRDs []* vpa_types.VerticalPodAutoscaler
543+ for _ , vpaCRD := range allVpaCRDs {
544+ if feeder .recommenderName == DefaultRecommenderName {
545+ if ! implicitDefaultRecommender (vpaCRD .Spec .Recommenders ) && ! selectsRecommender (vpaCRD .Spec .Recommenders , & feeder .recommenderName ) {
546+ klog .V (6 ).InfoS ("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName )
547+ continue
548+ }
549+ } else {
550+ if implicitDefaultRecommender (vpaCRD .Spec .Recommenders ) {
551+ klog .V (6 ).InfoS ("Ignoring vpaCRD as recommender doesn't process CRDs implicitly destined to default recommender" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName , "defaultRecommenderName" , DefaultRecommenderName )
552+ continue
553+ }
554+ if ! selectsRecommender (vpaCRD .Spec .Recommenders , & feeder .recommenderName ) {
555+ klog .V (6 ).InfoS ("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName )
556+ continue
557+ }
558+ }
559+
560+ if feeder .shouldIgnoreNamespace (vpaCRD .Namespace ) {
561+ klog .V (6 ).InfoS ("Ignoring vpaCRD as this namespace is ignored" , "vpaCRD" , klog .KObj (vpaCRD ))
562+ continue
563+ }
564+
565+ vpaCRDs = append (vpaCRDs , vpaCRD )
566+ }
567+ return vpaCRDs
568+ }
569+
554570func (feeder * clusterStateFeeder ) matchesVPA (pod * spec.BasicPodSpec ) bool {
555571 for vpaKey , vpa := range feeder .clusterState .VPAs () {
556572 podLabels := labels .Set (pod .PodLabels )
0 commit comments