Skip to content

Commit 12b9769

Browse files
committed
Fix node informer restart
Signed-off-by: Davanum Srinivas <[email protected]>
1 parent f07637a commit 12b9769

File tree

4 files changed

+46
-55
lines changed

4 files changed

+46
-55
lines changed

fault-quarantine/main.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828

2929
"github.com/nvidia/nvsentinel/commons/pkg/logger"
3030
"github.com/nvidia/nvsentinel/fault-quarantine/pkg/config"
31+
"github.com/nvidia/nvsentinel/fault-quarantine/pkg/informer"
32+
"github.com/nvidia/nvsentinel/fault-quarantine/pkg/nodeinfo"
3133
"github.com/nvidia/nvsentinel/fault-quarantine/pkg/reconciler"
3234
sdkconfig "github.com/nvidia/nvsentinel/store-client/pkg/config"
3335
"github.com/nvidia/nvsentinel/store-client/pkg/datastore"
@@ -125,6 +127,7 @@ func startMetricsServer(metricsPort string) {
125127
slog.Info("Metrics server goroutine started")
126128
}
127129

130+
//nolint:cyclop // Complexity slightly increased by NodeInformer initialization (was 10, now 12)
128131
func run() error {
129132
// Create a context that gets cancelled on OS interrupt signals
130133
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
@@ -187,11 +190,36 @@ func run() error {
187190

188191
slog.Info("Successfully initialized k8sclient")
189192

193+
// Create the work signal channel (buffered channel acting as semaphore)
194+
workSignal := make(chan struct{}, 1) // Buffer size 1 is usually sufficient
195+
196+
// Create NodeInfo for tracking node quarantine status
197+
nodeInfo := nodeinfo.NewNodeInfo(workSignal)
198+
199+
// Create and start NodeInformer before reconciler initialization
200+
// This matches the pre-postgres pattern where NodeInformer runs independently
201+
slog.Info("Creating NodeInformer")
202+
203+
nodeInformer, err := informer.NewNodeInformer(k8sClient.GetK8sClient(),
204+
30*time.Minute, workSignal, nodeInfo)
205+
if err != nil {
206+
return fmt.Errorf("failed to create node informer: %w", err)
207+
}
208+
209+
slog.Info("Starting NodeInformer")
210+
211+
if err := nodeInformer.Run(ctx.Done()); err != nil {
212+
return fmt.Errorf("failed to start node informer: %w", err)
213+
}
214+
215+
slog.Info("NodeInformer started and cache synced")
216+
190217
reconcilerCfg := reconciler.ReconcilerConfig{
191218
TomlConfig: *tomlCfg,
192219
DataStore: dataStore, // Use new datastore abstraction
193220
Pipeline: pipeline, // Use new pipeline types
194221
K8sClient: k8sClient,
222+
NodeInformer: nodeInformer, // Pass the NodeInformer to reconciler
195223
DryRun: *dryRun,
196224
CircuitBreakerEnabled: *circuitBreakerEnabled,
197225
UnprocessedEventsMetricUpdateInterval: time.Duration(unprocessedEventsMetricUpdateIntervalSeconds) *
@@ -204,9 +232,6 @@ func run() error {
204232
},
205233
}
206234

207-
// Create the work signal channel (buffered channel acting as semaphore)
208-
workSignal := make(chan struct{}, 1) // Buffer size 1 is usually sufficient
209-
210235
// Pass the workSignal channel to the Reconciler
211236
rec := reconciler.NewReconciler(ctx, reconcilerCfg, workSignal)
212237

fault-quarantine/pkg/informer/node_informer.go

Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -133,57 +133,25 @@ func (ni *NodeInformer) Run(stopCh <-chan struct{}) error {
133133

134134
klog.Info("NodeInformer cache synced")
135135

136-
// Retry recalculateCounts if it returns 0 nodes (cache might still be populating)
137-
// This handles the race between cache sync and cache population
138-
maxRetries := 10
139-
retryDelay := 100 * time.Millisecond
140-
141-
for attempt := 0; attempt < maxRetries; attempt++ {
142-
changed, err := ni.recalculateCounts()
143-
if err != nil {
144-
klog.Errorf("Initial count calculation failed on attempt %d: %v", attempt+1, err)
145-
146-
if attempt < maxRetries-1 {
147-
time.Sleep(retryDelay)
148-
retryDelay *= 2 // Exponential backoff
149-
150-
if retryDelay > 2*time.Second {
151-
retryDelay = 2 * time.Second
152-
}
153-
154-
continue
155-
}
156-
157-
break
158-
}
136+
// Perform initial count calculation without failing if 0 nodes
137+
// Nodes may be added dynamically (e.g., in CI environments or with cluster autoscaling)
138+
changed, err := ni.recalculateCounts()
139+
if err != nil {
140+
klog.Warningf("Initial count calculation failed: %v - will retry on node events", err)
141+
} else {
142+
_ = changed // Ignore changed status during initial population
159143

160-
// Check if we got nodes
161144
ni.mutex.RLock()
162145
totalNodes := ni.totalGpuNodes
163146
ni.mutex.RUnlock()
164147

165148
if totalNodes > 0 {
166149
klog.Infof("NodeInformer cache populated: %d GPU nodes found", totalNodes)
167-
break
168-
}
169-
170-
if attempt < maxRetries-1 {
171-
klog.V(2).Infof("NodeInformer cache still empty after sync, retry %d/%d in %v",
172-
attempt+1, maxRetries, retryDelay)
173-
time.Sleep(retryDelay)
174-
retryDelay *= 2 // Exponential backoff
175-
176-
if retryDelay > 2*time.Second {
177-
retryDelay = 2 * time.Second
178-
}
179150
} else {
180-
klog.Fatalf("NodeInformer cache still reports 0 nodes after %d retries and ~%v total wait time. "+
181-
"This indicates a critical initialization failure - either cluster has no GPU nodes "+
182-
"or NodeInformer cannot populate its cache. Pod will restart.",
183-
maxRetries, time.Duration(maxRetries)*retryDelay)
151+
klog.Warningf("NodeInformer cache synced but found 0 GPU nodes. " +
152+
"This is expected in CI environments where nodes are created asynchronously. " +
153+
"Nodes will be discovered as they are added to the cluster.")
184154
}
185-
186-
_ = changed // Ignore changed status during initial population
187155
}
188156

189157
return nil

fault-quarantine/pkg/reconciler/reconciler.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type ReconcilerConfig struct {
5252
DataStore datastore.DataStore
5353
Pipeline interface{} // Custom pipeline for change stream filtering (e.g., mongo.Pipeline)
5454
K8sClient K8sClientInterface
55+
NodeInformer *informer.NodeInformer // NodeInformer instance (created in main.go)
5556
DryRun bool
5657
CircuitBreakerEnabled bool
5758
UnprocessedEventsMetricUpdateInterval time.Duration
@@ -153,10 +154,10 @@ func (r *Reconciler) SetLabelKeys(labelKeyPrefix string) {
153154

154155
// nolint: cyclop, gocognit //fix this as part of NGCC-21793
155156
func (r *Reconciler) Start(ctx context.Context) {
156-
nodeInformer, err := informer.NewNodeInformer(r.config.K8sClient.GetK8sClient(),
157-
30*time.Minute, r.workSignal, r.nodeInfo)
158-
if err != nil {
159-
klog.Fatalf("failed to initialize node informer: %+v", err)
157+
// Use the NodeInformer passed from main.go (already created and running)
158+
nodeInformer := r.config.NodeInformer
159+
if nodeInformer == nil {
160+
klog.Fatalf("NodeInformer is nil - must be created and started before calling reconciler.Start()")
160161
}
161162

162163
// Set the callback to decrement the metric when a quarantined node with annotations is deleted
@@ -238,12 +239,8 @@ func (r *Reconciler) Start(ctx context.Context) {
238239
klog.Infof("Initial quarantinedNodesMap is: %+v, total of %d nodes", quarantinedNodesMap, len(quarantinedNodesMap))
239240
}
240241

241-
err = nodeInformer.Run(ctx.Done())
242-
if err != nil {
243-
klog.Fatalf("failed to run node informer: %+v", err)
244-
}
245-
246-
// Wait for NodeInformer cache to sync before processing any events
242+
// NodeInformer.Run() is already called in main.go before reconciler.Start()
243+
// Just wait for it to sync before processing events
247244
klog.Info("Waiting for NodeInformer cache to sync before starting event processing...")
248245

249246
for !nodeInformer.HasSynced() {

platform-connectors/pkg/connectors/kubernetes/k8s_connector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func (r *K8sConnector) FetchAndProcessHealthMetric(ctx context.Context) {
8686
if healthEvents == nil {
8787
continue
8888
}
89+
8990
if err := r.processHealthEvents(ctx, healthEvents); err != nil {
9091
slog.Error("Not able to process healthEvent", "error", err)
9192
r.ringBuffer.HealthMetricEleProcessingFailed(healthEvents)

0 commit comments

Comments (0)