Skip to content

Commit 0e4796c

Browse files
committed
fix: minors
Signed-off-by: Ajay Mishra <[email protected]>
1 parent ffcb0ef commit 0e4796c

File tree

6 files changed

+60
-27
lines changed

6 files changed

+60
-27
lines changed

health-monitors/kubernetes-object-monitor/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,5 @@ require (
8585
)
8686

8787
replace github.com/nvidia/nvsentinel/commons => ../../commons
88+
8889
replace github.com/nvidia/nvsentinel/data-models => ../../data-models

health-monitors/kubernetes-object-monitor/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd
9898
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
9999
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
100100
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
101-
github.com/nvidia/nvsentinel/data-models v0.0.0-20251111124318-ba740daea463 h1:XFckTOyjQpUiTa4Qo/IaGjaIiYCd14zN4J2XRXup5hg=
102-
github.com/nvidia/nvsentinel/data-models v0.0.0-20251111124318-ba740daea463/go.mod h1:1ys7eqNRUKM/XU+A4wn/roqSFb5IoKinyN7wigsqE70=
103101
github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg=
104102
github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
105103
github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw=

health-monitors/kubernetes-object-monitor/pkg/cel/environment.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"fmt"
1919
"log/slog"
20+
"sync"
2021
"time"
2122

2223
"github.com/google/cel-go/cel"
@@ -29,6 +30,7 @@ import (
2930
type Environment struct {
3031
env *cel.Env
3132
client client.Client
33+
evalMu sync.Mutex
3234
ctx context.Context
3335
}
3436

@@ -71,6 +73,9 @@ func (e *Environment) Compile(expression string) (*cel.Ast, error) {
7173
}
7274

7375
func (e *Environment) Evaluate(ast *cel.Ast, resource any, ctx context.Context) (ref.Val, error) {
76+
e.evalMu.Lock()
77+
defer e.evalMu.Unlock()
78+
7479
e.ctx = ctx
7580

7681
prg, err := e.env.Program(ast)
@@ -123,6 +128,8 @@ func (e *Environment) lookup(args ...ref.Val) ref.Val {
123128
return types.NewErr("lookup arg[3] (name) must be string")
124129
}
125130

131+
ctx := e.ctx
132+
126133
obj := &unstructured.Unstructured{}
127134

128135
obj.SetAPIVersion(string(version))
@@ -133,7 +140,7 @@ func (e *Environment) lookup(args ...ref.Val) ref.Val {
133140
Name: string(name),
134141
}
135142

136-
if err := e.client.Get(e.ctx, key, obj); err != nil {
143+
if err := e.client.Get(ctx, key, obj); err != nil {
137144
slog.Error("Failed to get object from informer cache", "error", err)
138145
return types.NullValue
139146
}

health-monitors/kubernetes-object-monitor/pkg/controller/reconciler.go

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"fmt"
1919
"log/slog"
20+
"sync"
2021

2122
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/config"
2223
"github.com/nvidia/nvsentinel/health-monitors/kubernetes-object-monitor/pkg/metrics"
@@ -28,16 +29,17 @@ import (
2829
)
2930

3031
type HealthEventPublisher interface {
31-
PublishHealthEvent(policy *config.Policy, nodeName string, isHealthy bool) error
32+
PublishHealthEvent(ctx context.Context, policy *config.Policy, nodeName string, isHealthy bool) error
3233
}
3334

3435
type ResourceReconciler struct {
3536
client.Client
36-
evaluator *policy.Evaluator
37-
publisher HealthEventPublisher
38-
policies []config.Policy
39-
gvk schema.GroupVersionKind
40-
matchStates map[string]bool
37+
evaluator *policy.Evaluator
38+
publisher HealthEventPublisher
39+
policies []config.Policy
40+
gvk schema.GroupVersionKind
41+
matchStates map[string]bool
42+
matchStatesMu sync.RWMutex
4143
}
4244

4345
func NewResourceReconciler(
@@ -81,7 +83,7 @@ func (r *ResourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
8183

8284
func (r *ResourceReconciler) handleGetError(err error, req ctrl.Request) (ctrl.Result, error) {
8385
if client.IgnoreNotFound(err) == nil {
84-
r.cleanupDeletedResource(req.Name)
86+
r.cleanupDeletedResource(req)
8587
return ctrl.Result{}, nil
8688
}
8789

@@ -90,13 +92,21 @@ func (r *ResourceReconciler) handleGetError(err error, req ctrl.Request) (ctrl.R
9092
return ctrl.Result{}, err
9193
}
9294

93-
func (r *ResourceReconciler) cleanupDeletedResource(resourceName string) {
95+
func (r *ResourceReconciler) cleanupDeletedResource(req ctrl.Request) {
96+
r.matchStatesMu.Lock()
97+
defer r.matchStatesMu.Unlock()
98+
9499
for _, p := range r.policies {
95100
if !p.Enabled {
96101
continue
97102
}
98103

99-
stateKey := r.getStateKey(&p, resourceName)
104+
obj := &unstructured.Unstructured{}
105+
obj.SetGroupVersionKind(r.gvk)
106+
obj.SetNamespace(req.Namespace)
107+
obj.SetName(req.Name)
108+
109+
stateKey := r.getStateKey(&p, obj)
100110
if r.matchStates[stateKey] {
101111
delete(r.matchStates, stateKey)
102112
}
@@ -124,51 +134,65 @@ func (r *ResourceReconciler) reconcilePolicy(
124134
nodeName = obj.GetName()
125135
}
126136

127-
stateKey := r.getStateKey(p, obj.GetName())
137+
stateKey := r.getStateKey(p, obj)
138+
139+
r.matchStatesMu.RLock()
128140
wasMatched := r.matchStates[stateKey]
141+
r.matchStatesMu.RUnlock()
129142

130143
if matched && !wasMatched {
131-
return r.handleUnhealthyTransition(p, nodeName, stateKey)
144+
return r.handleUnhealthyTransition(ctx, p, nodeName, stateKey)
132145
}
133146

134147
if !matched && wasMatched {
135-
return r.handleHealthyTransition(p, nodeName, stateKey)
148+
return r.handleHealthyTransition(ctx, p, nodeName, stateKey)
136149
}
137150

138151
return nil
139152
}
140153

141154
func (r *ResourceReconciler) handleUnhealthyTransition(
155+
ctx context.Context,
142156
p *config.Policy,
143157
nodeName string,
144158
stateKey string,
145159
) error {
146-
if err := r.publisher.PublishHealthEvent(p, nodeName, false); err != nil {
160+
if err := r.publisher.PublishHealthEvent(ctx, p, nodeName, false); err != nil {
147161
metrics.HealthEventsPublishErrors.WithLabelValues(p.Name, "grpc_error").Inc()
148162
return fmt.Errorf("failed to publish unhealthy event: %w", err)
149163
}
150164

165+
r.matchStatesMu.Lock()
151166
r.matchStates[stateKey] = true
167+
r.matchStatesMu.Unlock()
168+
152169
metrics.PolicyMatches.WithLabelValues(p.Name, nodeName, r.gvk.Kind).Inc()
153170

154171
return nil
155172
}
156173

157174
func (r *ResourceReconciler) handleHealthyTransition(
175+
ctx context.Context,
158176
p *config.Policy,
159177
nodeName string,
160178
stateKey string,
161179
) error {
162-
if err := r.publisher.PublishHealthEvent(p, nodeName, true); err != nil {
180+
if err := r.publisher.PublishHealthEvent(ctx, p, nodeName, true); err != nil {
163181
metrics.HealthEventsPublishErrors.WithLabelValues(p.Name, "grpc_error").Inc()
164182
return fmt.Errorf("failed to publish healthy event: %w", err)
165183
}
166184

185+
r.matchStatesMu.Lock()
167186
delete(r.matchStates, stateKey)
187+
r.matchStatesMu.Unlock()
168188

169189
return nil
170190
}
171191

172-
func (r *ResourceReconciler) getStateKey(p *config.Policy, resourceName string) string {
173-
return fmt.Sprintf("%s/%s", p.Name, resourceName)
192+
func (r *ResourceReconciler) getStateKey(p *config.Policy, obj *unstructured.Unstructured) string {
193+
if obj.GetNamespace() != "" {
194+
return fmt.Sprintf("%s/%s/%s", p.Name, obj.GetNamespace(), obj.GetName())
195+
}
196+
197+
return fmt.Sprintf("%s/%s", p.Name, obj.GetName())
174198
}

health-monitors/kubernetes-object-monitor/pkg/controller/reconciler_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ func setupTestWithPolicies(t *testing.T, policies []config.Policy) *testSetup {
304304
}
305305

306306
type mockPublishedEvent struct {
307+
ctx context.Context
307308
policy *config.Policy
308309
nodeName string
309310
isHealthy bool
@@ -313,11 +314,12 @@ type mockPublisher struct {
313314
publishedEvents []mockPublishedEvent
314315
}
315316

316-
func (m *mockPublisher) PublishHealthEvent(policy *config.Policy, nodeName string, isHealthy bool) error {
317+
func (m *mockPublisher) PublishHealthEvent(ctx context.Context, policy *config.Policy, nodeName string, isHealthy bool) error {
317318
m.publishedEvents = append(m.publishedEvents, mockPublishedEvent{
318-
policy: policy,
319-
nodeName: nodeName,
320-
isHealthy: isHealthy,
319+
ctx: ctx,
320+
policy: policy,
321+
nodeName: nodeName,
322+
isHealthy: isHealthy,
321323
})
322324
return nil
323325
}

health-monitors/kubernetes-object-monitor/pkg/publisher/publisher.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ func New(client pb.PlatformConnectorClient) *Publisher {
4242
}
4343
}
4444

45-
func (p *Publisher) PublishHealthEvent(policy *config.Policy, nodeName string, isHealthy bool) error {
45+
func (p *Publisher) PublishHealthEvent(ctx context.Context,
46+
policy *config.Policy, nodeName string, isHealthy bool) error {
4647
event := &pb.HealthEvent{
4748
Version: 1,
4849
Agent: agentName,
@@ -63,10 +64,10 @@ func (p *Publisher) PublishHealthEvent(policy *config.Policy, nodeName string, i
6364

6465
slog.Info("Publishing health event", "event", event)
6566

66-
return p.sendWithRetry(healthEvents)
67+
return p.sendWithRetry(ctx, healthEvents)
6768
}
6869

69-
func (p *Publisher) sendWithRetry(events *pb.HealthEvents) error {
70+
func (p *Publisher) sendWithRetry(ctx context.Context, events *pb.HealthEvents) error {
7071
backoff := wait.Backoff{
7172
Steps: 5,
7273
Duration: 2 * time.Second,
@@ -75,7 +76,7 @@ func (p *Publisher) sendWithRetry(events *pb.HealthEvents) error {
7576
}
7677

7778
return wait.ExponentialBackoff(backoff, func() (bool, error) {
78-
_, err := p.pcClient.HealthEventOccurredV1(context.Background(), events)
79+
_, err := p.pcClient.HealthEventOccurredV1(ctx, events)
7980
if err == nil {
8081
slog.Info("Successfully sent health event", "events", events)
8182
return true, nil

0 commit comments

Comments
 (0)