Skip to content

Commit 2ecdee7

Browse files
committed
chore: minor changes
Signed-off-by: Ajay Mishra <[email protected]>
1 parent ed379cc commit 2ecdee7

File tree

4 files changed

+247
-11
lines changed

4 files changed

+247
-11
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package annotations
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"log/slog"
22+
"maps"
23+
24+
v1 "k8s.io/api/core/v1"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/client-go/util/retry"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
)
30+
31+
const (
32+
// AnnotationKey is the node annotation key under which the JSON-encoded
// PolicyMatchState is stored.
AnnotationKey = "k8s-object-monitor.nvsentinel.nvidia.com/policy-matches"
33+
)
34+
35+
// PolicyMatchState maps a state key to the target node name recorded for it.
// It is the value that gets marshalled to JSON and stored in the
// AnnotationKey annotation of a node.
type PolicyMatchState map[string]string
36+
37+
// Manager reads and writes policy match state kept in node annotations.
type Manager struct {
38+
// client is used for the node Get, List and Update calls made by Manager.
client client.Client
39+
}
40+
41+
func NewManager(c client.Client) *Manager {
42+
return &Manager{client: c}
43+
}
44+
45+
func (m *Manager) AddMatch(ctx context.Context, nodeName, stateKey, targetNode string) error {
46+
slog.Debug("Adding match state to annotation", "node", nodeName, "stateKey", stateKey, "targetNode", targetNode)
47+
48+
return m.updateAnnotation(ctx, nodeName, func(state PolicyMatchState) (PolicyMatchState, bool) {
49+
if _, exists := state[stateKey]; exists {
50+
return state, false
51+
}
52+
53+
state[stateKey] = targetNode
54+
55+
return state, true
56+
})
57+
}
58+
59+
func (m *Manager) RemoveMatch(ctx context.Context, nodeName, stateKey string) error {
60+
slog.Debug("Removing match state from annotation", "node", nodeName, "stateKey", stateKey)
61+
62+
err := m.updateAnnotation(ctx, nodeName, func(state PolicyMatchState) (PolicyMatchState, bool) {
63+
if _, exists := state[stateKey]; !exists {
64+
return state, false
65+
}
66+
67+
delete(state, stateKey)
68+
69+
return state, true
70+
})
71+
if apierrors.IsNotFound(err) {
72+
return nil
73+
}
74+
75+
return err
76+
}
77+
78+
func (m *Manager) GetMatches(ctx context.Context, nodeName string) (PolicyMatchState, error) {
79+
node := &v1.Node{}
80+
if err := m.client.Get(ctx, types.NamespacedName{Name: nodeName}, node); err != nil {
81+
if apierrors.IsNotFound(err) {
82+
return make(PolicyMatchState), nil
83+
}
84+
85+
return make(PolicyMatchState), fmt.Errorf("failed to get node: %w", err)
86+
}
87+
88+
if node.Annotations == nil {
89+
return make(PolicyMatchState), nil
90+
}
91+
92+
annotationStr, exists := node.Annotations[AnnotationKey]
93+
if !exists || annotationStr == "" {
94+
return make(PolicyMatchState), nil
95+
}
96+
97+
var state PolicyMatchState
98+
if err := json.Unmarshal([]byte(annotationStr), &state); err != nil {
99+
return nil, fmt.Errorf("failed to unmarshal annotation: %w", err)
100+
}
101+
102+
return state, nil
103+
}
104+
105+
func (m *Manager) LoadAllMatches(ctx context.Context) (map[string]string, error) {
106+
nodeList := &v1.NodeList{}
107+
if err := m.client.List(ctx, nodeList); err != nil {
108+
return nil, fmt.Errorf("failed to list nodes: %w", err)
109+
}
110+
111+
allMatches := make(map[string]string)
112+
113+
for _, node := range nodeList.Items {
114+
matches, err := m.GetMatches(ctx, node.Name)
115+
if err != nil {
116+
slog.Warn("Failed to load matches from node", "node", node.Name, "error", err)
117+
continue
118+
}
119+
120+
maps.Copy(allMatches, matches)
121+
}
122+
123+
return allMatches, nil
124+
}
125+
126+
func (m *Manager) updateAnnotation(ctx context.Context,
127+
nodeName string,
128+
updateFn func(PolicyMatchState) (PolicyMatchState, bool),
129+
) error {
130+
slog.Debug("Updating annotation", "node", nodeName)
131+
132+
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
133+
node := &v1.Node{}
134+
if err := m.client.Get(ctx, types.NamespacedName{Name: nodeName}, node); err != nil {
135+
if apierrors.IsNotFound(err) {
136+
slog.Warn("Node not found, assuming deleted", "node", nodeName)
137+
return nil
138+
}
139+
140+
return fmt.Errorf("failed to get node: %w", err)
141+
}
142+
143+
if node.Annotations == nil {
144+
node.Annotations = make(map[string]string)
145+
}
146+
147+
state := make(PolicyMatchState)
148+
if existingAnnotation := node.Annotations[AnnotationKey]; existingAnnotation != "" {
149+
if err := json.Unmarshal([]byte(existingAnnotation), &state); err != nil {
150+
slog.Warn("Failed to parse existing annotation, starting fresh", "node", nodeName, "error", err)
151+
}
152+
}
153+
154+
updatedState, changed := updateFn(state)
155+
if !changed {
156+
return nil
157+
}
158+
159+
if len(updatedState) == 0 {
160+
delete(node.Annotations, AnnotationKey)
161+
} else {
162+
annotationBytes, err := json.Marshal(updatedState)
163+
if err != nil {
164+
return fmt.Errorf("failed to marshal state: %w", err)
165+
}
166+
167+
node.Annotations[AnnotationKey] = string(annotationBytes)
168+
}
169+
170+
return m.client.Update(ctx, node, &client.UpdateOptions{
171+
FieldManager: "kubernetes-object-monitor",
172+
})
173+
})
174+
}

health-monitors/kubernetes-object-monitor/pkg/controller/reconciler.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ import (
1717
"context"
1818
"fmt"
1919
"log/slog"
20+
"maps"
2021
"sync"
2122

23+
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/annotations"
2224
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/config"
2325
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/metrics"
2426
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/policy"
@@ -36,6 +38,7 @@ type ResourceReconciler struct {
3638
client.Client
3739
evaluator *policy.Evaluator
3840
publisher HealthEventPublisher
41+
annotationMgr *annotations.Manager
3942
policies []config.Policy
4043
gvk schema.GroupVersionKind
4144
matchStates map[string]string
@@ -46,19 +49,37 @@ func NewResourceReconciler(
4649
c client.Client,
4750
evaluator *policy.Evaluator,
4851
pub HealthEventPublisher,
52+
annotationMgr *annotations.Manager,
4953
policies []config.Policy,
5054
gvk schema.GroupVersionKind,
5155
) *ResourceReconciler {
5256
return &ResourceReconciler{
53-
Client: c,
54-
evaluator: evaluator,
55-
publisher: pub,
56-
policies: policies,
57-
gvk: gvk,
58-
matchStates: make(map[string]string),
57+
Client: c,
58+
evaluator: evaluator,
59+
publisher: pub,
60+
annotationMgr: annotationMgr,
61+
policies: policies,
62+
gvk: gvk,
63+
matchStates: make(map[string]string),
5964
}
6065
}
6166

67+
func (r *ResourceReconciler) LoadState(ctx context.Context) error {
68+
allMatches, err := r.annotationMgr.LoadAllMatches(ctx)
69+
if err != nil {
70+
return fmt.Errorf("failed to load match state: %w", err)
71+
}
72+
73+
r.matchStatesMu.Lock()
74+
defer r.matchStatesMu.Unlock()
75+
76+
maps.Copy(r.matchStates, allMatches)
77+
78+
slog.Info("Loaded policy match state from annotations", "gvk", r.gvk.String(), "matches", len(allMatches))
79+
80+
return nil
81+
}
82+
6283
func (r *ResourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
6384
obj := &unstructured.Unstructured{}
6485
obj.SetGroupVersionKind(r.gvk)
@@ -93,6 +114,8 @@ func (r *ResourceReconciler) handleGetError(ctx context.Context, err error, req
93114
}
94115

95116
func (r *ResourceReconciler) cleanupDeletedResource(ctx context.Context, req ctrl.Request) {
117+
slog.Info("Cleaning up deleted resource", "resource", req.NamespacedName)
118+
96119
for _, p := range r.policies {
97120
if !p.Enabled {
98121
continue
@@ -119,9 +142,19 @@ func (r *ResourceReconciler) cleanupDeletedResource(ctx context.Context, req ctr
119142
metrics.HealthEventsPublishErrors.WithLabelValues(p.Name, "grpc_error").Inc()
120143
}
121144

145+
slog.Debug("Removing match state for deleted resource",
146+
"resource", req.NamespacedName,
147+
"node", nodeName,
148+
"policy", p.Name,
149+
"stateKey", stateKey)
150+
122151
r.matchStatesMu.Lock()
123152
delete(r.matchStates, stateKey)
124153
r.matchStatesMu.Unlock()
154+
155+
if err := r.annotationMgr.RemoveMatch(ctx, nodeName, stateKey); err != nil {
156+
slog.Error("Failed to remove match state from annotation", "node", nodeName, "stateKey", stateKey, "error", err)
157+
}
125158
}
126159
}
127160
}
@@ -179,6 +212,10 @@ func (r *ResourceReconciler) handleUnhealthyTransition(
179212
r.matchStates[stateKey] = nodeName
180213
r.matchStatesMu.Unlock()
181214

215+
if err := r.annotationMgr.AddMatch(ctx, nodeName, stateKey, nodeName); err != nil {
216+
slog.Error("Failed to persist match state to annotation", "node", nodeName, "stateKey", stateKey, "error", err)
217+
}
218+
182219
metrics.PolicyMatches.WithLabelValues(p.Name, nodeName, r.gvk.Kind).Inc()
183220

184221
return nil
@@ -199,6 +236,10 @@ func (r *ResourceReconciler) handleHealthyTransition(
199236
delete(r.matchStates, stateKey)
200237
r.matchStatesMu.Unlock()
201238

239+
if err := r.annotationMgr.RemoveMatch(ctx, nodeName, stateKey); err != nil {
240+
slog.Error("Failed to remove match state from annotation", "node", nodeName, "stateKey", stateKey, "error", err)
241+
}
242+
202243
return nil
203244
}
204245

health-monitors/kubernetes-object-monitor/pkg/controller/reconciler_test.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"testing"
2121
"time"
2222

23+
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/annotations"
2324
celenv "github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/cel"
2425
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/config"
2526
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/controller"
@@ -362,22 +363,23 @@ func TestReconciler_ColdStart(t *testing.T) {
362363
postRestartAction: func(t *testing.T, s *testSetup, nodeName string, _ *v1.Node) {
363364
updateNodeStatus(t, s, nodeName, v1.ConditionFalse)
364365
},
365-
expectEvent: true,
366-
expectHealthy: false,
366+
expectEvent: false,
367367
},
368368
{
369369
name: "healthy resource",
370370
postRestartAction: func(t *testing.T, s *testSetup, nodeName string, _ *v1.Node) {
371371
updateNodeStatus(t, s, nodeName, v1.ConditionTrue)
372372
},
373-
expectEvent: false,
373+
expectEvent: true,
374+
expectHealthy: true,
374375
},
375376
{
376377
name: "deleted resource",
377378
postRestartAction: func(t *testing.T, s *testSetup, _ string, node *v1.Node) {
378379
require.NoError(t, s.k8sClient.Delete(s.ctx, node))
379380
},
380-
expectEvent: false,
381+
expectEvent: true,
382+
expectHealthy: true,
381383
},
382384
}
383385

@@ -492,10 +494,13 @@ func setupTestWithPolicies(t *testing.T, policies []config.Policy) *testSetup {
492494
Kind: "Node",
493495
}
494496

497+
annotationMgr := annotations.NewManager(k8sClient)
498+
495499
reconciler := controller.NewResourceReconciler(
496500
k8sClient,
497501
evaluator,
498502
mockPub,
503+
annotationMgr,
499504
policies,
500505
gvk,
501506
)
@@ -544,10 +549,13 @@ func setupTestWithCRD(t *testing.T, policies []config.Policy, crd *apiextensions
544549
Kind: crd.Spec.Names.Kind,
545550
}
546551

552+
annotationMgr := annotations.NewManager(k8sClient)
553+
547554
reconciler := controller.NewResourceReconciler(
548555
k8sClient,
549556
evaluator,
550557
mockPub,
558+
annotationMgr,
551559
policies,
552560
gvk,
553561
)
@@ -649,14 +657,21 @@ func restartReconciler(t *testing.T, setup *testSetup) *testSetup {
649657

650658
policies := []config.Policy{defaultNodeNotReadyPolicy()}
651659

660+
annotationMgr := annotations.NewManager(setup.k8sClient)
661+
652662
reconciler := controller.NewResourceReconciler(
653663
setup.k8sClient,
654664
setup.evaluator,
655665
mockPub,
666+
annotationMgr,
656667
policies,
657668
gvk,
658669
)
659670

671+
if err := reconciler.LoadState(setup.ctx); err != nil {
672+
t.Fatalf("Failed to load state after restart: %v", err)
673+
}
674+
660675
return &testSetup{
661676
ctx: setup.ctx,
662677
k8sClient: setup.k8sClient,

health-monitors/kubernetes-object-monitor/pkg/initializer/initializer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525

2626
pb "github.com/nvidia/nvsentinel/data-models/pkg/protos"
27+
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/annotations"
2728
celenv "github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/cel"
2829
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/config"
2930
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/controller"
@@ -150,10 +151,15 @@ func registerControllers(
150151
policies []config.Policy,
151152
maxConcurrentReconciles int,
152153
) error {
154+
annotationMgr := annotations.NewManager(mgr.GetClient())
153155
gvkPolicies := groupPoliciesByGVK(policies)
154156

155157
for gvk, policies := range gvkPolicies {
156-
reconciler := controller.NewResourceReconciler(mgr.GetClient(), evaluator, pub, policies, gvk)
158+
reconciler := controller.NewResourceReconciler(mgr.GetClient(), evaluator, pub, annotationMgr, policies, gvk)
159+
160+
if err := reconciler.LoadState(context.Background()); err != nil {
161+
slog.Warn("Failed to load state for controller, starting fresh", "gvk", gvk.String(), "error", err)
162+
}
157163

158164
if err := ctrl.NewControllerManagedBy(mgr).
159165
For(newUnstructuredForGVK(gvk)).

0 commit comments

Comments
 (0)