diff --git a/controllers/drain_controller_test.go b/controllers/drain_controller_test.go
index 3ecffa81c..b1256561b 100644
--- a/controllers/drain_controller_test.go
+++ b/controllers/drain_controller_test.go
@@ -326,7 +326,7 @@ func expectNodeStateAnnotation(nodeState *sriovnetworkv1.SriovNetworkNodeState,
 		g.Expect(utils.ObjectHasAnnotation(nodeState, constants.NodeStateDrainAnnotationCurrent, expectedAnnotationValue)).
 			To(BeTrue(), "Node[%s] annotation[%s] == '%s'. Expected '%s'",
 				nodeState.Name, constants.NodeDrainAnnotation, nodeState.GetLabels()[constants.NodeStateDrainAnnotationCurrent], expectedAnnotationValue)
-	}, "20s", "1s").Should(Succeed())
+	}, "200s", "1s").Should(Succeed())
 }
 
 func expectNumberOfDrainingNodes(numbOfDrain int, nodesState ...*sriovnetworkv1.SriovNetworkNodeState) {
diff --git a/pkg/drain/drainer.go b/pkg/drain/drainer.go
index 22dbed3df..2fb24a74f 100644
--- a/pkg/drain/drainer.go
+++ b/pkg/drain/drainer.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/kubectl/pkg/drain"
@@ -68,16 +69,15 @@ func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrai
 		return false, nil
 	}
 
-	drainHelper := createDrainHelper(d.kubeClient, ctx, fullNodeDrain)
 	backoff := wait.Backoff{
 		Steps:    5,
-		Duration: 10 * time.Second,
+		Duration: 5 * time.Second,
 		Factor:   2,
 	}
 	var lastErr error
 
-	reqLogger.Info("drainNode(): Start draining")
-	if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
+	if err = wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
+		drainHelper := createDrainHelper(d.kubeClient, ctx, fullNodeDrain)
 		err := drain.RunCordonOrUncordon(drainHelper, node, true)
 		if err != nil {
 			lastErr = err
@@ -85,12 +85,19 @@ func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrai
 			return false, nil
 		}
 		err = drain.RunNodeDrain(drainHelper, node.Name)
-		if err == nil {
-			return true, nil
+		if err != nil {
+			lastErr = err
+			reqLogger.Info("drainNode(): Draining failed, retrying", "error", err)
+			return false, nil
 		}
-		lastErr = err
-		reqLogger.Info("drainNode(): Draining failed, retrying", "error", err)
-		return false, nil
+
+		err = d.removeDaemonSetsFromNode(ctx, node.Name)
+		if err != nil {
+			lastErr = err
+			return false, nil
+		}
+
+		return true, nil
 	}); err != nil {
 		if wait.Interrupted(err) {
 			reqLogger.Info("drainNode(): failed to drain node", "steps", backoff.Steps, "error", lastErr)
@@ -131,6 +138,28 @@ func (d *Drainer) CompleteDrainNode(ctx context.Context, node *corev1.Node) (boo
 	return completed, nil
 }
 
+// removeDaemonSetsFromNode goes over the pods remaining on the node and removes the ones that are owned by a DaemonSet and use SR-IOV devices.
+// We can't rely on the core Kubernetes drain code for this, as it doesn't support removing pods that are part of a DaemonSet.
+func (d *Drainer) removeDaemonSetsFromNode(ctx context.Context, nodeName string) error {
+	reqLogger := log.FromContext(ctx)
+	reqLogger.Info("drainNode(): remove DaemonSet pods using sriov devices from node", "nodeName", nodeName)
+
+	podList, err := d.kubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName)})
+	if err != nil {
+		reqLogger.Info("drainNode(): Failed to list pods, retrying", "error", err)
+		return err
+	}
+
+	// remove pods that are owned by a DaemonSet and use SR-IOV devices
+	dsPodsList := getDsPodsToRemove(podList)
+	drainHelper := createDrainHelper(d.kubeClient, ctx, true)
+	err = drainHelper.DeleteOrEvictPods(dsPodsList)
+	if err != nil {
+		reqLogger.Error(err, "failed to delete or evict pods from node", "nodeName", nodeName)
+	}
+	return err
+}
+
 // createDrainHelper function to create a drain helper
 // if fullDrain is false we only remove pods that have the resourcePrefix
 // if not we remove all the pods in the node
@@ -150,25 +179,21 @@ func createDrainHelper(kubeClient kubernetes.Interface, ctx context.Context, ful
 			}
 			log.Log.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name))
 		},
-		Ctx:    ctx,
-		Out:    writer{logger.Info},
-		ErrOut: writer{func(msg string, kv ...interface{}) { logger.Error(nil, msg, kv...) }},
+		Ctx:    ctx,
+		Out:    writer{logger.Info},
+		ErrOut: writer{func(msg string, kv ...interface{}) {
+			logger.Error(nil, msg, kv...)
+		}},
 	}
 
 	// when we just want to drain and not reboot we can only remove the pods using sriov devices
 	if !fullDrain {
 		deleteFunction := func(p corev1.Pod) drain.PodDeleteStatus {
-			for _, c := range p.Spec.Containers {
-				if c.Resources.Requests != nil {
-					for r := range c.Resources.Requests {
-						if strings.HasPrefix(r.String(), vars.ResourcePrefix) {
-							return drain.PodDeleteStatus{
-								Delete:  true,
-								Reason:  "pod contain SR-IOV device",
-								Message: "SR-IOV network operator draining the node",
-							}
-						}
-					}
+			if podHasSRIOVDevice(&p) {
+				return drain.PodDeleteStatus{
+					Delete:  true,
+					Reason:  "pod contains SR-IOV device",
+					Message: "SR-IOV network operator draining the node",
				}
 			}
 			return drain.PodDeleteStatus{Delete: false}
@@ -179,3 +204,38 @@ func createDrainHelper(kubeClient kubernetes.Interface, ctx context.Context, ful
 	}
 	return drainer
 }
+
+func podHasSRIOVDevice(p *corev1.Pod) bool {
+	for _, c := range p.Spec.Containers {
+		if c.Resources.Requests != nil {
+			for r := range c.Resources.Requests {
+				if strings.HasPrefix(r.String(), vars.ResourcePrefix) {
+					return true
+				}
+			}
+		}
+	}
+
+	return false
+}
+
+func podHasDSOwner(p *corev1.Pod) bool {
+	for _, o := range p.OwnerReferences {
+		if o.Kind == "DaemonSet" {
+			return true
+		}
+	}
+
+	return false
+}
+
+func getDsPodsToRemove(pl *corev1.PodList) []corev1.Pod {
+	podsToRemove := []corev1.Pod{}
+	for _, pod := range pl.Items {
+		if podHasDSOwner(&pod) && podHasSRIOVDevice(&pod) {
+			podsToRemove = append(podsToRemove, pod)
+		}
+	}
+
+	return podsToRemove
+}
diff --git a/test/conformance/tests/test_policy_configuration.go b/test/conformance/tests/test_policy_configuration.go
index 6d5d8b259..00e691d63 100644
--- a/test/conformance/tests/test_policy_configuration.go
+++ b/test/conformance/tests/test_policy_configuration.go
@@ -448,6 +448,7 @@ var _ = Describe("[sriov] operator", Ordered, func() {
 			createTestPod(nodeToTest, []string{sriovNetworkName})
 		})
 	})
+
 	Context("PF shutdown", func() {
 		// 29398
 		It("Should be able to create pods successfully if PF is down.Pods are able to communicate with each other on the same node", func() {
diff --git a/test/conformance/tests/test_sriov_operator.go b/test/conformance/tests/test_sriov_operator.go
index 73c53412c..439fb1e36 100644
--- a/test/conformance/tests/test_sriov_operator.go
+++ b/test/conformance/tests/test_sriov_operator.go
@@ -1045,7 +1045,7 @@ var _ = Describe("[sriov] operator", func() {
 	})
 
 	Describe("Custom SriovNetworkNodePolicy", func() {
-		BeforeEach(func() {
+		AfterEach(func() {
 			err := namespaces.Clean(operatorNamespace, namespaces.Test, clients, discovery.Enabled())
 			Expect(err).ToNot(HaveOccurred())
 			WaitForSRIOVStable()
@@ -1722,6 +1722,111 @@ var _ = Describe("[sriov] operator", func() {
 			})
 		})
 
+		Context("Draining Daemons using SR-IOV", func() {
+			var node string
+			resourceName := "drainresource"
+			sriovNetworkName := "test-drainnetwork"
+			var drainPolicy *sriovv1.SriovNetworkNodePolicy
+
+			BeforeEach(func() {
+				isSingleNode, err := cluster.IsSingleNode(clients)
+				Expect(err).ToNot(HaveOccurred())
+				if isSingleNode {
+					// TODO: change this when we add support for draining on single node
+					Skip("This test is not supported on single node as we don't drain on single node")
+				}
+
+				node = sriovInfos.Nodes[0]
+				sriovDeviceList, err := sriovInfos.FindSriovDevices(node)
+				Expect(err).ToNot(HaveOccurred())
+				intf := sriovDeviceList[0]
+				By("Using device " + intf.Name + " on node " + node)
+
+				drainPolicy = &sriovv1.SriovNetworkNodePolicy{
+					ObjectMeta: metav1.ObjectMeta{
+						GenerateName: "test-drainpolicy",
+						Namespace:    operatorNamespace,
+					},
+
+					Spec: sriovv1.SriovNetworkNodePolicySpec{
+						NodeSelector: map[string]string{
+							"kubernetes.io/hostname": node,
+						},
+						Mtu:          1500,
+						NumVfs:       5,
+						ResourceName: resourceName,
+						Priority:     99,
+						NicSelector: sriovv1.SriovNetworkNicSelector{
+							PfNames: []string{intf.Name},
+						},
+						DeviceType: "netdevice",
+					},
+				}
+
+				err = clients.Create(context.Background(), drainPolicy)
+				Expect(err).ToNot(HaveOccurred())
+
+				WaitForSRIOVStable()
+				By("waiting for the resources to be available")
+				Eventually(func() int64 {
+					testedNode, err := clients.CoreV1Interface.Nodes().Get(context.Background(), node, metav1.GetOptions{})
+					Expect(err).ToNot(HaveOccurred())
+					resNum := testedNode.Status.Allocatable[corev1.ResourceName("openshift.io/"+resourceName)]
+					allocatable, _ := resNum.AsInt64()
+					return allocatable
+				}, 10*time.Minute, time.Second).Should(Equal(int64(5)))
+
+				sriovNetwork := &sriovv1.SriovNetwork{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      sriovNetworkName,
+						Namespace: operatorNamespace,
+					},
+					Spec: sriovv1.SriovNetworkSpec{
+						ResourceName:     resourceName,
+						IPAM:             `{"type":"host-local","subnet":"10.10.10.0/24","rangeStart":"10.10.10.171","rangeEnd":"10.10.10.181"}`,
+						NetworkNamespace: namespaces.Test,
+					}}
+
+				// We need this to be able to run the connectivity checks on Mellanox cards
+				if intf.DeviceID == "1015" {
+					sriovNetwork.Spec.SpoofChk = off
+				}
+
+				err = clients.Create(context.Background(), sriovNetwork)
+
+				Expect(err).ToNot(HaveOccurred())
+				waitForNetAttachDef(sriovNetworkName, namespaces.Test)
+
+				createTestDaemonSet(node, []string{sriovNetworkName})
+			})
+
+			It("should remove pods owned by a DaemonSet that request an SR-IOV device when the node is drained", func() {
+				By("getting the running config-daemon and test pods on the node")
+				daemonTestPod, err := findPodOnNodeWithLabelsAndNamespace(node, namespaces.Test, map[string]string{"app": "test"})
+				Expect(err).ToNot(HaveOccurred())
+				daemonConfigPod, err := findPodOnNodeWithLabelsAndNamespace(node, operatorNamespace, map[string]string{"app": "sriov-network-config-daemon"})
+				Expect(err).ToNot(HaveOccurred())
+
+				By("deleting the sriov policy to start a drain")
+				err = clients.Delete(context.Background(), drainPolicy)
+				Expect(err).ToNot(HaveOccurred())
+				WaitForSRIOVStable()
+
+				tmpPod := &corev1.Pod{}
+				By("Checking the pod owned by a DaemonSet requesting an sriov device was deleted")
+				Eventually(func(g Gomega) bool {
+					err = clients.Client.Get(context.Background(), runtimeclient.ObjectKey{Name: daemonTestPod.Name, Namespace: daemonTestPod.Namespace}, tmpPod)
+					return err != nil && k8serrors.IsNotFound(err)
+				}, time.Minute, 5*time.Second).Should(BeTrue())
+
+				By("Checking the pod owned by a DaemonSet not requesting an sriov device was not deleted")
+				Consistently(func(g Gomega) bool {
+					err = clients.Client.Get(context.Background(), runtimeclient.ObjectKey{Name: daemonConfigPod.Name, Namespace: daemonConfigPod.Namespace}, tmpPod)
+					return err == nil
+				}, time.Minute, 10*time.Second).Should(BeTrue())
+			})
+		})
+
 	})
 })
 
@@ -1842,6 +1947,11 @@ func findMainSriovDevice(executorPod *corev1.Pod, sriovDevices []*sriovv1.Interf
 
 func findUnusedSriovDevices(testNode string, sriovDevices []*sriovv1.InterfaceExt) ([]*sriovv1.InterfaceExt, error) {
 	createdPod := createCustomTestPod(testNode, []string{}, true, nil)
+	defer func() {
+		err := clients.Delete(context.Background(), createdPod)
+		Expect(err).ToNot(HaveOccurred())
+	}()
+
 	filteredDevices := []*sriovv1.InterfaceExt{}
 	stdout, _, err := pod.ExecCommand(clients, createdPod, "ip", "route")
 	Expect(err).ToNot(HaveOccurred())
@@ -1989,6 +2099,24 @@ func isDaemonsetScheduledOnNodes(nodeSelector, daemonsetLabelSelector string) bo
 	return true
 }
 
+func findPodOnNodeWithLabelsAndNamespace(nodeName string, namespace string, labels map[string]string) (*corev1.Pod, error) {
+	podList := &corev1.PodList{}
+	err := clients.List(context.Background(), podList, runtimeclient.MatchingLabels(labels), &runtimeclient.ListOptions{Namespace: namespace}, runtimeclient.MatchingFields{"spec.nodeName": nodeName})
+	if err != nil {
+		return nil, err
+	}
+
+	if len(podList.Items) == 0 {
+		return nil, fmt.Errorf("no pod found on node %s in namespace %s with labels %v", nodeName, namespace, labels)
+	}
+
+	if len(podList.Items) > 1 {
+		return nil, fmt.Errorf("more than one pod found on node %s in namespace %s with labels %v", nodeName, namespace, labels)
+	}
+
+	return &podList.Items[0], nil
+}
+
 func createSriovPolicy(sriovDevice string, testNode string, numVfs int, resourceName string) {
 	_, err := network.CreateSriovPolicy(clients, "test-policy-", operatorNamespace, sriovDevice, testNode, numVfs, resourceName, "netdevice")
 	Expect(err).ToNot(HaveOccurred())
@@ -2017,7 +2145,6 @@ func createCustomTestPod(node string, networks []string, hostNetwork bool, podCa
 			node,
 		)
 	}
-
 	if len(podCapabilities) != 0 {
 		if podDefinition.Spec.Containers[0].SecurityContext == nil {
 			podDefinition.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{}
@@ -2034,6 +2161,41 @@ func createCustomTestPod(node string, networks []string, hostNetwork bool, podCa
 	return waitForPodRunning(createdPod)
 }
 
+func createTestDaemonSet(node string, networks []string) *appsv1.DaemonSet {
+	podDefinition := pod.RedefineWithNodeSelector(
+		pod.GetDefinition(),
+		node,
+	)
+
+	// remove NET_ADMIN so the test DaemonSet does not hit issues on OCP
+	podDefinition.Spec.Containers[0].SecurityContext = nil
+
+	daemonDefinition := &appsv1.DaemonSet{
+		ObjectMeta: metav1.ObjectMeta{GenerateName: "test-", Namespace: namespaces.Test},
+		Spec: appsv1.DaemonSetSpec{
+			Selector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					"app": "test",
+				},
+			},
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{
+						"app": "test",
+					},
+					Annotations: map[string]string{"k8s.v1.cni.cncf.io/networks": strings.Join(networks, ",")},
+				},
+				Spec: podDefinition.Spec,
+			},
+		},
+	}
+
+	err := clients.Create(context.Background(), daemonDefinition)
+	Expect(err).ToNot(HaveOccurred())
+
+	return waitForDaemonSetReady(daemonDefinition)
+}
+
 func pingPod(ip string, nodeSelector string, sriovNetworkAttachment string) {
 	ipProtocolVersion := "6"
 	if len(strings.Split(ip, ".")) == 4 {
@@ -2376,6 +2538,18 @@ func waitForPodRunning(p *corev1.Pod) *corev1.Pod {
 	return ret
 }
 
+func waitForDaemonSetReady(d *appsv1.DaemonSet) *appsv1.DaemonSet {
+	Eventually(func(g Gomega) bool {
+		err := clients.Get(context.Background(), runtimeclient.ObjectKey{Name: d.Name, Namespace: d.Namespace}, d)
+		g.Expect(err).ToNot(HaveOccurred())
+		g.Expect(d.Status.DesiredNumberScheduled).To(BeNumerically(">", 0))
+		g.Expect(d.Status.CurrentNumberScheduled).To(BeNumerically(">", 0))
+		return d.Status.DesiredNumberScheduled == d.Status.NumberReady
+	}, 3*time.Minute, 1*time.Second).Should(BeTrue(), "DaemonSet [%s/%s] should have running pods", d.Namespace, d.Name)
+
+	return d
+}
+
 // assertNodeStateHasVFMatching asserts that the given node state has at least one VF matching the given fields
 func assertNodeStateHasVFMatching(nodeName string, fields Fields) {
 	EventuallyWithOffset(1, func(g Gomega) sriovv1.InterfaceExts {