diff --git a/internal/state/state_skel.go b/internal/state/state_skel.go
index ff26d7ccb..3e14fd8a6 100644
--- a/internal/state/state_skel.go
+++ b/internal/state/state_skel.go
@@ -20,9 +20,11 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strings"
 
 	"github.com/go-logr/logr"
 	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -402,7 +404,7 @@ func (s *stateSkel) getSyncState(ctx context.Context, objs []*unstructured.Unstr
 
 		// Object exists, check for Kind specific readiness
 		if found.GetKind() == "DaemonSet" {
-			if ready, err := s.isDaemonSetReady(found, reqLogger); err != nil || !ready {
+			if ready, err := s.isDaemonSetReady(ctx, found, reqLogger); err != nil || !ready {
 				reqLogger.V(consts.LogLevelInfo).Info("Object is not ready", "Kind:", obj.GetKind(), "Name", obj.GetName())
 				return SyncStateNotReady, err
 			}
@@ -413,7 +415,7 @@ func (s *stateSkel) getSyncState(ctx context.Context, objs []*unstructured.Unstr
 }
 
 // isDaemonSetReady checks if daemonset is ready
-func (s *stateSkel) isDaemonSetReady(uds *unstructured.Unstructured, reqLogger logr.Logger) (bool, error) {
+func (s *stateSkel) isDaemonSetReady(ctx context.Context, uds *unstructured.Unstructured, reqLogger logr.Logger) (bool, error) {
 	buf, err := uds.MarshalJSON()
 	if err != nil {
 		return false, fmt.Errorf("failed to marshall unstructured daemonset object: %w", err)
@@ -433,14 +435,33 @@ func (s *stateSkel) isDaemonSetReady(uds *unstructured.Unstructured, reqLogger l
 		"UpdatedPodsScheduled", ds.Status.UpdatedNumberScheduled,
 		"PodsReady:", ds.Status.NumberReady,
 		"Conditions:", ds.Status.Conditions)
-	// Note(adrianc): We check for DesiredNumberScheduled!=0 as we expect to have at least one node that would need
-	// to have DaemonSet Pods deployed onto it. DesiredNumberScheduled == 0 then indicates that this field was not yet
-	// updated by the DaemonSet controller
-	// TODO: Check if we can use another field maybe to indicate it was processed by the DaemonSet controller.
-	if ds.Status.DesiredNumberScheduled != 0 && ds.Status.DesiredNumberScheduled == ds.Status.NumberAvailable &&
-		ds.Status.UpdatedNumberScheduled == ds.Status.NumberAvailable {
+
+	if ds.Status.DesiredNumberScheduled == 0 {
+		reqLogger.V(consts.LogLevelDebug).Info("DesiredNumberScheduled is 0, DaemonSet not ready")
+		return false, nil
+	}
+
+	// Check basic availability
+	if ds.Status.NumberUnavailable != 0 {
+		reqLogger.V(consts.LogLevelInfo).Info("DaemonSet has unavailable pods")
+		return false, nil
+	}
+
+	if ds.Status.DesiredNumberScheduled != ds.Status.NumberAvailable {
+		reqLogger.V(consts.LogLevelInfo).Info("Not all desired pods are available")
+		return false, nil
+	}
+
+	// For OnDelete strategy, use revision hash checking with node placement verification
+	if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType {
+		return s.isDaemonSetReadyOnDelete(ctx, ds, reqLogger)
+	}
+
+	// For RollingUpdate strategy, check UpdatedNumberScheduled
+	if ds.Status.UpdatedNumberScheduled == ds.Status.NumberAvailable {
 		return true, nil
 	}
+
 	return false, nil
 }
 
@@ -454,3 +475,130 @@ func (s *stateSkel) checkAttributesExist(attrs nodeinfo.NodeAttributes, attrType
 	}
 	return nil
 }
+
+// isDaemonSetReadyOnDelete checks if a DaemonSet with OnDelete update strategy is ready
+// by verifying pod revision hashes and node placement
+func (s *stateSkel) isDaemonSetReadyOnDelete(ctx context.Context, ds *appsv1.DaemonSet, reqLogger logr.Logger) (bool, error) {
+	ownedPods, err := s.getOwnedPods(ctx, ds)
+	if err != nil {
+		reqLogger.Error(err, "Failed to list pods for DaemonSet")
+		return false, err
+	}
+	if len(ownedPods) == 0 {
+		return false, nil
+	}
+
+	if !s.arePodsHealthy(ownedPods) {
+		return false, nil
+	}
+
+	dsRevisionHash, err := s.getLatestRevisionHash(ctx, ds)
+	if err != nil {
+		// If the revision hash is unavailable, fall back to the health-only check above
+		reqLogger.V(consts.LogLevelWarning).Info("Could not get revision hash, using health-only check", "error", err)
+		return true, nil
+	}
+
+	// Check if all pods have the latest revision
+	for _, pod := range ownedPods {
+		if hash, ok := pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]; !ok || hash != dsRevisionHash {
+			// Pods have an outdated revision - verify they are on nodes matching the current nodeSelector
+			reqLogger.V(consts.LogLevelInfo).Info("Pods have outdated revision, verifying node placement")
+			return s.verifyNodePlacement(ctx, ds, ownedPods, reqLogger)
+		}
+	}
+
+	return true, nil
+}
+
+// getOwnedPods returns pods owned by the DaemonSet
+func (s *stateSkel) getOwnedPods(ctx context.Context, ds *appsv1.DaemonSet) ([]corev1.Pod, error) {
+	podList := &corev1.PodList{}
+	opts := []client.ListOption{
+		client.MatchingLabels(ds.Spec.Template.Labels),
+		client.InNamespace(ds.Namespace),
+	}
+	if err := s.client.List(ctx, podList, opts...); err != nil {
+		return nil, err
+	}
+
+	var ownedPods []corev1.Pod
+	for _, pod := range podList.Items {
+		// Check all owner references, not only the first one
+		for _, owner := range pod.OwnerReferences {
+			if owner.UID == ds.UID {
+				ownedPods = append(ownedPods, pod)
+				break
+			}
+		}
+	}
+	return ownedPods, nil
+}
+
+// getLatestRevisionHash retrieves the latest ControllerRevision hash for a DaemonSet
+func (s *stateSkel) getLatestRevisionHash(ctx context.Context, ds *appsv1.DaemonSet) (string, error) {
+	revisionList := &appsv1.ControllerRevisionList{}
+	if err := s.client.List(ctx, revisionList,
+		client.MatchingLabels(ds.Spec.Selector.MatchLabels),
+		client.InNamespace(ds.Namespace)); err != nil {
+		return "", err
+	}
+
+	// Find the revision with the highest revision number in a single pass
+	var latestRevision *appsv1.ControllerRevision
+	namePrefix := ds.Name + "-"
+
+	for i := range revisionList.Items {
+		rev := &revisionList.Items[i]
+		if !strings.HasPrefix(rev.Name, namePrefix) {
+			continue
+		}
+		if latestRevision == nil || rev.Revision > latestRevision.Revision {
+			latestRevision = rev
+		}
+	}
+
+	if latestRevision == nil {
+		return "", fmt.Errorf("no ControllerRevisions found for DaemonSet %s/%s", ds.Namespace, ds.Name)
+	}
+
+	// ControllerRevision names follow "<daemonset-name>-<hash>"
+	return strings.TrimPrefix(latestRevision.Name, namePrefix), nil
+}
+
+// arePodsHealthy checks if all pods are running with all containers ready
+func (s *stateSkel) arePodsHealthy(pods []corev1.Pod) bool {
+	for _, pod := range pods {
+		if pod.Status.Phase != corev1.PodRunning {
+			return false
+		}
+		for _, cs := range pod.Status.ContainerStatuses {
+			if !cs.Ready {
+				return false
+			}
+		}
+	}
+	return true
+}
+
+// verifyNodePlacement checks if all pods are on nodes matching the DaemonSet's nodeSelector
+func (s *stateSkel) verifyNodePlacement(ctx context.Context, ds *appsv1.DaemonSet, pods []corev1.Pod, reqLogger logr.Logger) (bool, error) {
+	for _, pod := range pods {
+		node := &corev1.Node{}
+		if err := s.client.Get(ctx, types.NamespacedName{Name: pod.Spec.NodeName}, node); err != nil {
+			return false, err
+		}
+
+		// Check if the node matches the nodeSelector
+		for key, value := range ds.Spec.Template.Spec.NodeSelector {
+			if node.Labels[key] != value {
+				reqLogger.V(consts.LogLevelInfo).Info("Pod on non-matching node",
+					"pod", pod.Name, "node", pod.Spec.NodeName, "selector", key, "expected", value, "actual", node.Labels[key])
+				return false, nil
+			}
+		}
+	}
+
+	reqLogger.V(consts.LogLevelInfo).Info("Pods healthy on correct nodes despite outdated revision")
+	return true, nil
+}
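
Below is a minimal test sketch (not part of the patch) showing how the new OnDelete readiness path could be exercised with controller-runtime's fake client. It assumes the tests live in the `state` package so the unexported methods are reachable, that `stateSkel`'s `client` field is the only field these code paths touch, and that a controller-runtime version providing `fake.NewClientBuilder` is available; the test names and fixture values (`demo`, `abc123`, `node-1`) are hypothetical.

```go
package state

import (
	"context"
	"testing"

	"github.com/go-logr/logr"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// newTestDaemonSet builds a DaemonSet fixture with the OnDelete update strategy.
func newTestDaemonSet(labels map[string]string) *appsv1.DaemonSet {
	return &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default", UID: "ds-uid"},
		Spec: appsv1.DaemonSetSpec{
			Selector:       &metav1.LabelSelector{MatchLabels: labels},
			Template:       corev1.PodTemplateSpec{ObjectMeta: metav1.ObjectMeta{Labels: labels}},
			UpdateStrategy: appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType},
		},
	}
}

func TestIsDaemonSetReadyOnDelete_UpToDateRevision(t *testing.T) {
	labels := map[string]string{"app": "demo"}
	ds := newTestDaemonSet(labels)
	// ControllerRevision names follow "<daemonset-name>-<hash>"; the pod carries
	// the same hash in its controller-revision-hash label, so the DaemonSet is ready.
	rev := &appsv1.ControllerRevision{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-abc123", Namespace: "default", Labels: labels},
		Revision:   1,
	}
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "demo-pod",
			Namespace: "default",
			Labels: map[string]string{
				"app": "demo", appsv1.DefaultDaemonSetUniqueLabelKey: "abc123",
			},
			OwnerReferences: []metav1.OwnerReference{
				{APIVersion: "apps/v1", Kind: "DaemonSet", Name: "demo", UID: "ds-uid"},
			},
		},
		Spec:   corev1.PodSpec{NodeName: "node-1"},
		Status: corev1.PodStatus{Phase: corev1.PodRunning},
	}

	s := &stateSkel{client: fake.NewClientBuilder().WithObjects(ds, rev, pod).Build()}
	ready, err := s.isDaemonSetReadyOnDelete(context.Background(), ds, logr.Discard())
	if err != nil || !ready {
		t.Fatalf("expected ready=true, got ready=%v err=%v", ready, err)
	}
}

func TestGetLatestRevisionHash_PicksHighestRevision(t *testing.T) {
	labels := map[string]string{"app": "demo"}
	ds := newTestDaemonSet(labels)
	older := &appsv1.ControllerRevision{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-aaa111", Namespace: "default", Labels: labels},
		Revision:   1,
	}
	newer := &appsv1.ControllerRevision{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-bbb222", Namespace: "default", Labels: labels},
		Revision:   2,
	}

	s := &stateSkel{client: fake.NewClientBuilder().WithObjects(ds, older, newer).Build()}
	hash, err := s.getLatestRevisionHash(context.Background(), ds)
	if err != nil || hash != "bbb222" {
		t.Fatalf("expected hash %q, got %q err=%v", "bbb222", hash, err)
	}
}
```

The fake client's default scheme already registers apps/v1 and core/v1, so no extra scheme setup is needed for these fixtures.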