@@ -5,17 +5,14 @@ package status_updater
55
66import (
77 "context"
8- "strconv"
98
109 v1 "k8s.io/api/core/v1"
1110 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1211 "k8s.io/apimachinery/pkg/runtime"
1312 "k8s.io/apimachinery/pkg/types"
1413
1514 enginev2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
16- commonconstants "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
1715 "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"
18- "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/utils"
1916)
2017
2118func (su * defaultStatusUpdater ) Run (stopCh <- chan struct {}) {
@@ -25,67 +22,6 @@ func (su *defaultStatusUpdater) Run(stopCh <-chan struct{}) {
2522 go su .queueBufferWorker (stopCh )
2623}
2724
28- func (su * defaultStatusUpdater ) SyncPodGroupsWithPendingUpdates (podGroups []* enginev2alpha2.PodGroup ) {
29- usedKeys := make (map [updatePayloadKey ]bool , len (podGroups ))
30- for i := range podGroups {
31- key := su .keyForPodGroupPayload (podGroups [i ].Name , podGroups [i ].Namespace , podGroups [i ].UID )
32- usedKeys [key ] = true
33- inflightUpdateAny , found := su .inFlightPodGroups .Load (key )
34- if ! found {
35- continue
36- }
37- podGroup := inflightUpdateAny .(* inflightUpdate ).object .(* enginev2alpha2.PodGroup )
38- isPodGroupUpdated := su .syncPodGroup (podGroup , podGroups [i ])
39- if isPodGroupUpdated {
40- su .inFlightPodGroups .Delete (key )
41- }
42- }
43-
44- // Cleanup podGroups that don't comeup anymore
45- su .inFlightPodGroups .Range (func (key any , _ any ) bool {
46- if _ , found := usedKeys [key .(updatePayloadKey )]; ! found {
47- su .inFlightPodGroups .Delete (key )
48- }
49- return true
50- })
51- }
52-
53- func (su * defaultStatusUpdater ) syncPodGroup (inFlightPodGroup , snapshotPodGroup * enginev2alpha2.PodGroup ) bool {
54- staleTimeStampUpdated := false
55- if snapshotPodGroup .Annotations [commonconstants .StalePodgroupTimeStamp ] == inFlightPodGroup .Annotations [commonconstants .StalePodgroupTimeStamp ] {
56- staleTimeStampUpdated = true
57- } else {
58- if snapshotPodGroup .Annotations == nil {
59- snapshotPodGroup .Annotations = make (map [string ]string )
60- }
61- snapshotPodGroup .Annotations [commonconstants .StalePodgroupTimeStamp ] = inFlightPodGroup .Annotations [commonconstants .StalePodgroupTimeStamp ]
62- }
63- lastStartTimestampUpdated := false
64- if snapshotPodGroup .Annotations [commonconstants .LastStartTimeStamp ] == inFlightPodGroup .Annotations [commonconstants .LastStartTimeStamp ] {
65- lastStartTimestampUpdated = true
66- } else {
67- if snapshotPodGroup .Annotations == nil {
68- snapshotPodGroup .Annotations = make (map [string ]string )
69- }
70- snapshotPodGroup .Annotations [commonconstants .LastStartTimeStamp ] = inFlightPodGroup .Annotations [commonconstants .LastStartTimeStamp ]
71- }
72-
73- updatedSchedulingCondition := false
74- lastSchedulingCondition := utils .GetLastSchedulingCondition (inFlightPodGroup )
75- currentLastSchedulingCondition := utils .GetLastSchedulingCondition (snapshotPodGroup )
76- if currentLastSchedulingCondition != nil && lastSchedulingCondition != nil {
77- currentID , currentErr := strconv .Atoi (currentLastSchedulingCondition .TransitionID )
78- lastID , lastErr := strconv .Atoi (lastSchedulingCondition .TransitionID )
79- if currentErr == nil && lastErr == nil && (lastID <= currentID || currentID == 0 ) {
80- updatedSchedulingCondition = true
81- }
82- }
83- if ! updatedSchedulingCondition {
84- snapshotPodGroup .Status .SchedulingConditions = inFlightPodGroup .Status .SchedulingConditions
85- }
86- return lastStartTimestampUpdated && staleTimeStampUpdated && updatedSchedulingCondition
87- }
88-
8925func (su * defaultStatusUpdater ) keyForPodGroupPayload (name , namespace string , uid types.UID ) updatePayloadKey {
9026 return updatePayloadKey (types.NamespacedName {Name : name , Namespace : namespace }.String () + "_" + string (uid ))
9127}
@@ -108,7 +44,7 @@ func (su *defaultStatusUpdater) processPayload(ctx context.Context, payload *upd
10844 case podType :
10945 su .updatePod (ctx , payload .key , updateData .patchData , updateData .subResources , updateData .object )
11046 case podGroupType :
111- su .updatePodGroup (ctx , payload .key , updateData . patchData , updateData . subResources , updateData . updateStatus , updateData . object )
47+ su .updatePodGroup (ctx , payload .key , updateData )
11248 }
11349}
11450
@@ -146,23 +82,27 @@ func (su *defaultStatusUpdater) updatePod(
14682// +kubebuilder:rbac:groups="scheduling.run.ai",resources=podgroups/status,verbs=create;delete;update;patch;get;list;watch
14783
14884func (su * defaultStatusUpdater ) updatePodGroup (
149- ctx context.Context , _ updatePayloadKey , patchData [] byte , subResources [] string , updateStatus bool , object runtime. Object ,
85+ ctx context.Context , key updatePayloadKey , updateData * inflightUpdate ,
15086) {
151- podGroup := object .(* enginev2alpha2.PodGroup )
87+ podGroup := updateData . object .(* enginev2alpha2.PodGroup )
15288
15389 var err error
154- if updateStatus {
90+ if updateData . updateStatus {
15591 _ , err = su .kubeaischedClient .SchedulingV2alpha2 ().PodGroups (podGroup .Namespace ).UpdateStatus (
15692 ctx , podGroup , metav1.UpdateOptions {},
15793 )
15894 }
159- if len (patchData ) > 0 {
95+ if len (updateData . patchData ) > 0 {
16096 _ , err = su .kubeaischedClient .SchedulingV2alpha2 ().PodGroups (podGroup .Namespace ).Patch (
161- ctx , podGroup .Name , types .JSONPatchType , patchData , metav1.PatchOptions {}, subResources ... ,
97+ ctx , podGroup .Name , types .JSONPatchType , updateData . patchData , metav1.PatchOptions {}, updateData . subResources ... ,
16298 )
16399 }
164100 if err != nil {
165101 log .StatusUpdaterLogger .Errorf ("Failed to update pod group %s/%s: %v" , podGroup .Namespace , podGroup .Name , err )
102+ } else {
103+ // Move the update to the applied cache
104+ su .appliedPodGroupUpdates .Store (key , updateData )
105+ su .inFlightPodGroups .Delete (key )
166106 }
167107}
168108
0 commit comments