Skip to content

Commit c640996

Browse files
committed
feat: add pod-label-selectors flag to vpa updater
1 parent 6177945 commit c640996

File tree

6 files changed

+372
-4
lines changed

6 files changed

+372
-4
lines changed

vertical-pod-autoscaler/docs/flags.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ This document is auto-generated from the flag definitions in the VPA updater cod
164164
| `logtostderr` | | true | log to standard error instead of files |
165165
| `min-replicas` | int | 2 | Minimum number of replicas to perform update |
166166
| `one-output` | severity | | If true, only write logs to their native level (vs also writing to each lower severity level; no effect when -logtostderr=true) |
167+
| `pod-label-selectors` | string | | If present, the updater will only process pods matching the given label selectors. |
167168
| `pod-update-threshold` | float | 0.1 | Ignore updates that have priority lower than the value of this flag |
168169
| `profiling` | int | | Is debug/pprof endpoenabled |
169170
| `skip-headers` | | | If true, avoid header prefixes in the log messages |

vertical-pod-autoscaler/e2e/utils/common.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ const (
4646
RecommenderDeploymentName = "vpa-recommender"
4747
// RecommenderNamespace is namespace to deploy VPA recommender
4848
RecommenderNamespace = "kube-system"
49+
// UpdaterNamespace is namespace to deploy VPA updater
50+
UpdaterNamespace = "kube-system"
4951
// PollInterval is interval for polling
5052
PollInterval = 10 * time.Second
5153
// PollTimeout is timeout for polling
@@ -67,6 +69,9 @@ var HamsterTargetRef = &autoscaling.CrossVersionObjectReference{
6769
// RecommenderLabels are labels of VPA recommender
6870
var RecommenderLabels = map[string]string{"app": "vpa-recommender"}
6971

72+
// CustomUpdaterLabels are labels of VPA updater
73+
var CustomUpdaterLabels = map[string]string{"app": "custom-vpa-updater"}
74+
7075
// HamsterLabels are labels of hamster app
7176
var HamsterLabels = map[string]string{"app": "hamster"}
7277

@@ -215,6 +220,73 @@ func NewVPADeployment(f *framework.Framework, flags []string) *appsv1.Deployment
215220
return d
216221
}
217222

223+
// NewUpdaterDeployment creates a new updater deployment for e2e test purposes
224+
func NewUpdaterDeployment(f *framework.Framework, deploymentName string, flags []string) *appsv1.Deployment {
225+
d := framework_deployment.NewDeployment(
226+
deploymentName, /*deploymentName*/
227+
1, /*replicas*/
228+
CustomUpdaterLabels, /*podLabels*/
229+
"updater", /*imageName*/
230+
"localhost:5001/vpa-updater", /*image*/
231+
appsv1.RollingUpdateDeploymentStrategyType, /*strategyType*/
232+
)
233+
d.ObjectMeta.Namespace = f.Namespace.Name
234+
d.Spec.Template.Spec.Containers[0].ImagePullPolicy = apiv1.PullNever // Image must be loaded first
235+
d.Spec.Template.Spec.ServiceAccountName = "vpa-updater"
236+
d.Spec.Template.Spec.Containers[0].Command = []string{"/updater"}
237+
d.Spec.Template.Spec.Containers[0].Args = flags
238+
239+
runAsNonRoot := true
240+
var runAsUser int64 = 65534 // nobody
241+
d.Spec.Template.Spec.SecurityContext = &apiv1.PodSecurityContext{
242+
RunAsNonRoot: &runAsNonRoot,
243+
RunAsUser: &runAsUser,
244+
}
245+
246+
// Same as deploy/updater-deployment.yaml
247+
d.Spec.Template.Spec.Containers[0].Resources = apiv1.ResourceRequirements{
248+
Limits: apiv1.ResourceList{
249+
apiv1.ResourceCPU: resource.MustParse("200m"),
250+
apiv1.ResourceMemory: resource.MustParse("1000Mi"),
251+
},
252+
Requests: apiv1.ResourceList{
253+
apiv1.ResourceCPU: resource.MustParse("50m"),
254+
apiv1.ResourceMemory: resource.MustParse("500Mi"),
255+
},
256+
}
257+
258+
d.Spec.Template.Spec.Containers[0].Ports = []apiv1.ContainerPort{{
259+
Name: "prometheus",
260+
ContainerPort: 8943,
261+
}}
262+
263+
d.Spec.Template.Spec.Containers[0].LivenessProbe = &apiv1.Probe{
264+
ProbeHandler: apiv1.ProbeHandler{
265+
HTTPGet: &apiv1.HTTPGetAction{
266+
Path: "/health-check",
267+
Port: intstr.FromString("prometheus"),
268+
Scheme: apiv1.URISchemeHTTP,
269+
},
270+
},
271+
InitialDelaySeconds: 5,
272+
PeriodSeconds: 10,
273+
FailureThreshold: 3,
274+
}
275+
d.Spec.Template.Spec.Containers[0].ReadinessProbe = &apiv1.Probe{
276+
ProbeHandler: apiv1.ProbeHandler{
277+
HTTPGet: &apiv1.HTTPGetAction{
278+
Path: "/health-check",
279+
Port: intstr.FromString("prometheus"),
280+
Scheme: apiv1.URISchemeHTTP,
281+
},
282+
},
283+
PeriodSeconds: 10,
284+
FailureThreshold: 3,
285+
}
286+
287+
return d
288+
}
289+
218290
// NewNHamstersDeployment creates a simple hamster deployment with n containers
219291
// for e2e test purposes.
220292
func NewNHamstersDeployment(f *framework.Framework, n int) *appsv1.Deployment {

vertical-pod-autoscaler/e2e/v1/updater.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package autoscaling
1919
import (
2020
"context"
2121
"fmt"
22+
"strconv"
23+
"strings"
2224
"time"
2325

2426
autoscaling "k8s.io/api/autoscaling/v1"
@@ -30,6 +32,7 @@ import (
3032
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
3133
"k8s.io/kubernetes/test/e2e/framework"
3234
podsecurity "k8s.io/pod-security-admission/api"
35+
"k8s.io/utils/ptr"
3336

3437
ginkgo "github.com/onsi/ginkgo/v2"
3538
"github.com/onsi/gomega"
@@ -205,8 +208,156 @@ var _ = UpdaterE2eDescribe("Updater", func() {
205208
err := WaitForPodsUpdatedWithoutEviction(f, initialPods)
206209
gomega.Expect(err).NotTo(gomega.HaveOccurred())
207210
})
211+
212+
framework.It("filters pods using the --pod-label-selectors flag", framework.WithSerial(), func() {
213+
const (
214+
testLabelKey = "vpa-updater-test"
215+
testLabelValueMatch = "enabled"
216+
matchingReplicas = 3
217+
nonMatchingReplicas = 2
218+
)
219+
testNamespace := f.Namespace.Name
220+
221+
ginkgo.By("Creating pods with non-matching labels")
222+
nonMatchingDeployment := utils.NewNHamstersDeployment(f, 1)
223+
nonMatchingDeployment.Name = "non-matching-hamster"
224+
nonMatchingDeployment.Spec.Replicas = ptr.To(int32(nonMatchingReplicas))
225+
nonMatchingDeployment.Spec.Template.Labels[testLabelKey] = "disabled"
226+
nonMatchingDeployment.Spec.Template.Labels["app"] = "non-matching"
227+
nonMatchingDeployment.Spec.Selector.MatchLabels[testLabelKey] = "disabled"
228+
nonMatchingDeployment.Spec.Selector.MatchLabels["app"] = "non-matching"
229+
utils.StartDeploymentPods(f, nonMatchingDeployment)
230+
231+
ginkgo.By("Creating pods with matching labels")
232+
matchingDeployment := utils.NewNHamstersDeployment(f, 1)
233+
matchingDeployment.Name = "matching-hamster"
234+
matchingDeployment.Spec.Replicas = ptr.To(int32(matchingReplicas))
235+
matchingDeployment.Spec.Template.Labels[testLabelKey] = testLabelValueMatch
236+
matchingDeployment.Spec.Template.Labels["app"] = "matching"
237+
matchingDeployment.Spec.Selector.MatchLabels[testLabelKey] = testLabelValueMatch
238+
matchingDeployment.Spec.Selector.MatchLabels["app"] = "matching"
239+
utils.StartDeploymentPods(f, matchingDeployment)
240+
241+
ginkgo.By("Creating VPAs for both deployments")
242+
containerName := utils.GetHamsterContainerNameByIndex(0)
243+
nonMatchingVPA := test.VerticalPodAutoscaler().
244+
WithName("non-matching-vpa").
245+
WithNamespace(testNamespace).
246+
WithTargetRef(&autoscaling.CrossVersionObjectReference{
247+
APIVersion: "apps/v1",
248+
Kind: "Deployment",
249+
Name: nonMatchingDeployment.Name,
250+
}).
251+
WithContainer(containerName).
252+
WithUpdateMode(vpa_types.UpdateModeRecreate).
253+
Get()
254+
utils.InstallVPA(f, nonMatchingVPA)
255+
256+
matchingVPA := test.VerticalPodAutoscaler().
257+
WithName("matching-vpa").
258+
WithNamespace(testNamespace).
259+
WithTargetRef(&autoscaling.CrossVersionObjectReference{
260+
APIVersion: "apps/v1",
261+
Kind: "Deployment",
262+
Name: matchingDeployment.Name,
263+
}).
264+
WithContainer(containerName).
265+
WithUpdateMode(vpa_types.UpdateModeRecreate).
266+
Get()
267+
utils.InstallVPA(f, matchingVPA)
268+
269+
ginkgo.By("Setting up custom updater deployment with --pod-label-selectors flag")
270+
// we swap the namespace to kube-system and then back to the test namespace
271+
// so our custom updater deployment can use the deployed RBAC
272+
originalNamespace := f.Namespace.Name
273+
f.Namespace.Name = utils.UpdaterNamespace
274+
deploymentName := "vpa-updater-with-pod-label-selectors"
275+
updaterDeployment := utils.NewUpdaterDeployment(f, deploymentName, []string{
276+
"--updater-interval=10s",
277+
"--use-admission-controller-status=false",
278+
fmt.Sprintf("--pod-label-selectors=%s=%s", testLabelKey, testLabelValueMatch),
279+
fmt.Sprintf("--vpa-object-namespace=%s", testNamespace),
280+
})
281+
utils.StartDeploymentPods(f, updaterDeployment)
282+
f.Namespace.Name = originalNamespace
283+
284+
defer func() {
285+
ginkgo.By("Cleaning up custom updater deployment")
286+
f.ClientSet.AppsV1().Deployments(utils.UpdaterNamespace).Delete(context.TODO(), deploymentName, metav1.DeleteOptions{})
287+
}()
288+
289+
ginkgo.By("Waiting for custom updater to report controlled pods count via metrics")
290+
gomega.Eventually(func() (float64, error) {
291+
return getMetricValue(f, utils.UpdaterNamespace, "vpa_updater_controlled_pods_total", map[string]string{
292+
"update_mode": string(vpa_types.UpdateModeRecreate),
293+
})
294+
}, 2*time.Minute, 5*time.Second).Should(gomega.Equal(float64(matchingReplicas)),
295+
"Custom updater should only see %d matching pods (not the %d non-matching pods)",
296+
matchingReplicas, nonMatchingReplicas)
297+
})
208298
})
209299

300+
func getMetricValue(f *framework.Framework, namespace, metricName string, labels map[string]string) (float64, error) {
301+
// Port forward to the updater pod
302+
pods, err := f.ClientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
303+
LabelSelector: "app=custom-vpa-updater",
304+
})
305+
if err != nil || len(pods.Items) == 0 {
306+
return 0, fmt.Errorf("updater pod not found: %v", err)
307+
}
308+
309+
// Use kubectl port-forward via exec in the pod
310+
req := f.ClientSet.CoreV1().RESTClient().Get().
311+
Namespace(namespace).
312+
Resource("pods").
313+
Name(pods.Items[0].Name).
314+
SubResource("proxy").
315+
Suffix("metrics")
316+
317+
result := req.Do(context.TODO())
318+
body, err := result.Raw()
319+
if err != nil {
320+
return 0, fmt.Errorf("failed to get metrics: %v", err)
321+
}
322+
323+
// Parse Prometheus metrics format
324+
lines := strings.Split(string(body), "\n")
325+
for _, line := range lines {
326+
if strings.HasPrefix(line, "#") {
327+
continue
328+
}
329+
if !strings.HasPrefix(line, metricName) {
330+
continue
331+
}
332+
333+
// Match labels
334+
if len(labels) > 0 {
335+
allLabelsMatch := true
336+
for k, v := range labels {
337+
labelPattern := fmt.Sprintf(`%s="%s"`, k, v)
338+
if !strings.Contains(line, labelPattern) {
339+
allLabelsMatch = false
340+
break
341+
}
342+
}
343+
if !allLabelsMatch {
344+
continue
345+
}
346+
}
347+
348+
// Extract value from end of line
349+
parts := strings.Fields(line)
350+
if len(parts) >= 2 {
351+
value, err := strconv.ParseFloat(parts[len(parts)-1], 64)
352+
if err == nil {
353+
return value, nil
354+
}
355+
}
356+
}
357+
358+
return 0, fmt.Errorf("metric %s not found", metricName)
359+
}
360+
210361
func setupPodsForUpscalingEviction(f *framework.Framework) *apiv1.PodList {
211362
return setupPodsForEviction(f, "100m", "100Mi", nil)
212363
}

vertical-pod-autoscaler/pkg/updater/logic/updater.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ import (
2424

2525
"golang.org/x/time/rate"
2626
apiv1 "k8s.io/api/core/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/fields"
2829
"k8s.io/apimachinery/pkg/labels"
30+
"k8s.io/apimachinery/pkg/runtime"
31+
"k8s.io/apimachinery/pkg/watch"
2932
kube_client "k8s.io/client-go/kubernetes"
3033
"k8s.io/client-go/kubernetes/fake"
3134
corescheme "k8s.io/client-go/kubernetes/scheme"
@@ -102,6 +105,7 @@ func NewUpdater(
102105
namespace string,
103106
ignoredNamespaces []string,
104107
patchCalculators []patch.Calculator,
108+
podLabelSelector labels.Selector,
105109
) (Updater, error) {
106110
evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst)
107111
// TODO: Create in-place rate limits for the in-place rate limiter
@@ -118,7 +122,7 @@ func NewUpdater(
118122

119123
return &updater{
120124
vpaLister: vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), namespace),
121-
podLister: newPodLister(kubeClient, namespace),
125+
podLister: newPodLister(kubeClient, namespace, podLabelSelector),
122126
eventRecorder: newEventRecorder(kubeClient),
123127
restrictionFactory: factory,
124128
recommendationProcessor: recommendationProcessor,
@@ -397,10 +401,30 @@ func filterDeletedPods(pods []*apiv1.Pod) []*apiv1.Pod {
397401
})
398402
}
399403

400-
func newPodLister(kubeClient kube_client.Interface, namespace string) v1lister.PodLister {
401-
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
404+
func newPodLister(kubeClient kube_client.Interface, namespace string, labelSelector labels.Selector) v1lister.PodLister {
405+
fieldSelector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
402406
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
403-
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)
407+
408+
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
409+
options.FieldSelector = fieldSelector.String()
410+
if labelSelector != nil && !labelSelector.Empty() {
411+
options.LabelSelector = labelSelector.String()
412+
}
413+
return kubeClient.CoreV1().Pods(namespace).List(context.TODO(), options)
414+
}
415+
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
416+
options.FieldSelector = fieldSelector.String()
417+
if labelSelector != nil && !labelSelector.Empty() {
418+
options.LabelSelector = labelSelector.String()
419+
}
420+
return kubeClient.CoreV1().Pods(namespace).Watch(context.TODO(), options)
421+
}
422+
423+
podListWatch := &cache.ListWatch{
424+
ListFunc: listFunc,
425+
WatchFunc: watchFunc,
426+
}
427+
404428
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
405429
podLister := v1lister.NewPodLister(store)
406430
podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour)

0 commit comments

Comments
 (0)