From 8683b0ebd547b99503aa4fa2f10e1e3c7e04a689 Mon Sep 17 00:00:00 2001 From: Daniel Bizau Date: Thu, 31 Jul 2025 15:18:48 +0200 Subject: [PATCH] improve monitor loop --- pkg/monitor/cluster/cache.go | 12 + pkg/monitor/cluster/cluster.go | 47 ++- pkg/monitor/cluster/daemonsetstatuses.go | 52 --- pkg/monitor/cluster/daemonsetstatuses_test.go | 77 ---- pkg/monitor/cluster/deploymentstatuses.go | 50 --- .../cluster/deploymentstatuses_test.go | 77 ---- pkg/monitor/cluster/podconditions.go | 169 --------- pkg/monitor/cluster/replicasetstatuses.go | 52 --- .../cluster/replicasetstatuses_test.go | 77 ---- pkg/monitor/cluster/statefulsetstatuses.go | 52 --- .../cluster/statefulsetstatuses_test.go | 77 ---- pkg/monitor/cluster/workloadstatuses.go | 345 ++++++++++++++++++ ...tions_test.go => workloadstatuses_test.go} | 198 +++++++++- 13 files changed, 588 insertions(+), 697 deletions(-) delete mode 100644 pkg/monitor/cluster/daemonsetstatuses.go delete mode 100644 pkg/monitor/cluster/daemonsetstatuses_test.go delete mode 100644 pkg/monitor/cluster/deploymentstatuses.go delete mode 100644 pkg/monitor/cluster/deploymentstatuses_test.go delete mode 100644 pkg/monitor/cluster/podconditions.go delete mode 100644 pkg/monitor/cluster/replicasetstatuses.go delete mode 100644 pkg/monitor/cluster/replicasetstatuses_test.go delete mode 100644 pkg/monitor/cluster/statefulsetstatuses.go delete mode 100644 pkg/monitor/cluster/statefulsetstatuses_test.go create mode 100644 pkg/monitor/cluster/workloadstatuses.go rename pkg/monitor/cluster/{podconditions_test.go => workloadstatuses_test.go} (53%) diff --git a/pkg/monitor/cluster/cache.go b/pkg/monitor/cluster/cache.go index df70acf8f15..dd552f25943 100644 --- a/pkg/monitor/cluster/cache.go +++ b/pkg/monitor/cluster/cache.go @@ -19,6 +19,9 @@ import ( // memory usage. Don't add caches here: work to remove them. func (mon *Monitor) getClusterVersion(ctx context.Context) (*configv1.ClusterVersion, error) { + mon.cache.mu.cv.Lock() + defer mon.cache.mu.cv.Unlock() + if mon.cache.cv != nil { return mon.cache.cv, nil } @@ -30,6 +33,9 @@ func (mon *Monitor) getClusterVersion(ctx context.Context) (*configv1.ClusterVer // TODO: remove this function and paginate func (mon *Monitor) listClusterOperators(ctx context.Context) (*configv1.ClusterOperatorList, error) { + mon.cache.mu.cos.Lock() + defer mon.cache.mu.cos.Unlock() + if mon.cache.cos != nil { return mon.cache.cos, nil } @@ -41,6 +47,9 @@ func (mon *Monitor) listClusterOperators(ctx context.Context) (*configv1.Cluster // TODO: remove this function and paginate func (mon *Monitor) listNodes(ctx context.Context) (*corev1.NodeList, error) { + mon.cache.mu.ns.Lock() + defer mon.cache.mu.ns.Unlock() + if mon.cache.ns != nil { return mon.cache.ns, nil } @@ -52,6 +61,9 @@ func (mon *Monitor) listNodes(ctx context.Context) (*corev1.NodeList, error) { // TODO: remove this function and paginate func (mon *Monitor) listARODeployments(ctx context.Context) (*appsv1.DeploymentList, error) { + mon.cache.mu.arodl.Lock() + defer mon.cache.mu.arodl.Unlock() + if mon.cache.arodl != nil { return mon.cache.arodl, nil } diff --git a/pkg/monitor/cluster/cluster.go b/pkg/monitor/cluster/cluster.go index bc2c4533778..5bdf2bc8983 100644 --- a/pkg/monitor/cluster/cluster.go +++ b/pkg/monitor/cluster/cluster.go @@ -24,6 +24,7 @@ import ( configclient "github.com/openshift/client-go/config/clientset/versioned" machineclient "github.com/openshift/client-go/machine/clientset/versioned" operatorclient "github.com/openshift/client-go/operator/clientset/versioned" + machinev1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" mcoclient "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" "github.com/Azure/ARO-RP/pkg/api" @@ -68,6 +69,15 @@ type Monitor struct { cv *configv1.ClusterVersion ns *corev1.NodeList arodl *appsv1.DeploymentList + mcps *machinev1.MachineConfigPoolList + mu struct { + cos sync.Mutex + cs sync.Mutex + cv sync.Mutex + ns sync.Mutex + arodl sync.Mutex + mcps sync.Mutex + } } wg *sync.WaitGroup @@ -208,7 +218,8 @@ func (mon *Monitor) Monitor(ctx context.Context) (errs []error) { } return } - for _, f := range []func(context.Context) error{ + + checks := []func(context.Context) error{ mon.emitAroOperatorHeartbeat, mon.emitAroOperatorConditions, mon.emitNSGReconciliation, @@ -216,16 +227,12 @@ func (mon *Monitor) Monitor(ctx context.Context) (errs []error) { mon.emitClusterOperatorVersions, mon.emitClusterVersionConditions, mon.emitClusterVersions, - mon.emitDaemonsetStatuses, - mon.emitDeploymentStatuses, + mon.emitWorkloadStatuses, mon.emitMachineConfigPoolConditions, mon.emitMachineConfigPoolUnmanagedNodeCounts, mon.emitNodeConditions, - mon.emitPodConditions, mon.emitDebugPodsCount, mon.detectQuotaFailure, - mon.emitReplicasetStatuses, - mon.emitStatefulsetStatuses, mon.emitJobConditions, mon.emitSummary, mon.emitHiveRegistrationStatus, @@ -238,13 +245,27 @@ func (mon *Monitor) Monitor(ctx context.Context) (errs []error) { mon.emitPrometheusAlerts, // at the end for now because it's the slowest/least reliable mon.emitCWPStatus, mon.emitClusterAuthenticationType, - } { - err = f(ctx) - if err != nil { - errs = append(errs, err) - mon.emitFailureToGatherMetric(steps.FriendlyName(f), err) - // keep going - } + } + + var wg sync.WaitGroup + errChan := make(chan error, len(checks)) + wg.Add(len(checks)) + + for _, f := range checks { + go func(f func(context.Context) error) { + defer wg.Done() + if err := f(ctx); err != nil { + mon.emitFailureToGatherMetric(steps.FriendlyName(f), err) + errChan <- err + } + }(f) + } + + wg.Wait() + close(errChan) + + for err := range errChan { + errs = append(errs, err) } return diff --git a/pkg/monitor/cluster/daemonsetstatuses.go b/pkg/monitor/cluster/daemonsetstatuses.go deleted file mode 100644 index 9d1db8fdb74..00000000000 --- a/pkg/monitor/cluster/daemonsetstatuses.go +++ /dev/null @@ -1,52 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - "strconv" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/Azure/ARO-RP/pkg/util/namespace" -) - -func (mon *Monitor) emitDaemonsetStatuses(ctx context.Context) error { - var cont string - var count int64 - for { - dss, err := mon.cli.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) - if err != nil { - return err - } - - count += int64(len(dss.Items)) - - for _, ds := range dss.Items { - if !namespace.IsOpenShiftNamespace(ds.Namespace) { - continue - } - - if ds.Status.DesiredNumberScheduled == ds.Status.NumberAvailable { - continue - } - - mon.emitGauge("daemonset.statuses", 1, map[string]string{ - "desiredNumberScheduled": strconv.Itoa(int(ds.Status.DesiredNumberScheduled)), - "name": ds.Name, - "namespace": ds.Namespace, - "numberAvailable": strconv.Itoa(int(ds.Status.NumberAvailable)), - }) - } - - cont = dss.Continue - if cont == "" { - break - } - } - - mon.emitGauge("daemonset.count", count, nil) - - return nil -} diff --git a/pkg/monitor/cluster/daemonsetstatuses_test.go b/pkg/monitor/cluster/daemonsetstatuses_test.go deleted file mode 100644 index 464d67c374d..00000000000 --- a/pkg/monitor/cluster/daemonsetstatuses_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - "strconv" - "testing" - - "go.uber.org/mock/gomock" - - appsv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" - - mock_metrics "github.com/Azure/ARO-RP/pkg/util/mocks/metrics" -) - -func TestEmitDaemonsetStatuses(t *testing.T) { - ctx := context.Background() - - cli := fake.NewSimpleClientset( - &appsv1.DaemonSet{ // metrics expected - ObjectMeta: metav1.ObjectMeta{ - Name: "name1", - Namespace: "openshift", - }, - Status: appsv1.DaemonSetStatus{ - DesiredNumberScheduled: 2, - NumberAvailable: 1, - }, - }, &appsv1.DaemonSet{ // no metric expected - ObjectMeta: metav1.ObjectMeta{ - Name: "name2", - Namespace: "openshift", - }, - Status: appsv1.DaemonSetStatus{ - DesiredNumberScheduled: 2, - NumberAvailable: 2, - }, - }, &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{ // no metric expected -customer - Name: "name2", - Namespace: "customer", - }, - Status: appsv1.DaemonSetStatus{ - DesiredNumberScheduled: 2, - NumberAvailable: 1, - }, - }, - ) - - controller := gomock.NewController(t) - defer controller.Finish() - - m := mock_metrics.NewMockEmitter(controller) - - mon := &Monitor{ - cli: cli, - m: m, - } - - m.EXPECT().EmitGauge("daemonset.statuses", int64(1), map[string]string{ - "desiredNumberScheduled": strconv.Itoa(2), - "name": "name1", - "namespace": "openshift", - "numberAvailable": strconv.Itoa(1), - }) - - m.EXPECT().EmitGauge("daemonset.count", int64(3), map[string]string{}) - - err := mon.emitDaemonsetStatuses(ctx) - if err != nil { - t.Fatal(err) - } -} diff --git a/pkg/monitor/cluster/deploymentstatuses.go b/pkg/monitor/cluster/deploymentstatuses.go deleted file mode 100644 index 771fe89890f..00000000000 --- a/pkg/monitor/cluster/deploymentstatuses.go +++ /dev/null @@ -1,50 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - "strconv" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/Azure/ARO-RP/pkg/util/namespace" -) - -func (mon *Monitor) emitDeploymentStatuses(ctx context.Context) error { - var cont string - var count int64 - for { - ds, err := mon.cli.AppsV1().Deployments("").List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) - if err != nil { - return err - } - - count += int64(len(ds.Items)) - - for _, d := range ds.Items { - if !namespace.IsOpenShiftNamespace(d.Namespace) { - continue - } - - if d.Status.Replicas == d.Status.AvailableReplicas { - continue - } - - mon.emitGauge("deployment.statuses", 1, map[string]string{ - "availableReplicas": strconv.Itoa(int(d.Status.AvailableReplicas)), - "name": d.Name, - "namespace": d.Namespace, - "replicas": strconv.Itoa(int(d.Status.Replicas)), - }) - } - - cont = ds.Continue - if cont == "" { - break - } - } - mon.emitGauge("deployment.count", count, nil) - return nil -} diff --git a/pkg/monitor/cluster/deploymentstatuses_test.go b/pkg/monitor/cluster/deploymentstatuses_test.go deleted file mode 100644 index 5bd737f7e21..00000000000 --- a/pkg/monitor/cluster/deploymentstatuses_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - "strconv" - "testing" - - "go.uber.org/mock/gomock" - - appsv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" - - mock_metrics "github.com/Azure/ARO-RP/pkg/util/mocks/metrics" -) - -func TestEmitDeploymentStatuses(t *testing.T) { - ctx := context.Background() - - cli := fake.NewSimpleClientset( - &appsv1.Deployment{ // metrics expected - ObjectMeta: metav1.ObjectMeta{ - Name: "name1", - Namespace: "openshift", - }, - Status: appsv1.DeploymentStatus{ - Replicas: 2, - AvailableReplicas: 1, - }, - }, &appsv1.Deployment{ // no metric expected - ObjectMeta: metav1.ObjectMeta{ - Name: "name2", - Namespace: "openshift", - }, - Status: appsv1.DeploymentStatus{ - Replicas: 2, - AvailableReplicas: 2, - }, - }, &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ // no metric expected -customer - Name: "name2", - Namespace: "customer", - }, - Status: appsv1.DeploymentStatus{ - Replicas: 2, - AvailableReplicas: 1, - }, - }, - ) - - controller := gomock.NewController(t) - defer controller.Finish() - - m := mock_metrics.NewMockEmitter(controller) - - mon := &Monitor{ - cli: cli, - m: m, - } - - m.EXPECT().EmitGauge("deployment.count", int64(3), map[string]string{}) - - m.EXPECT().EmitGauge("deployment.statuses", int64(1), map[string]string{ - "availableReplicas": strconv.Itoa(1), - "name": "name1", - "namespace": "openshift", - "replicas": strconv.Itoa(2), - }) - - err := mon.emitDeploymentStatuses(ctx) - if err != nil { - t.Fatal(err) - } -} diff --git a/pkg/monitor/cluster/podconditions.go b/pkg/monitor/cluster/podconditions.go deleted file mode 100644 index f5acd6745f9..00000000000 --- a/pkg/monitor/cluster/podconditions.go +++ /dev/null @@ -1,169 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - - "github.com/sirupsen/logrus" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/kubelet/events" - - "github.com/Azure/ARO-RP/pkg/util/namespace" -) - -var podConditionsExpected = map[corev1.PodConditionType]corev1.ConditionStatus{ - corev1.ContainersReady: corev1.ConditionTrue, - corev1.PodInitialized: corev1.ConditionTrue, - corev1.PodScheduled: corev1.ConditionTrue, - corev1.PodReady: corev1.ConditionTrue, -} - -var restartCounterThreshold int32 = 10 - -func (mon *Monitor) emitPodConditions(ctx context.Context) error { - // to list pods once - var cont string - var count int64 - for { - ps, err := mon.cli.CoreV1().Pods("").List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) - if err != nil { - return err - } - - count += int64(len(ps.Items)) - - mon._emitPodConditions(ps) - mon._emitPodContainerStatuses(ps) - mon._emitPodContainerRestartCounter(ps) - - cont = ps.Continue - if cont == "" { - break - } - } - - mon.emitGauge("pod.count", count, nil) - - return nil -} - -func (mon *Monitor) _emitPodConditions(ps *corev1.PodList) { - for _, p := range ps.Items { - if !namespace.IsOpenShiftNamespace(p.Namespace) { - continue - } - - if p.Status.Phase == corev1.PodSucceeded { - continue - } - - if p.Status.Reason == events.PreemptContainer { - continue - } - - for _, c := range p.Status.Conditions { - if c.Status == podConditionsExpected[c.Type] { - continue - } - - mon.emitGauge("pod.conditions", 1, map[string]string{ - "name": p.Name, - "namespace": p.Namespace, - "nodeName": p.Spec.NodeName, - "status": string(c.Status), - "type": string(c.Type), - }) - - if mon.hourlyRun { - mon.log.WithFields(logrus.Fields{ - "metric": "pod.conditions", - "name": p.Name, - "namespace": p.Namespace, - "nodeName": p.Spec.NodeName, - "status": c.Status, - "type": c.Type, - "message": c.Message, - }).Print() - } - } - } -} - -func (mon *Monitor) _emitPodContainerStatuses(ps *corev1.PodList) { - for _, p := range ps.Items { - if !namespace.IsOpenShiftNamespace(p.Namespace) { - continue - } - - if p.Status.Phase == corev1.PodSucceeded { - continue - } - - for _, cs := range p.Status.ContainerStatuses { - if cs.State.Waiting == nil { - continue - } - - containerStatus := map[string]string{ - "name": p.Name, - "namespace": p.Namespace, - "nodeName": p.Spec.NodeName, - "containername": cs.Name, - "reason": cs.State.Waiting.Reason, - "lastTerminationState": "", - } - - if cs.LastTerminationState.Terminated != nil { - containerStatus["lastTerminationState"] = cs.LastTerminationState.Terminated.Reason - } - - mon.emitGauge("pod.containerstatuses", 1, containerStatus) - - if mon.hourlyRun { - logFields := logrus.Fields{ - "metric": "pod.containerstatuses", - "message": cs.State.Waiting.Message, - } - for label, labelVal := range containerStatus { - logFields[label] = labelVal - } - mon.log.WithFields(logFields).Print() - } - } - } -} - -func (mon *Monitor) _emitPodContainerRestartCounter(ps *corev1.PodList) { - for _, p := range ps.Items { - if !namespace.IsOpenShiftNamespace(p.Namespace) { - continue - } - - //Sum up the total number of restarts in the pod to match the number of restarts shown in the 'oc get pods' display - t := int32(0) - for _, cs := range p.Status.ContainerStatuses { - t += cs.RestartCount - } - - if t < restartCounterThreshold { - continue - } - - mon.emitGauge("pod.restartcounter", int64(t), map[string]string{ - "name": p.Name, - "namespace": p.Namespace, - }) - - if mon.hourlyRun { - mon.log.WithFields(logrus.Fields{ - "metric": "pod.restartcounter", - "name": p.Name, - "namespace": p.Namespace, - }).Print() - } - } -} diff --git a/pkg/monitor/cluster/replicasetstatuses.go b/pkg/monitor/cluster/replicasetstatuses.go deleted file mode 100644 index 1b9d30bbd8a..00000000000 --- a/pkg/monitor/cluster/replicasetstatuses.go +++ /dev/null @@ -1,52 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - "strconv" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/Azure/ARO-RP/pkg/util/namespace" -) - -func (mon *Monitor) emitReplicasetStatuses(ctx context.Context) error { - var cont string - var count int64 - for { - rss, err := mon.cli.AppsV1().ReplicaSets("").List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) - if err != nil { - return err - } - - count += int64(len(rss.Items)) - - for _, rs := range rss.Items { - if !namespace.IsOpenShiftNamespace(rs.Namespace) { - continue - } - - if rs.Status.Replicas == rs.Status.AvailableReplicas { - continue - } - - mon.emitGauge("replicaset.statuses", 1, map[string]string{ - "availableReplicas": strconv.Itoa(int(rs.Status.AvailableReplicas)), - "name": rs.Name, - "namespace": rs.Namespace, - "replicas": strconv.Itoa(int(rs.Status.Replicas)), - }) - } - - cont = rss.Continue - if cont == "" { - break - } - } - - mon.emitGauge("replicaset.count", count, nil) - - return nil -} diff --git a/pkg/monitor/cluster/replicasetstatuses_test.go b/pkg/monitor/cluster/replicasetstatuses_test.go deleted file mode 100644 index d5ff0537d42..00000000000 --- a/pkg/monitor/cluster/replicasetstatuses_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - "strconv" - "testing" - - "go.uber.org/mock/gomock" - - appsv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" - - mock_metrics "github.com/Azure/ARO-RP/pkg/util/mocks/metrics" -) - -func TestEmitReplicasetStatuses(t *testing.T) { - ctx := context.Background() - - cli := fake.NewSimpleClientset( - &appsv1.ReplicaSet{ // metrics expected - ObjectMeta: metav1.ObjectMeta{ - Name: "name1", - Namespace: "openshift", - }, - Status: appsv1.ReplicaSetStatus{ - Replicas: 2, - AvailableReplicas: 1, - }, - }, &appsv1.ReplicaSet{ // no metric expected - ObjectMeta: metav1.ObjectMeta{ - Name: "name2", - Namespace: "openshift", - }, - Status: appsv1.ReplicaSetStatus{ - Replicas: 2, - AvailableReplicas: 2, - }, - }, &appsv1.ReplicaSet{ - ObjectMeta: metav1.ObjectMeta{ // no metric expected -customer - Name: "name2", - Namespace: "customer", - }, - Status: appsv1.ReplicaSetStatus{ - Replicas: 2, - AvailableReplicas: 1, - }, - }, - ) - - controller := gomock.NewController(t) - defer controller.Finish() - - m := mock_metrics.NewMockEmitter(controller) - - mon := &Monitor{ - cli: cli, - m: m, - } - - m.EXPECT().EmitGauge("replicaset.count", int64(3), map[string]string{}) - - m.EXPECT().EmitGauge("replicaset.statuses", int64(1), map[string]string{ - "availableReplicas": strconv.Itoa(1), - "name": "name1", - "namespace": "openshift", - "replicas": strconv.Itoa(2), - }) - - err := mon.emitReplicasetStatuses(ctx) - if err != nil { - t.Fatal(err) - } -} diff --git a/pkg/monitor/cluster/statefulsetstatuses.go b/pkg/monitor/cluster/statefulsetstatuses.go deleted file mode 100644 index 57a95db9ce5..00000000000 --- a/pkg/monitor/cluster/statefulsetstatuses.go +++ /dev/null @@ -1,52 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - "strconv" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/Azure/ARO-RP/pkg/util/namespace" -) - -func (mon *Monitor) emitStatefulsetStatuses(ctx context.Context) error { - var cont string - var count int64 - for { - sss, err := mon.cli.AppsV1().StatefulSets("").List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) - if err != nil { - return err - } - - count += int64(len(sss.Items)) - - for _, ss := range sss.Items { - if !namespace.IsOpenShiftNamespace(ss.Namespace) { - continue - } - - if ss.Status.Replicas == ss.Status.ReadyReplicas { - continue - } - - mon.emitGauge("statefulset.statuses", 1, map[string]string{ - "name": ss.Name, - "namespace": ss.Namespace, - "replicas": strconv.Itoa(int(ss.Status.Replicas)), - "readyReplicas": strconv.Itoa(int(ss.Status.ReadyReplicas)), - }) - } - - cont = sss.Continue - if cont == "" { - break - } - } - - mon.emitGauge("statefulset.count", count, nil) - - return nil -} diff --git a/pkg/monitor/cluster/statefulsetstatuses_test.go b/pkg/monitor/cluster/statefulsetstatuses_test.go deleted file mode 100644 index 015b352fbdd..00000000000 --- a/pkg/monitor/cluster/statefulsetstatuses_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package cluster - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -import ( - "context" - "strconv" - "testing" - - "go.uber.org/mock/gomock" - - appsv1 "k8s.io/api/apps/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" - - mock_metrics "github.com/Azure/ARO-RP/pkg/util/mocks/metrics" -) - -func TestEmitStatefulsetStatuses(t *testing.T) { - ctx := context.Background() - - cli := fake.NewSimpleClientset( - &appsv1.StatefulSet{ // metrics expected - ObjectMeta: metav1.ObjectMeta{ - Name: "name1", - Namespace: "openshift", - }, - Status: appsv1.StatefulSetStatus{ - Replicas: 2, - ReadyReplicas: 1, - }, - }, &appsv1.StatefulSet{ // no metric expected - ObjectMeta: metav1.ObjectMeta{ - Name: "name2", - Namespace: "openshift", - }, - Status: appsv1.StatefulSetStatus{ - Replicas: 2, - ReadyReplicas: 2, - }, - }, &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ // no metric expected -customer - Name: "name2", - Namespace: "customer", - }, - Status: appsv1.StatefulSetStatus{ - Replicas: 2, - ReadyReplicas: 1, - }, - }, - ) - - controller := gomock.NewController(t) - defer controller.Finish() - - m := mock_metrics.NewMockEmitter(controller) - - mon := &Monitor{ - cli: cli, - m: m, - } - - m.EXPECT().EmitGauge("statefulset.count", int64(3), map[string]string{}) - - m.EXPECT().EmitGauge("statefulset.statuses", int64(1), map[string]string{ - "name": "name1", - "namespace": "openshift", - "replicas": strconv.Itoa(2), - "readyReplicas": strconv.Itoa(1), - }) - - err := mon.emitStatefulsetStatuses(ctx) - if err != nil { - t.Fatal(err) - } -} diff --git a/pkg/monitor/cluster/workloadstatuses.go b/pkg/monitor/cluster/workloadstatuses.go new file mode 100644 index 00000000000..6461c6bb0d8 --- /dev/null +++ b/pkg/monitor/cluster/workloadstatuses.go @@ -0,0 +1,345 @@ +package cluster + +// Copyright (c) Microsoft Corporation. +// Licensed under the Apache License 2.0. + +import ( + "context" + "strconv" + + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/kubernetes/pkg/kubelet/events" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/Azure/ARO-RP/pkg/util/namespace" +) + +var podConditionsExpected = map[corev1.PodConditionType]corev1.ConditionStatus{ + corev1.ContainersReady: corev1.ConditionTrue, + corev1.PodInitialized: corev1.ConditionTrue, + corev1.PodScheduled: corev1.ConditionTrue, + corev1.PodReady: corev1.ConditionTrue, +} + +var restartCounterThreshold int32 = 10 + +func (mon *Monitor) emitWorkloadStatuses(ctx context.Context) error { + // Phase 1: Get all pod metadata cluster-wide. This is a lightweight call. + var totalPodCount, totalDaemonSetCount, totalDeploymentCount, totalReplicaSetCount, totalStatefulSetCount int64 + openshiftNamespacesWithPods := map[string]struct{}{} + + for _, gvk := range []schema.GroupVersionKind{ + {Group: "", Version: "v1", Kind: "PodList"}, + {Group: "apps", Version: "v1", Kind: "DaemonSetList"}, + {Group: "apps", Version: "v1", Kind: "DeploymentList"}, + {Group: "apps", Version: "v1", Kind: "ReplicaSetList"}, + {Group: "apps", Version: "v1", Kind: "StatefulSetList"}, + } { + listOpts := &client.ListOptions{ + Limit: 500, + } + for { + metaList := &metav1.PartialObjectMetadataList{} + metaList.SetGroupVersionKind(gvk) + + err := mon.ocpclientset.List(ctx, metaList, listOpts) + if err != nil { + return err + } + + switch gvk.Kind { + case "PodList": + totalPodCount += int64(len(metaList.Items)) + case "DaemonSetList": + totalDaemonSetCount += int64(len(metaList.Items)) + case "DeploymentList": + totalDeploymentCount += int64(len(metaList.Items)) + case "ReplicaSetList": + totalReplicaSetCount += int64(len(metaList.Items)) + case "StatefulSetList": + totalStatefulSetCount += int64(len(metaList.Items)) + } + + for _, p := range metaList.Items { + if namespace.IsOpenShiftNamespace(p.Namespace) { + openshiftNamespacesWithPods[p.Namespace] = struct{}{} + } + } + + if metaList.Continue == "" { + break + } + listOpts.Continue = metaList.Continue + } + } + + mon.emitGauge("pod.count", totalPodCount, nil) + mon.emitGauge("daemonset.count", totalDaemonSetCount, nil) + mon.emitGauge("deployment.count", totalDeploymentCount, nil) + mon.emitGauge("replicaset.count", totalReplicaSetCount, nil) + mon.emitGauge("statefulset.count", totalStatefulSetCount, nil) + + // Phase 2: Get full pod objects only for the identified OpenShift namespaces. + for ns := range openshiftNamespacesWithPods { + if ctx.Err() != nil { + return ctx.Err() + } + + err := mon.emitWorkloadStatusesForNamespace(ctx, ns) + if err != nil { + return err + } + } + + return nil +} + +func (mon *Monitor) emitWorkloadStatusesForNamespace(ctx context.Context, ns string) error { + // Pods + var cont string + for { + ps, err := mon.cli.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) + if err != nil { + return err + } + + mon._emitPodConditions(ps) + mon._emitPodContainerStatuses(ps) + mon._emitPodContainerRestartCounter(ps) + + cont = ps.Continue + if cont == "" { + break + } + } + + // DaemonSets + cont = "" + for { + dss, err := mon.cli.AppsV1().DaemonSets(ns).List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) + if err != nil { + return err + } + + for _, ds := range dss.Items { + if ds.Status.DesiredNumberScheduled == ds.Status.NumberAvailable { + continue + } + + mon.emitGauge("daemonset.statuses", 1, map[string]string{ + "desiredNumberScheduled": strconv.Itoa(int(ds.Status.DesiredNumberScheduled)), + "name": ds.Name, + "namespace": ds.Namespace, + "numberAvailable": strconv.Itoa(int(ds.Status.NumberAvailable)), + }) + } + + cont = dss.Continue + if cont == "" { + break + } + } + + // Deployments + cont = "" + for { + ds, err := mon.cli.AppsV1().Deployments(ns).List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) + if err != nil { + return err + } + + for _, d := range ds.Items { + if d.Status.Replicas == d.Status.AvailableReplicas { + continue + } + + mon.emitGauge("deployment.statuses", 1, map[string]string{ + "availableReplicas": strconv.Itoa(int(d.Status.AvailableReplicas)), + "name": d.Name, + "namespace": d.Namespace, + "replicas": strconv.Itoa(int(d.Status.Replicas)), + }) + } + + cont = ds.Continue + if cont == "" { + break + } + } + + // ReplicaSets + cont = "" + for { + rss, err := mon.cli.AppsV1().ReplicaSets(ns).List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) + if err != nil { + return err + } + + for _, rs := range rss.Items { + if rs.Status.Replicas == rs.Status.AvailableReplicas { + continue + } + + mon.emitGauge("replicaset.statuses", 1, map[string]string{ + "availableReplicas": strconv.Itoa(int(rs.Status.AvailableReplicas)), + "name": rs.Name, + "namespace": rs.Namespace, + "replicas": strconv.Itoa(int(rs.Status.Replicas)), + }) + } + + cont = rss.Continue + if cont == "" { + break + } + } + + // StatefulSets + cont = "" + for { + sss, err := mon.cli.AppsV1().StatefulSets(ns).List(ctx, metav1.ListOptions{Limit: 500, Continue: cont}) + if err != nil { + return err + } + + for _, ss := range sss.Items { + if ss.Status.Replicas == ss.Status.ReadyReplicas { + continue + } + + mon.emitGauge("statefulset.statuses", 1, map[string]string{ + "name": ss.Name, + "namespace": ss.Namespace, + "replicas": strconv.Itoa(int(ss.Status.Replicas)), + "readyReplicas": strconv.Itoa(int(ss.Status.ReadyReplicas)), + }) + } + + cont = sss.Continue + if cont == "" { + break + } + } + + return nil +} + +func (mon *Monitor) _emitPodConditions(ps *corev1.PodList) { + for _, p := range ps.Items { + if !namespace.IsOpenShiftNamespace(p.Namespace) { + continue + } + + if p.Status.Phase == corev1.PodSucceeded { + continue + } + + if p.Status.Reason == events.PreemptContainer { + continue + } + + for _, c := range p.Status.Conditions { + if c.Status == podConditionsExpected[c.Type] { + continue + } + + mon.emitGauge("pod.conditions", 1, map[string]string{ + "name": p.Name, + "namespace": p.Namespace, + "nodeName": p.Spec.NodeName, + "status": string(c.Status), + "type": string(c.Type), + }) + + if mon.hourlyRun { + mon.log.WithFields(logrus.Fields{ + "metric": "pod.conditions", + "name": p.Name, + "namespace": p.Namespace, + "nodeName": p.Spec.NodeName, + "status": c.Status, + "type": c.Type, + "message": c.Message, + }).Print() + } + } + } +} + +func (mon *Monitor) _emitPodContainerStatuses(ps *corev1.PodList) { + for _, p := range ps.Items { + if !namespace.IsOpenShiftNamespace(p.Namespace) { + continue + } + + if p.Status.Phase == corev1.PodSucceeded { + continue + } + + for _, cs := range p.Status.ContainerStatuses { + if cs.State.Waiting == nil { + continue + } + + containerStatus := map[string]string{ + "name": p.Name, + "namespace": p.Namespace, + "nodeName": p.Spec.NodeName, + "containername": cs.Name, + "reason": cs.State.Waiting.Reason, + "lastTerminationState": "", + } + + if cs.LastTerminationState.Terminated != nil { + containerStatus["lastTerminationState"] = cs.LastTerminationState.Terminated.Reason + } + + mon.emitGauge("pod.containerstatuses", 1, containerStatus) + + if mon.hourlyRun { + logFields := logrus.Fields{ + "metric": "pod.containerstatuses", + "message": cs.State.Waiting.Message, + } + for label, labelVal := range containerStatus { + logFields[label] = labelVal + } + mon.log.WithFields(logFields).Print() + } + } + } +} + +func (mon *Monitor) _emitPodContainerRestartCounter(ps *corev1.PodList) { + for _, p := range ps.Items { + if !namespace.IsOpenShiftNamespace(p.Namespace) { + continue + } + + //Sum up the total number of restarts in the pod to match the number of restarts shown in the 'oc get pods' display + t := int32(0) + for _, cs := range p.Status.ContainerStatuses { + t += cs.RestartCount + } + + if t < restartCounterThreshold { + continue + } + + mon.emitGauge("pod.restartcounter", int64(t), map[string]string{ + "name": p.Name, + "namespace": p.Namespace, + }) + + if mon.hourlyRun { + mon.log.WithFields(logrus.Fields{ + "metric": "pod.restartcounter", + "name": p.Name, + "namespace": p.Namespace, + }).Print() + } + } +} diff --git a/pkg/monitor/cluster/podconditions_test.go b/pkg/monitor/cluster/workloadstatuses_test.go similarity index 53% rename from pkg/monitor/cluster/podconditions_test.go rename to pkg/monitor/cluster/workloadstatuses_test.go index 59b4b18b7cc..ecc7417335a 100644 --- a/pkg/monitor/cluster/podconditions_test.go +++ b/pkg/monitor/cluster/workloadstatuses_test.go @@ -11,14 +11,161 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" - + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/kubelet/events" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" mock_metrics "github.com/Azure/ARO-RP/pkg/util/mocks/metrics" + testlog "github.com/Azure/ARO-RP/test/util/log" ) +func TestEmitWorkloadStatuses(t *testing.T) { + ctx := context.Background() + _, log := testlog.New() + + controller := gomock.NewController(t) + defer controller.Finish() + + m := mock_metrics.NewMockEmitter(controller) + + type testCase struct { + name string + objects []runtime.Object + mocks func(*mock_metrics.MockEmitter) + wantErr bool + } + + for _, tt := range []*testCase{ + { + name: "all workloads healthy", + objects: []runtime.Object{ + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "openshift"}}, + &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "d1", Namespace: "openshift"}, + Status: appsv1.DeploymentStatus{Replicas: 1, AvailableReplicas: 1}, + }, + }, + mocks: func(m *mock_metrics.MockEmitter) { + m.EXPECT().EmitGauge("pod.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("daemonset.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("deployment.count", int64(1), map[string]string{}) + m.EXPECT().EmitGauge("replicaset.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("statefulset.count", int64(0), map[string]string{}) + }, + }, + { + name: "unhealthy workloads in openshift namespace", + objects: []runtime.Object{ + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "openshift"}}, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "openshift"}, + Spec: corev1.PodSpec{NodeName: "test-node"}, + Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionFalse}}}, + }, + &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "ds1", Namespace: "openshift"}, + Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 2, NumberAvailable: 1}, + }, + &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "d1", Namespace: "openshift"}, + Status: appsv1.DeploymentStatus{Replicas: 2, AvailableReplicas: 1}, + }, + &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{Name: "rs1", Namespace: "openshift"}, + Status: appsv1.ReplicaSetStatus{Replicas: 2, AvailableReplicas: 1}, + }, + &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "ss1", Namespace: "openshift"}, + Status: appsv1.StatefulSetStatus{Replicas: 2, ReadyReplicas: 1}, + }, + }, + mocks: func(m *mock_metrics.MockEmitter) { + m.EXPECT().EmitGauge("pod.count", int64(1), map[string]string{}) + m.EXPECT().EmitGauge("daemonset.count", int64(1), map[string]string{}) + m.EXPECT().EmitGauge("deployment.count", int64(1), map[string]string{}) + m.EXPECT().EmitGauge("replicaset.count", int64(1), map[string]string{}) + m.EXPECT().EmitGauge("statefulset.count", int64(1), map[string]string{}) + m.EXPECT().EmitGauge("pod.conditions", int64(1), gomock.Any()) + m.EXPECT().EmitGauge("daemonset.statuses", int64(1), gomock.Any()) + m.EXPECT().EmitGauge("deployment.statuses", int64(1), gomock.Any()) + m.EXPECT().EmitGauge("replicaset.statuses", int64(1), gomock.Any()) + m.EXPECT().EmitGauge("statefulset.statuses", int64(1), gomock.Any()) + }, + }, + { + name: "unhealthy workloads in customer namespace are ignored", + objects: []runtime.Object{ + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "customer-test"}}, + &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "d1", Namespace: "customer-test"}, + Status: appsv1.DeploymentStatus{Replicas: 2, AvailableReplicas: 1}, + }, + }, + mocks: func(m *mock_metrics.MockEmitter) { + m.EXPECT().EmitGauge("pod.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("daemonset.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("deployment.count", int64(1), map[string]string{}) + m.EXPECT().EmitGauge("replicaset.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("statefulset.count", int64(0), map[string]string{}) + }, + }, + { + name: "pod with high restart count in openshift namespace", + objects: []runtime.Object{ + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "openshift-monitoring"}}, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "p-restarts", Namespace: "openshift-monitoring"}, + Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{RestartCount: 20}}}, + }, + }, + mocks: func(m *mock_metrics.MockEmitter) { + m.EXPECT().EmitGauge("pod.count", int64(1), map[string]string{}) + m.EXPECT().EmitGauge("daemonset.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("deployment.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("replicaset.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("statefulset.count", int64(0), map[string]string{}) + m.EXPECT().EmitGauge("pod.restartcounter", int64(20), map[string]string{ + "name": "p-restarts", + "namespace": "openshift-monitoring", + }) + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + clientGoFake := fake.NewSimpleClientset() + // The fake client builder needs to know about the object types to return them + scheme := runtime.NewScheme() + appsv1.AddToScheme(scheme) + corev1.AddToScheme(scheme) + controllerRuntimeFake := fakeclient.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(tt.objects...).Build() + + // Populate the client-go fake client with the objects for the second phase of the check + for _, o := range tt.objects { + clientGoFake.Tracker().Add(o) + } + + mon := &Monitor{ + cli: clientGoFake, + ocpclientset: controllerRuntimeFake, + m: m, + log: log, + } + + if tt.mocks != nil { + tt.mocks(m) + } + + if err := mon.emitWorkloadStatuses(ctx); (err != nil) != tt.wantErr { + t.Errorf("Monitor.emitWorkloadStatuses() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + func TestEmitPodConditions(t *testing.T) { cli := fake.NewSimpleClientset( &corev1.Pod{ // metrics expected @@ -54,6 +201,36 @@ func TestEmitPodConditions(t *testing.T) { }, }, }, + &corev1.Pod{ // no metrics expected - succeeded pod + ObjectMeta: metav1.ObjectMeta{ + Name: "succeeded-pod", + Namespace: "openshift", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + &corev1.Pod{ // no metrics expected - preempted pod + ObjectMeta: metav1.ObjectMeta{ + Name: "preempted-pod", + Namespace: "openshift", + }, + Status: corev1.PodStatus{ + Reason: events.PreemptContainer, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, ) controller := gomock.NewController(t) @@ -149,6 +326,25 @@ func TestEmitPodContainerStatuses(t *testing.T) { NodeName: "fake-node-name", }, }, + &corev1.Pod{ // no metrics expected - succeeded pod + ObjectMeta: metav1.ObjectMeta{ + Name: "succeeded-pod", + Namespace: "openshift", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "containername", + State: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{ + Reason: "ImagePullBackOff", + }, + }, + }, + }, + }, + }, ) controller := gomock.NewController(t)