@@ -21,21 +21,14 @@ import (
2121 "log/slog"
2222 "os"
2323 "os/signal"
24- "path/filepath"
2524 "strconv"
2625 "syscall"
2726 "time"
2827
2928 "github.com/nvidia/nvsentinel/commons/pkg/logger"
3029 "github.com/nvidia/nvsentinel/commons/pkg/server"
31- "github.com/nvidia/nvsentinel/commons/pkg/statemanager"
32- "github.com/nvidia/nvsentinel/data-models/pkg/model"
33- "github.com/nvidia/nvsentinel/fault-remediation-module/pkg/reconciler"
34- "github.com/nvidia/nvsentinel/store-client-sdk/pkg/storewatcher"
30+ "github.com/nvidia/nvsentinel/fault-remediation-module/pkg/initializer"
3531 "golang.org/x/sync/errgroup"
36-
37- "go.mongodb.org/mongo-driver/bson"
38- "go.mongodb.org/mongo-driver/mongo"
3932)
4033
4134var (
@@ -45,247 +38,49 @@ var (
4538 date = "unknown"
4639)
4740
// config collects the settings this file loads at startup. parseFlags fills
// the flag-backed fields and getRequiredEnvVars fills the environment-backed
// ones; each of them returns its own *config, so a given value only ever has
// one of the two groups populated.
48- type config struct {
49- namespace string // env MAINTENANCE_NAMESPACE (required)
50- version string // env MAINTENANCE_VERSION (required)
51- apiGroup string // env MAINTENANCE_API_GROUP (required)
52- templateMountPath string // env TEMPLATE_MOUNT_PATH (required)
53- templateFileName string // env TEMPLATE_FILE_NAME (required)
54- metricsPort string // flag -metrics-port, default "2112"; kept as a string here
55- mongoClientCertMountPath string // flag -mongo-client-cert-mount-path, default "/etc/ssl/mongo-client"
56- kubeconfigPath string // flag -kubeconfig-path, default ""
57- dryRun bool // flag -dry-run, default false
58- enableLogCollector bool // true only when env ENABLE_LOG_COLLECTOR is exactly "true"
59- updateMaxRetries int // flag -update-max-retries, default 5
60- updateRetryDelaySeconds int // flag -update-retry-delay-seconds, default 10
61- }
62-
63- // parseFlags parses command-line flags and returns a config struct.
// Only the flag-backed fields of the returned config are set; the
// environment-backed fields (namespace, version, apiGroup, template*,
// enableLogCollector) keep their zero values. The flags are registered through
// the package-level flag functions and flag.Parse is called before returning.
64- func parseFlags () * config {
65- cfg := & config {}
66-
67- flag .StringVar (& cfg .metricsPort , "metrics-port" , "2112" , "port to expose Prometheus metrics on" )
68- flag .StringVar (& cfg .mongoClientCertMountPath , "mongo-client-cert-mount-path" , "/etc/ssl/mongo-client" ,
69- "path where the mongodb client cert is mounted" )
70- flag .StringVar (& cfg .kubeconfigPath , "kubeconfig-path" , "" , "path to kubeconfig file" )
// NOTE(review): the help text below names the "node drainer module", while this
// binary logs itself as fault-remediation-module — looks copy-pasted; confirm.
71- flag .BoolVar (& cfg .dryRun , "dry-run" , false , "flag to run node drainer module in dry-run mode" )
72- flag .IntVar (& cfg .updateMaxRetries , "update-max-retries" , 5 ,
73- "maximum attempts to update remediation status per event" )
74- flag .IntVar (& cfg .updateRetryDelaySeconds , "update-retry-delay-seconds" , 10 ,
75- "delay in seconds between remediation status update retries" )
76- flag .Parse ()
77-
78- return cfg
79- }
80-
// getRequiredEnvVars builds a config from environment variables.
//
// Five variables are mandatory (MAINTENANCE_NAMESPACE, MAINTENANCE_VERSION,
// MAINTENANCE_API_GROUP, TEMPLATE_MOUNT_PATH, TEMPLATE_FILE_NAME). If any of
// them is unset or empty the function returns a nil config and an error naming
// that variable. ENABLE_LOG_COLLECTOR is optional. On success the five required
// values are logged and the populated config is returned.
81- func getRequiredEnvVars () (* config , error ) {
82- cfg := & config {}
83-
// Each required variable name maps to the config field it is written into.
84- requiredVars := map [string ]* string {
85- "MAINTENANCE_NAMESPACE" : & cfg .namespace ,
86- "MAINTENANCE_VERSION" : & cfg .version ,
87- "MAINTENANCE_API_GROUP" : & cfg .apiGroup ,
88- "TEMPLATE_MOUNT_PATH" : & cfg .templateMountPath ,
89- "TEMPLATE_FILE_NAME" : & cfg .templateFileName ,
90- }
91-
// Map iteration order is not fixed, so when several variables are missing the
// one named in the returned error can differ between runs. os.Getenv yields ""
// for both unset and empty variables, so an explicitly empty value is rejected.
92- for envVar , ptr := range requiredVars {
93- * ptr = os .Getenv (envVar )
94- if * ptr == "" {
95- return nil , fmt .Errorf ("%s is not provided" , envVar )
96- }
97- }
98-
99- // Feature flag: default disabled; only "true" enables it
100- if v := os .Getenv ("ENABLE_LOG_COLLECTOR" ); v == "true" {
101- cfg .enableLogCollector = true
102- }
103-
104- slog .Info ("Configuration loaded" ,
105- "namespace" , cfg .namespace ,
106- "version" , cfg .version ,
107- "apiGroup" , cfg .apiGroup ,
108- "templateMountPath" , cfg .templateMountPath ,
109- "templateFileName" , cfg .templateFileName )
110-
111- return cfg , nil
112- }
113-
// getMongoDBConfig assembles a storewatcher.MongoDBConfig from environment
// variables and the given client-certificate mount directory.
//
// mongoClientCertMountPath is the directory expected to hold tls.crt, tls.key
// and ca.crt; the three paths are built with filepath.Join and are not checked
// for existence here.
//
// Returns an error when one of the four required variables is unset or empty,
// or when one of the optional timing variables is set to something getEnvAsInt
// rejects.
114- func getMongoDBConfig (mongoClientCertMountPath string ) (* storewatcher.MongoDBConfig , error ) {
// Variable name -> human-readable description used in the error message.
115- requiredEnvVars := map [string ]string {
116- "MONGODB_URI" : "MongoDB URI" ,
117- "MONGODB_DATABASE_NAME" : "MongoDB Database name" ,
118- "MONGODB_COLLECTION_NAME" : "MongoDB collection name" ,
119- "MONGODB_TOKEN_COLLECTION_NAME" : "MongoDB token collection name" ,
120- }
121-
122- envVars := make (map [string ]string )
123-
// Map iteration order is not fixed, so with several variables missing the
// description reported in the error can differ between runs.
124- for envVar , description := range requiredEnvVars {
125- value := os .Getenv (envVar )
126- if value == "" {
127- return nil , fmt .Errorf ("%s is not provided" , description )
128- }
129-
130- envVars [envVar ] = value
131- }
132-
// Optional timing knobs; the second argument is the default used when the
// variable is not set.
133- totalTimeoutSeconds , err := getEnvAsInt ("MONGODB_PING_TIMEOUT_TOTAL_SECONDS" , 300 )
134- if err != nil {
135- return nil , fmt .Errorf ("invalid MONGODB_PING_TIMEOUT_TOTAL_SECONDS: %w" , err )
136- }
137-
138- intervalSeconds , err := getEnvAsInt ("MONGODB_PING_INTERVAL_SECONDS" , 5 )
139- if err != nil {
140- return nil , fmt .Errorf ("invalid MONGODB_PING_INTERVAL_SECONDS: %w" , err )
141- }
142-
143- totalCACertTimeoutSeconds , err := getEnvAsInt ("CA_CERT_MOUNT_TIMEOUT_TOTAL_SECONDS" , 360 )
144- if err != nil {
145- return nil , fmt .Errorf ("invalid CA_CERT_MOUNT_TIMEOUT_TOTAL_SECONDS: %w" , err )
146- }
147-
148- intervalCACertSeconds , err := getEnvAsInt ("CA_CERT_READ_INTERVAL_SECONDS" , 5 )
149- if err != nil {
150- return nil , fmt .Errorf ("invalid CA_CERT_READ_INTERVAL_SECONDS: %w" , err )
151- }
152-
// MONGODB_TOKEN_COLLECTION_NAME is validated above but not copied into the
// returned struct; getTokenConfig reads it separately.
153- return & storewatcher.MongoDBConfig {
154- URI : envVars ["MONGODB_URI" ],
155- Database : envVars ["MONGODB_DATABASE_NAME" ],
156- Collection : envVars ["MONGODB_COLLECTION_NAME" ],
157- ClientTLSCertConfig : storewatcher.MongoDBClientTLSCertConfig {
158- TlsCertPath : filepath .Join (mongoClientCertMountPath , "tls.crt" ),
159- TlsKeyPath : filepath .Join (mongoClientCertMountPath , "tls.key" ),
160- CaCertPath : filepath .Join (mongoClientCertMountPath , "ca.crt" ),
161- },
162- TotalPingTimeoutSeconds : totalTimeoutSeconds ,
163- TotalPingIntervalSeconds : intervalSeconds ,
164- TotalCACertTimeoutSeconds : totalCACertTimeoutSeconds ,
165- TotalCACertIntervalSeconds : intervalCACertSeconds ,
166- }, nil
167- }
168-
// getTokenConfig builds the storewatcher.TokenConfig for this module.
//
// The token database comes from MONGODB_DATABASE_NAME and the token collection
// from MONGODB_TOKEN_COLLECTION_NAME; an unset or empty value for either yields
// a nil config and an error. ClientName is the fixed string
// "fault-remediation-module".
169- func getTokenConfig () (* storewatcher.TokenConfig , error ) {
// The token database reuses MONGODB_DATABASE_NAME; no separate variable is read.
170- tokenDatabase := os .Getenv ("MONGODB_DATABASE_NAME" )
171- if tokenDatabase == "" {
172- return nil , fmt .Errorf ("MongoDB token database name is not provided" )
173- }
174-
175- tokenCollection := os .Getenv ("MONGODB_TOKEN_COLLECTION_NAME" )
176- if tokenCollection == "" {
177- return nil , fmt .Errorf ("MongoDB token collection name is not provided" )
178- }
179-
180- return & storewatcher.TokenConfig {
181- ClientName : "fault-remediation-module" ,
182- TokenDatabase : tokenDatabase ,
183- TokenCollection : tokenCollection ,
184- }, nil
185- }
// main (added lines, marked "+") installs the default structured logger for
// "fault-remediation-module", logs the build metadata (version, commit, date),
// then calls run. If run returns an error it is logged and the process exits
// with status 1.
41+ func main () {
42+ logger .SetDefaultStructuredLogger ("fault-remediation-module" , version )
43+ slog .Info ("Starting fault-remediation-module" , "version" , version , "commit" , commit , "date" , date )
18644
// getMongoPipeline (removed lines, marked "-") returns a one-stage
// mongo.Pipeline: a $match requiring operationType "update" together with an
// $or over two conditions on fullDocument.healtheventstatus fields. The two
// unmarked closing braces at the end of this span are shared context between
// this removed function and the added main above.
187- func getMongoPipeline () mongo.Pipeline {
188- return mongo.Pipeline {
189- bson.D {
190- bson.E {Key : "$match" , Value : bson.D {
191- bson.E {Key : "operationType" , Value : "update" },
192- bson.E {Key : "$or" , Value : bson.A {
193- // Watch for quarantine events (for remediation)
// Both fields must match: eviction status in {StatusSucceeded, AlreadyDrained}
// and nodequarantined in {Quarantined, AlreadyQuarantined}.
194- bson.D {
195- bson.E {Key : "fullDocument.healtheventstatus.userpodsevictionstatus.status" , Value : bson.D {
196- bson.E {Key : "$in" , Value : bson.A {model .StatusSucceeded , model .AlreadyDrained }},
197- }},
198- bson.E {Key : "fullDocument.healtheventstatus.nodequarantined" , Value : bson.D {
199- bson.E {Key : "$in" , Value : bson.A {model .Quarantined , model .AlreadyQuarantined }},
200- }},
201- },
202- // Watch for unquarantine events (for annotation cleanup)
// Exact-value match: nodequarantined == UnQuarantined and eviction status ==
// StatusSucceeded (AlreadyDrained is not accepted in this branch).
203- bson.D {
204- bson.E {Key : "fullDocument.healtheventstatus.nodequarantined" , Value : model .UnQuarantined },
205- bson.E {Key : "fullDocument.healtheventstatus.userpodsevictionstatus.status" , Value : model .StatusSucceeded },
206- },
207- }},
208- }},
209- },
45+ if err := run (); err != nil {
46+ slog .Error ("Application encountered a fatal error" , "error" , err )
47+ os .Exit (1 )
21048 }
21149}
21250
21351func run () error {
214- // Create a context that listens for OS interrupt signals (SIGINT, SIGTERM).
215- // This enables proper graceful shutdown in Kubernetes environments
52+ metricsPort , mongoClientCertMountPath , kubeconfigPath , dryRun ,
53+ updateMaxRetries , updateRetryDelaySeconds := parseFlags ()
54+
21655 ctx , stop := signal .NotifyContext (context .Background (), syscall .SIGINT , syscall .SIGTERM )
21756 defer stop ()
21857
219- // Parse flags and get configuration
220- cfg := parseFlags ()
221-
222- // Get required environment variables
223- envCfg , err := getRequiredEnvVars ()
224- if err != nil {
225- return fmt .Errorf ("failed to get required environment variables: %w" , err )
226- }
227-
228- // Get MongoDB configuration
229- mongoConfig , err := getMongoDBConfig (cfg .mongoClientCertMountPath )
230- if err != nil {
231- return fmt .Errorf ("failed to get MongoDB configuration: %w" , err )
232- }
233-
234- // Get token configuration
235- tokenConfig , err := getTokenConfig ()
236- if err != nil {
237- return fmt .Errorf ("failed to get token configuration: %w" , err )
238- }
239-
240- // Get MongoDB pipeline
241- pipeline := getMongoPipeline ()
242-
243- // Initialize k8s client
244- k8sClient , clientSet , err := reconciler .NewK8sClient (cfg .kubeconfigPath , cfg .dryRun , reconciler.TemplateData {
245- Namespace : envCfg .namespace ,
246- Version : envCfg .version ,
247- ApiGroup : envCfg .apiGroup ,
248- TemplateMountPath : envCfg .templateMountPath ,
249- TemplateFileName : envCfg .templateFileName ,
250- })
251- if err != nil {
252- return fmt .Errorf ("error while initializing kubernetes client: %w" , err )
253- }
254-
255- slog .Info ("Successfully initialized k8sclient" )
256-
257- // Initialize and start reconciler
258- reconcilerCfg := reconciler.ReconcilerConfig {
259- MongoConfig : * mongoConfig ,
260- TokenConfig : * tokenConfig ,
261- MongoPipeline : pipeline ,
262- RemediationClient : k8sClient ,
263- StateManager : statemanager .NewStateManager (clientSet ),
264- EnableLogCollector : envCfg .enableLogCollector ,
265- UpdateMaxRetries : cfg .updateMaxRetries ,
266- UpdateRetryDelay : time .Duration (cfg .updateRetryDelaySeconds ) * time .Second ,
267- }
268-
269- reconciler := reconciler .NewReconciler (reconcilerCfg , cfg .dryRun )
270-
271- // Parse the metrics port
272- portInt , err := strconv .Atoi (cfg .metricsPort )
58+ portInt , err := strconv .Atoi (* metricsPort )
27359 if err != nil {
27460 return fmt .Errorf ("invalid metrics port: %w" , err )
27561 }
27662
277- // Create the server
27863 srv := server .NewServer (
27964 server .WithPort (portInt ),
28065 server .WithPrometheusMetrics (),
28166 server .WithSimpleHealth (),
28267 )
28368
284- // Start server and reconciler concurrently
69+ params := initializer.InitializationParams {
70+ MongoClientCertMountPath : * mongoClientCertMountPath ,
71+ KubeconfigPath : * kubeconfigPath ,
72+ DryRun : * dryRun ,
73+ UpdateMaxRetries : * updateMaxRetries ,
74+ UpdateRetryDelay : time .Duration (* updateRetryDelaySeconds ) * time .Second ,
75+ }
76+
77+ components , err := initializer .InitializeAll (ctx , params )
78+ if err != nil {
79+ return fmt .Errorf ("initialization failed: %w" , err )
80+ }
81+
28582 g , gCtx := errgroup .WithContext (ctx )
28683
287- // Start the metrics/health server.
288- // Metrics server failures are logged but do NOT terminate the service.
28984 g .Go (func () error {
29085 slog .Info ("Starting metrics server" , "port" , portInt )
29186
@@ -297,37 +92,30 @@ func run() error {
29792 })
29893
29994 g .Go (func () error {
300- return reconciler .Start (gCtx )
95+ return components . Reconciler .Start (gCtx )
30196 })
30297
303- // Wait for both goroutines to finish
30498 return g .Wait ()
30599}
306100
// This span interleaves three definitions: the removed main and getEnvAsInt
// (lines marked "-") and the added parseFlags (lines marked "+").
//
// main (removed): sets the default structured logger, logs build metadata,
// calls run, and exits with status 1 after logging if run returns an error.
307- func main () {
308- logger . SetDefaultStructuredLogger ( "fault-remediation-module" , version )
// NOTE(review): the spaces inside the quoted literals on the next line (and on
// the "metrics-port" line below) look like diff-rendering residue rather than
// the real string contents — confirm against the repository.
309- slog . Info ( "Starting fault-remediation-module " , "version " , version , "commit" , commit , "date" , date )
// parseFlags (added): registers the module's command-line flags through the
// package-level flag functions, calls flag.Parse, and returns a pointer to each
// flag value through its named results via a bare return. Callers dereference
// the pointers to read the parsed values.
101+ func parseFlags () ( metricsPort , mongoClientCertMountPath , kubeconfigPath * string , dryRun * bool ,
102+ updateMaxRetries , updateRetryDelaySeconds * int ) {
103+ metricsPort = flag . String ( "metrics-port " , "2112 " , "port to expose Prometheus metrics on" )
310104
311- if err := run (); err != nil {
312- slog .Error ("Fatal error" , "error" , err )
313- os .Exit (1 )
314- }
315- }
105+ mongoClientCertMountPath = flag .String ("mongo-client-cert-mount-path" , "/etc/ssl/mongo-client" ,
106+ "path where the mongodb client cert is mounted" )
316107
// getEnvAsInt (removed): reads the environment variable name as a positive
// integer. An unset variable yields defaultValue with no error. A variable that
// is set but not parseable by strconv.Atoi (including an empty string), or that
// parses to a value <= 0, yields 0 and an error.
317- func getEnvAsInt (name string , defaultValue int ) (int , error ) {
// LookupEnv, unlike Getenv, distinguishes "unset" from "set to empty".
318- valueStr , exists := os .LookupEnv (name )
319- if ! exists {
320- return defaultValue , nil
321- }
108+ kubeconfigPath = flag .String ("kubeconfig-path" , "" , "path to kubeconfig file" )
322109
323- value , err := strconv .Atoi (valueStr )
324- if err != nil {
325- return 0 , fmt .Errorf ("error converting %s to integer: %w" , name , err )
326- }
110+ dryRun = flag .Bool ("dry-run" , false , "flag to run fault remediation module in dry-run mode" )
327111
// Zero and negative values are rejected; defaultValue itself is never checked.
328- if value <= 0 {
329- return 0 , fmt .Errorf ("value of %s must be a positive integer" , name )
330- }
112+ updateMaxRetries = flag .Int ("update-max-retries" , 5 ,
113+ "maximum attempts to update remediation status per event" )
114+
115+ updateRetryDelaySeconds = flag .Int ("update-retry-delay-seconds" , 10 ,
116+ "delay in seconds between remediation status update retries" )
117+
118+ flag .Parse ()
331119
332- return value , nil
120+ return
333121}
0 commit comments