From 68754e7eb444bcdc8907777ce793c4849172a31e Mon Sep 17 00:00:00 2001 From: Sebastian Sch Date: Sun, 9 Feb 2025 20:47:43 +0200 Subject: [PATCH] Support draining daemonset pods that use sriov devices with this commit we also take care of removing daemonset owned pods using sriov devices. we only do it when drain is requested we don't do it for reboot requests Signed-off-by: Sebastian Sch --- controllers/drain_controller_test.go | 2 +- pkg/drain/drainer.go | 106 ++++++++--- .../tests/test_policy_configuration.go | 1 + test/conformance/tests/test_sriov_operator.go | 178 +++++++++++++++++- 4 files changed, 261 insertions(+), 26 deletions(-) diff --git a/controllers/drain_controller_test.go b/controllers/drain_controller_test.go index 3ecffa81c..b1256561b 100644 --- a/controllers/drain_controller_test.go +++ b/controllers/drain_controller_test.go @@ -326,7 +326,7 @@ func expectNodeStateAnnotation(nodeState *sriovnetworkv1.SriovNetworkNodeState, g.Expect(utils.ObjectHasAnnotation(nodeState, constants.NodeStateDrainAnnotationCurrent, expectedAnnotationValue)). To(BeTrue(), "Node[%s] annotation[%s] == '%s'. Expected '%s'", nodeState.Name, constants.NodeDrainAnnotation, nodeState.GetLabels()[constants.NodeStateDrainAnnotationCurrent], expectedAnnotationValue) - }, "20s", "1s").Should(Succeed()) + }, "200s", "1s").Should(Succeed()) } func expectNumberOfDrainingNodes(numbOfDrain int, nodesState ...*sriovnetworkv1.SriovNetworkNodeState) { diff --git a/pkg/drain/drainer.go b/pkg/drain/drainer.go index 22dbed3df..2fb24a74f 100644 --- a/pkg/drain/drainer.go +++ b/pkg/drain/drainer.go @@ -7,6 +7,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/kubectl/pkg/drain" @@ -68,16 +69,15 @@ func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrai return false, nil } - drainHelper := createDrainHelper(d.kubeClient, ctx, fullNodeDrain) backoff := wait.Backoff{ Steps: 5, - Duration: 10 * time.Second, + Duration: 5 * time.Second, Factor: 2, } var lastErr error - reqLogger.Info("drainNode(): Start draining") - if err = wait.ExponentialBackoff(backoff, func() (bool, error) { + if err = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) { + drainHelper := createDrainHelper(d.kubeClient, ctx, fullNodeDrain) err := drain.RunCordonOrUncordon(drainHelper, node, true) if err != nil { lastErr = err @@ -85,12 +85,19 @@ func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrai return false, nil } err = drain.RunNodeDrain(drainHelper, node.Name) - if err == nil { - return true, nil + if err != nil { + lastErr = err + reqLogger.Info("drainNode(): Draining failed, retrying", "error", err) + return false, nil } - lastErr = err - reqLogger.Info("drainNode(): Draining failed, retrying", "error", err) - return false, nil + + err = d.removeDaemonSetsFromNode(ctx, node.Name) + if err != nil { + lastErr = err + return false, nil + } + + return true, nil }); err != nil { if wait.Interrupted(err) { reqLogger.Info("drainNode(): failed to drain node", "steps", backoff.Steps, "error", lastErr) @@ -131,6 +138,28 @@ func (d *Drainer) CompleteDrainNode(ctx context.Context, node *corev1.Node) (boo return completed, nil } +// removeDaemonSetsFromNode go over all the remain pods and search for DaemonSets that have SR-IOV devices to remove them +// we can't use the drain from core kubernetes as it doesn't support removing pods that are part of a 
DaemonSets +func (d *Drainer) removeDaemonSetsFromNode(ctx context.Context, nodeName string) error { + reqLogger := log.FromContext(ctx) + reqLogger.Info("drainNode(): remove DaemonSets using sriov devices from node", "nodeName", nodeName) + + podList, err := d.kubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName)}) + if err != nil { + reqLogger.Info("drainNode(): Failed to list pods, retrying", "error", err) + return err + } + + // remove pods that are owned by a DaemonSet and use SR-IOV devices + dsPodsList := getDsPodsToRemove(podList) + drainHelper := createDrainHelper(d.kubeClient, ctx, true) + err = drainHelper.DeleteOrEvictPods(dsPodsList) + if err != nil { + reqLogger.Error(err, "failed to delete or evict pods from node", "nodeName", nodeName) + } + return err +} + // createDrainHelper function to create a drain helper // if fullDrain is false we only remove pods that have the resourcePrefix // if not we remove all the pods in the node @@ -150,25 +179,21 @@ func createDrainHelper(kubeClient kubernetes.Interface, ctx context.Context, ful } log.Log.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name)) }, - Ctx: ctx, - Out: writer{logger.Info}, - ErrOut: writer{func(msg string, kv ...interface{}) { logger.Error(nil, msg, kv...) }}, + Ctx: ctx, + Out: writer{logger.Info}, + ErrOut: writer{func(msg string, kv ...interface{}) { + logger.Error(nil, msg, kv...) + }}, } // when we just want to drain and not reboot we can only remove the pods using sriov devices if !fullDrain { deleteFunction := func(p corev1.Pod) drain.PodDeleteStatus { - for _, c := range p.Spec.Containers { - if c.Resources.Requests != nil { - for r := range c.Resources.Requests { - if strings.HasPrefix(r.String(), vars.ResourcePrefix) { - return drain.PodDeleteStatus{ - Delete: true, - Reason: "pod contain SR-IOV device", - Message: "SR-IOV network operator draining the node", - } - } - } + if podHasSRIOVDevice(&p) { + return drain.PodDeleteStatus{ + Delete: true, + Reason: "pod contains SR-IOV device", + Message: "SR-IOV network operator draining the node", } } return drain.PodDeleteStatus{Delete: false} @@ -179,3 +204,38 @@ func createDrainHelper(kubeClient kubernetes.Interface, ctx context.Context, ful return drainer } + +func podHasSRIOVDevice(p *corev1.Pod) bool { + for _, c := range p.Spec.Containers { + if c.Resources.Requests != nil { + for r := range c.Resources.Requests { + if strings.HasPrefix(r.String(), vars.ResourcePrefix) { + return true + } + } + } + } + + return false +} + +func podsHasDSOwner(p *corev1.Pod) bool { + for _, o := range p.OwnerReferences { + if o.Kind == "DaemonSet" { + return true + } + } + + return false +} + +func getDsPodsToRemove(pl *corev1.PodList) []corev1.Pod { + podsToRemove := []corev1.Pod{} + for _, pod := range pl.Items { + if podsHasDSOwner(&pod) && podHasSRIOVDevice(&pod) { + podsToRemove = append(podsToRemove, pod) + } + } + + return podsToRemove +} diff --git a/test/conformance/tests/test_policy_configuration.go b/test/conformance/tests/test_policy_configuration.go index 6d5d8b259..00e691d63 100644 --- a/test/conformance/tests/test_policy_configuration.go +++ b/test/conformance/tests/test_policy_configuration.go @@ -448,6 +448,7 @@ var _ = Describe("[sriov] operator", Ordered, func() { createTestPod(nodeToTest, []string{sriovNetworkName}) }) }) + Context("PF shutdown", func() { // 29398 It("Should be able to create pods successfully if PF is down.Pods are able to communicate with each other 
on the same node", func() { diff --git a/test/conformance/tests/test_sriov_operator.go b/test/conformance/tests/test_sriov_operator.go index 73c53412c..439fb1e36 100644 --- a/test/conformance/tests/test_sriov_operator.go +++ b/test/conformance/tests/test_sriov_operator.go @@ -1045,7 +1045,7 @@ var _ = Describe("[sriov] operator", func() { }) Describe("Custom SriovNetworkNodePolicy", func() { - BeforeEach(func() { + AfterEach(func() { err := namespaces.Clean(operatorNamespace, namespaces.Test, clients, discovery.Enabled()) Expect(err).ToNot(HaveOccurred()) WaitForSRIOVStable() @@ -1722,6 +1722,111 @@ var _ = Describe("[sriov] operator", func() { }) }) + Context("Draining Daemons using SR-IOV", func() { + var node string + resourceName := "drainresource" + sriovNetworkName := "test-drainnetwork" + var drainPolicy *sriovv1.SriovNetworkNodePolicy + + BeforeEach(func() { + isSingleNode, err := cluster.IsSingleNode(clients) + Expect(err).ToNot(HaveOccurred()) + if isSingleNode { + // TODO: change this when we add support for draining on single node + Skip("This test is not supported on single node as we don't drain on single node") + } + + node = sriovInfos.Nodes[0] + sriovDeviceList, err := sriovInfos.FindSriovDevices(node) + Expect(err).ToNot(HaveOccurred()) + intf := sriovDeviceList[0] + By("Using device " + intf.Name + " on node " + node) + + drainPolicy = &sriovv1.SriovNetworkNodePolicy{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-drainpolicy", + Namespace: operatorNamespace, + }, + + Spec: sriovv1.SriovNetworkNodePolicySpec{ + NodeSelector: map[string]string{ + "kubernetes.io/hostname": node, + }, + Mtu: 1500, + NumVfs: 5, + ResourceName: resourceName, + Priority: 99, + NicSelector: sriovv1.SriovNetworkNicSelector{ + PfNames: []string{intf.Name}, + }, + DeviceType: "netdevice", + }, + } + + err = clients.Create(context.Background(), drainPolicy) + Expect(err).ToNot(HaveOccurred()) + + WaitForSRIOVStable() + By("waiting for the resources to be available") + Eventually(func() int64 { + testedNode, err := clients.CoreV1Interface.Nodes().Get(context.Background(), node, metav1.GetOptions{}) + Expect(err).ToNot(HaveOccurred()) + resNum := testedNode.Status.Allocatable[corev1.ResourceName("openshift.io/"+resourceName)] + allocatable, _ := resNum.AsInt64() + return allocatable + }, 10*time.Minute, time.Second).Should(Equal(int64(5))) + + sriovNetwork := &sriovv1.SriovNetwork{ + ObjectMeta: metav1.ObjectMeta{ + Name: sriovNetworkName, + Namespace: operatorNamespace, + }, + Spec: sriovv1.SriovNetworkSpec{ + ResourceName: resourceName, + IPAM: `{"type":"host-local","subnet":"10.10.10.0/24","rangeStart":"10.10.10.171","rangeEnd":"10.10.10.181"}`, + NetworkNamespace: namespaces.Test, + }} + + // We need this to be able to run the connectivity checks on Mellanox cards + if intf.DeviceID == "1015" { + sriovNetwork.Spec.SpoofChk = off + } + + err = clients.Create(context.Background(), sriovNetwork) + + Expect(err).ToNot(HaveOccurred()) + waitForNetAttachDef("test-drainnetwork", namespaces.Test) + + createTestDaemonSet(node, []string{sriovNetworkName}) + }) + + It("should reconcile managed VF if status is changed", func() { + By("getting running config-daemon and test pod on the node") + daemonTestPod, err := findPodOnNodeWithLabelsAndNamespace(node, namespaces.Test, map[string]string{"app": "test"}) + Expect(err).ToNot(HaveOccurred()) + daemonConfigPod, err := findPodOnNodeWithLabelsAndNamespace(node, operatorNamespace, map[string]string{"app": "sriov-network-config-daemon"}) + 
Expect(err).ToNot(HaveOccurred()) + + By("deleting the sriov policy to start a drain") + err = clients.Delete(context.Background(), drainPolicy) + Expect(err).ToNot(HaveOccurred()) + WaitForSRIOVStable() + + tmpDaemon := &appsv1.DaemonSet{} + By("Checking the pod owned by a DaemonSet requesting sriov device was deleted ") + Eventually(func(g Gomega) bool { + err = clients.Client.Get(context.Background(), runtimeclient.ObjectKey{Name: daemonTestPod.Name, Namespace: daemonTestPod.Namespace}, tmpDaemon) + return err != nil && k8serrors.IsNotFound(err) + }, time.Minute, 5*time.Second).Should(BeTrue()) + + By("Checking the pod owned by a DaemonSet not requesting an sriov device was not deleted") + Consistently(func(g Gomega) bool { + err = clients.Client.Get(context.Background(), runtimeclient.ObjectKey{Name: daemonConfigPod.Name, Namespace: daemonConfigPod.Namespace}, tmpDaemon) + return err != nil && k8serrors.IsNotFound(err) + }, time.Minute, 10*time.Second).Should(BeTrue()) + }) + }) + }) }) @@ -1842,6 +1947,11 @@ func findMainSriovDevice(executorPod *corev1.Pod, sriovDevices []*sriovv1.Interf func findUnusedSriovDevices(testNode string, sriovDevices []*sriovv1.InterfaceExt) ([]*sriovv1.InterfaceExt, error) { createdPod := createCustomTestPod(testNode, []string{}, true, nil) + defer func() { + err := clients.Delete(context.Background(), createdPod) + Expect(err).ToNot(HaveOccurred()) + }() + filteredDevices := []*sriovv1.InterfaceExt{} stdout, _, err := pod.ExecCommand(clients, createdPod, "ip", "route") Expect(err).ToNot(HaveOccurred()) @@ -1989,6 +2099,24 @@ func isDaemonsetScheduledOnNodes(nodeSelector, daemonsetLabelSelector string) bo return true } +func findPodOnNodeWithLabelsAndNamespace(nodeName string, namespace string, labels map[string]string) (*corev1.Pod, error) { + podList := &corev1.PodList{} + err := clients.List(context.Background(), podList, runtimeclient.MatchingLabels(labels), &runtimeclient.ListOptions{Namespace: namespace}, runtimeclient.MatchingFields{"spec.nodeName": nodeName}) + if err != nil { + return nil, err + } + + if len(podList.Items) == 0 { + return nil, fmt.Errorf("no pod found") + } + + if len(podList.Items) > 1 { + return nil, fmt.Errorf("multiple pods found") + } + + return &podList.Items[0], nil +} + func createSriovPolicy(sriovDevice string, testNode string, numVfs int, resourceName string) { _, err := network.CreateSriovPolicy(clients, "test-policy-", operatorNamespace, sriovDevice, testNode, numVfs, resourceName, "netdevice") Expect(err).ToNot(HaveOccurred()) @@ -2017,7 +2145,6 @@ func createCustomTestPod(node string, networks []string, hostNetwork bool, podCa node, ) } - if len(podCapabilities) != 0 { if podDefinition.Spec.Containers[0].SecurityContext == nil { podDefinition.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{} @@ -2034,6 +2161,41 @@ func createCustomTestPod(node string, networks []string, hostNetwork bool, podCa return waitForPodRunning(createdPod) } +func createTestDaemonSet(node string, networks []string) *appsv1.DaemonSet { + podDefinition := pod.RedefineWithNodeSelector( + pod.GetDefinition(), + node, + ) + + // remove NET_ADMIN to not have issues in OCP + podDefinition.Spec.Containers[0].SecurityContext = nil + + daemonDefinition := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{GenerateName: "test-", Namespace: namespaces.Test}, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: 
metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "test", + }, + Annotations: map[string]string{"k8s.v1.cni.cncf.io/networks": strings.Join(networks, ",")}, + }, + Spec: podDefinition.Spec, + }, + }, + } + + err := clients.Create(context.Background(), daemonDefinition) + Expect(err).ToNot(HaveOccurred()) + + return waitForDaemonSetReady(daemonDefinition) +} + func pingPod(ip string, nodeSelector string, sriovNetworkAttachment string) { ipProtocolVersion := "6" if len(strings.Split(ip, ".")) == 4 { @@ -2376,6 +2538,18 @@ func waitForPodRunning(p *corev1.Pod) *corev1.Pod { return ret } +func waitForDaemonSetReady(d *appsv1.DaemonSet) *appsv1.DaemonSet { + Eventually(func(g Gomega) bool { + err := clients.Get(context.Background(), runtimeclient.ObjectKey{Name: d.Name, Namespace: d.Namespace}, d) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(d.Status.DesiredNumberScheduled).To(BeNumerically(">", 0)) + g.Expect(d.Status.CurrentNumberScheduled).To(BeNumerically(">", 0)) + return d.Status.DesiredNumberScheduled == d.Status.NumberReady + }, 3*time.Minute, 1*time.Second).Should(BeTrue(), "DaemonSet [%s/%s] should have running pods", d.Namespace, d.Name) + + return d +} + // assertNodeStateHasVFMatching asserts that the given node state has at least one VF matching the given fields func assertNodeStateHasVFMatching(nodeName string, fields Fields) { EventuallyWithOffset(1, func(g Gomega) sriovv1.InterfaceExts {
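
The drain retry loop in DrainNode now uses wait.ExponentialBackoffWithContext, so a cancelled context stops the retries instead of sleeping through the remaining steps. Below is a minimal, self-contained sketch of that pattern with a stand-in condition in place of the cordon/drain/DaemonSet-cleanup calls; with Steps: 5, Duration: 5s and Factor: 2 the condition is retried after roughly 5s, 10s, 20s and 40s before wait.Interrupted reports the give-up.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Same backoff shape as DrainNode after the patch.
	backoff := wait.Backoff{
		Steps:    5,
		Duration: 5 * time.Second,
		Factor:   2,
	}

	attempt := 0
	err := wait.ExponentialBackoffWithContext(context.Background(), backoff, func(ctx context.Context) (bool, error) {
		attempt++
		fmt.Println("attempt", attempt)
		// Stand-in for cordon + drain + DaemonSet cleanup: returning (false, nil)
		// asks for another attempt, (true, nil) stops with success, and a
		// non-nil error aborts the loop immediately.
		if attempt < 3 {
			return false, nil
		}
		return true, nil
	})
	if err != nil && wait.Interrupted(err) {
		fmt.Println("gave up after", backoff.Steps, "attempts")
	}
}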
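removeDaemonSetsFromNode relies on a spec.nodeName field selector rather than walking every pod in the cluster. The sketch below, which assumes a kubeconfig at the default path and a hypothetical node name, reproduces the same client-go query so a reviewer can inspect which pods would even be considered before a drain.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a kubeconfig at the default location (~/.kube/config).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	nodeName := "worker-0" // hypothetical node name
	pods, err := clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{
		// Same server-side filter the drainer uses: only pods scheduled on this node.
		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
	})
	if err != nil {
		panic(err)
	}

	for _, p := range pods.Items {
		// Print the owner kinds so DaemonSet-owned pods are easy to spot.
		kinds := []string{}
		for _, o := range p.OwnerReferences {
			kinds = append(kinds, o.Kind)
		}
		fmt.Printf("%s/%s owners=%v\n", p.Namespace, p.Name, kinds)
	}
}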
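Finally, a minimal, self-contained sketch of the selection predicate the new drain path applies: a pod is force-evicted only when it is owned by a DaemonSet and requests a resource whose name starts with the operator's resource prefix. The prefix is hard-coded as "openshift.io/" here purely for illustration; the patch reads it from vars.ResourcePrefix, and the pod below is hypothetical.

package main

import (
	"fmt"
	"strings"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// resourcePrefix stands in for vars.ResourcePrefix from the operator.
const resourcePrefix = "openshift.io/"

// ownedByDaemonSet mirrors podsHasDSOwner: any owner reference of kind DaemonSet.
func ownedByDaemonSet(p *corev1.Pod) bool {
	for _, o := range p.OwnerReferences {
		if o.Kind == "DaemonSet" {
			return true
		}
	}
	return false
}

// requestsSriovResource mirrors podHasSRIOVDevice: any container request whose
// resource name carries the operator's prefix.
func requestsSriovResource(p *corev1.Pod) bool {
	for _, c := range p.Spec.Containers {
		for name := range c.Resources.Requests {
			if strings.HasPrefix(name.String(), resourcePrefix) {
				return true
			}
		}
	}
	return false
}

func main() {
	// A hypothetical DaemonSet-owned pod requesting one SR-IOV VF.
	p := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "sriov-ds-pod",
			OwnerReferences: []metav1.OwnerReference{
				{Kind: "DaemonSet", Name: "test-daemon"},
			},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Name: "app",
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						"openshift.io/drainresource": resource.MustParse("1"),
					},
				},
			}},
		},
	}

	// Only pods matching both checks end up in the list that
	// removeDaemonSetsFromNode passes to DeleteOrEvictPods.
	fmt.Println(ownedByDaemonSet(p) && requestsSriovResource(p)) // prints: true
}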