1717package daemon
1818
1919import (
20+ "context"
2021 "encoding/json"
2122 "fmt"
2223 "net"
@@ -31,8 +32,11 @@ import (
3132 "github.com/rs/zerolog/log"
3233 kapi "k8s.io/api/core/v1"
3334 kerrors "k8s.io/apimachinery/pkg/api/errors"
35+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3436 "k8s.io/apimachinery/pkg/types"
3537 "k8s.io/apimachinery/pkg/util/wait"
38+ "k8s.io/client-go/tools/leaderelection"
39+ "k8s.io/client-go/tools/leaderelection/resourcelock"
3640
3741 "github.com/Mellanox/ib-kubernetes/pkg/config"
3842 "github.com/Mellanox/ib-kubernetes/pkg/guid"
@@ -158,19 +162,131 @@ func NewDaemon() (Daemon, error) {
158162}
159163
160164func (d * daemon ) Run () {
165+ ctx , cancel := context .WithCancel (context .Background ())
166+ defer cancel ()
167+
161168 // setup signal handling
162169 sigChan := make (chan os.Signal , 1 )
163170 signal .Notify (sigChan , syscall .SIGINT , syscall .SIGTERM )
164171
165- // Init the guid pool
172+ // Use node name + Pod UID for stable and unique leader identity
173+ nodeName := os .Getenv ("K8S_NODE" )
174+ if nodeName == "" {
175+ log .Warn ().Msg ("K8S_NODE environment variable not set, falling back to hostname" )
176+ var err error
177+ nodeName , err = os .Hostname ()
178+ if err != nil {
179+ log .Error ().Msgf ("Failed to get hostname: %v" , err )
180+ return
181+ }
182+ }
183+
184+ podUID := os .Getenv ("POD_UID" )
185+ var identity string
186+ if podUID == "" {
187+ log .Warn ().Msg ("POD_UID environment variable not set, falling back to node name only" )
188+ identity = nodeName
189+ } else {
190+ identity = nodeName + "_" + podUID
191+ }
192+
193+ // Get the namespace where this pod is running
194+ namespace := os .Getenv ("POD_NAMESPACE" )
195+ if namespace == "" {
196+ log .Warn ().Msg ("POD_NAMESPACE environment variable not set, falling back to 'kube-system'" )
197+ namespace = "kube-system"
198+ }
199+
200+ log .Info ().Msgf ("Starting leader election in namespace: %s with identity: %s" , namespace , identity )
201+
202+ // Create leader election configuration
203+ lock := & resourcelock.LeaseLock {
204+ LeaseMeta : metav1.ObjectMeta {
205+ Name : "ib-kubernetes-leader" ,
206+ Namespace : namespace ,
207+ },
208+ Client : d .kubeClient .GetCoordinationV1 (),
209+ LockConfig : resourcelock.ResourceLockConfig {
210+ Identity : identity ,
211+ },
212+ }
213+
214+ leaderElectionConfig := leaderelection.LeaderElectionConfig {
215+ Lock : lock ,
216+ ReleaseOnCancel : true ,
217+ LeaseDuration : 60 * time .Second , // Standard Kubernetes components duration
218+ RenewDeadline : 30 * time .Second , // Standard Kubernetes components deadline
219+ RetryPeriod : 20 * time .Second , // Standard Kubernetes components retry
220+ Callbacks : leaderelection.LeaderCallbacks {
221+ OnStartedLeading : func (ctx context.Context ) {
222+ log .Info ().Msgf ("Started leading with identity: %s" , identity )
223+ if err := d .becomeLeader (); err != nil {
224+ log .Error ().Msgf ("Failed to become leader: %v" , err )
225+ // Cancel context to gracefully release lease and exit
226+ cancel ()
227+ return
228+ }
229+ },
230+ OnStoppedLeading : func () {
231+ log .Error ().Msgf ("Lost leadership unexpectedly, identity: %s" , identity )
232+ // Leadership lost unexpectedly - force immediate restart for clean state
233+ os .Exit (1 )
234+ },
235+ OnNewLeader : func (leaderIdentity string ) {
236+ if leaderIdentity == identity {
237+ log .Info ().Msgf ("We are the new leader: %s" , leaderIdentity )
238+ } else {
239+ log .Info ().Msgf ("New leader elected: %s" , leaderIdentity )
240+ }
241+ },
242+ },
243+ }
244+
245+ // Start leader election in background
246+ leaderElectionDone := make (chan struct {})
247+ go func () {
248+ defer close (leaderElectionDone )
249+ leaderelection .RunOrDie (ctx , leaderElectionConfig )
250+ }()
251+
252+ // Wait for termination signal or leader election completion
253+ select {
254+ case sig := <- sigChan :
255+ log .Info ().Msgf ("Received signal %s. Terminating..." , sig )
256+ cancel () // This triggers ReleaseOnCancel
257+ // Wait for graceful lease release
258+ select {
259+ case <- leaderElectionDone :
260+ case <- time .After (5 * time .Second ):
261+ log .Warn ().Msg ("Graceful shutdown timeout exceeded" )
262+ }
263+ case <- leaderElectionDone :
264+ log .Info ().Msg ("Leader election completed" )
265+ }
266+ }
267+
268+ // becomeLeader is called when this instance becomes the leader
269+ func (d * daemon ) becomeLeader () error {
270+ log .Info ().Msg ("Becoming leader, initializing daemon logic" )
271+
272+ // Initialize the GUID pool (rebuild state from existing pods)
166273 if err := d .initPool (); err != nil {
167- log .Error ().Msgf ("initPool(): Daemon could not init the guid pool: %v" , err )
168- os . Exit ( 1 )
274+ log .Error ().Msgf ("initPool(): Leader could not init the guid pool: %v" , err )
275+ return fmt . Errorf ( "failed to initialize GUID pool as leader: %v" , err )
169276 }
170277
171- // Run periodic tasks
172- // closing the channel will stop the goroutines executed in the wait.Until() calls below
278+ // Start the actual daemon logic
279+ d .runLeaderLogic ()
280+ return nil
281+ }
282+
// runLeaderLogic runs the actual daemon operations, only called by the leader.
// It starts the periodic GUID add/delete reconciliation goroutines and then
// blocks until SIGINT/SIGTERM is received.
func (d *daemon) runLeaderLogic() {
	log.Info().Msg("Starting leader daemon logic")

	// Run periodic tasks (only leader should do this).
	// Closing stopPeriodicsChan (deferred below) stops both wait.Until
	// goroutines when this function returns.
	stopPeriodicsChan := make(chan struct{})

	go wait.Until(d.AddPeriodicUpdate, time.Duration(d.config.PeriodicUpdate)*time.Second, stopPeriodicsChan)
	go wait.Until(d.DeletePeriodicUpdate, time.Duration(d.config.PeriodicUpdate)*time.Second, stopPeriodicsChan)
	defer close(stopPeriodicsChan)

	// NOTE(review): the source diff elides the lines that create
	// watcherStopFunc right here (presumably the pod-event watcher startup)
	// — confirm against the full file; do not assume its semantics.
	defer watcherStopFunc()

	// Run until interrupted by os signals.
	// NOTE(review): this registers a second signal channel in addition to the
	// one in Run(); signal.Notify fans each signal out to every registered
	// channel, so both receive SIGINT/SIGTERM. Consider waiting on a context
	// instead to avoid the duplication — verify before changing.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	sig := <-sigChan
	log.Info().Msgf("Received signal %s. Terminating...", sig)
}
0 commit comments