Skip to content

Commit 4c913b1

Browse files
authored
Fix scheduler pod group status synchronization between incoming update and in-cluster data - v0.4 cherrypick (#223)
* Keep updated pod-groups data in a separate syncmap
1 parent 7cd0616 commit 4c913b1

File tree

6 files changed

+418
-62
lines changed

6 files changed

+418
-62
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
66

7+
## [v0.4.10] - 2025-06-09
8+
9+
### Fixed
10+
- Fix scheduler pod group status synchronization between incoming update and in-cluster data
11+
712
## [v0.4.9] - 2025-05-27
813

914
### Fixed

pkg/scheduler/cache/status_updater/concurrency.go

Lines changed: 10 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,14 @@ package status_updater
55

66
import (
77
"context"
8-
"strconv"
98

109
v1 "k8s.io/api/core/v1"
1110
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1211
"k8s.io/apimachinery/pkg/runtime"
1312
"k8s.io/apimachinery/pkg/types"
1413

1514
enginev2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
16-
commonconstants "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
1715
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"
18-
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/utils"
1916
)
2017

2118
func (su *defaultStatusUpdater) Run(stopCh <-chan struct{}) {
@@ -25,58 +22,6 @@ func (su *defaultStatusUpdater) Run(stopCh <-chan struct{}) {
2522
go su.queueBufferWorker(stopCh)
2623
}
2724

28-
func (su *defaultStatusUpdater) SyncPodGroupsWithPendingUpdates(podGroups []*enginev2alpha2.PodGroup) {
29-
usedKeys := make(map[updatePayloadKey]bool, len(podGroups))
30-
for i := range podGroups {
31-
key := su.keyForPodGroupPayload(podGroups[i].Name, podGroups[i].Namespace, podGroups[i].UID)
32-
usedKeys[key] = true
33-
inflightUpdateAny, found := su.inFlightPodGroups.Load(key)
34-
if !found {
35-
continue
36-
}
37-
podGroup := inflightUpdateAny.(*inflightUpdate).object.(*enginev2alpha2.PodGroup)
38-
isPodGroupUpdated := su.syncPodGroup(podGroup, podGroups[i])
39-
if isPodGroupUpdated {
40-
su.inFlightPodGroups.Delete(key)
41-
}
42-
}
43-
44-
// Cleanup podGroups that don't comeup anymore
45-
su.inFlightPodGroups.Range(func(key any, _ any) bool {
46-
if _, found := usedKeys[key.(updatePayloadKey)]; !found {
47-
su.inFlightPodGroups.Delete(key)
48-
}
49-
return true
50-
})
51-
}
52-
53-
func (su *defaultStatusUpdater) syncPodGroup(inFlightPodGroup, snapshotPodGroup *enginev2alpha2.PodGroup) bool {
54-
staleTimeStampUpdated := false
55-
if snapshotPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp] == inFlightPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp] {
56-
staleTimeStampUpdated = true
57-
} else {
58-
if snapshotPodGroup.Annotations == nil {
59-
snapshotPodGroup.Annotations = make(map[string]string)
60-
}
61-
snapshotPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp] = inFlightPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp]
62-
}
63-
64-
updatedSchedulingCondition := false
65-
lastSchedulingCondition := utils.GetLastSchedulingCondition(inFlightPodGroup)
66-
currentLastSchedulingCondition := utils.GetLastSchedulingCondition(snapshotPodGroup)
67-
if currentLastSchedulingCondition != nil && lastSchedulingCondition != nil {
68-
currentID, currentErr := strconv.Atoi(currentLastSchedulingCondition.TransitionID)
69-
lastID, lastErr := strconv.Atoi(lastSchedulingCondition.TransitionID)
70-
if currentErr == nil && lastErr == nil && (lastID <= currentID || currentID == 0) {
71-
updatedSchedulingCondition = true
72-
}
73-
}
74-
if !updatedSchedulingCondition {
75-
snapshotPodGroup.Status.SchedulingConditions = inFlightPodGroup.Status.SchedulingConditions
76-
}
77-
return staleTimeStampUpdated && updatedSchedulingCondition
78-
}
79-
8025
func (su *defaultStatusUpdater) keyForPodGroupPayload(name, namespace string, uid types.UID) updatePayloadKey {
8126
return updatePayloadKey(types.NamespacedName{Name: name, Namespace: namespace}.String() + "_" + string(uid))
8227
}
@@ -99,7 +44,7 @@ func (su *defaultStatusUpdater) processPayload(ctx context.Context, payload *upd
9944
case podType:
10045
su.updatePod(ctx, payload.key, updateData.patchData, updateData.subResources, updateData.object)
10146
case podGroupType:
102-
su.updatePodGroup(ctx, payload.key, updateData.patchData, updateData.subResources, updateData.updateStatus, updateData.object)
47+
su.updatePodGroup(ctx, payload.key, updateData)
10348
}
10449
}
10550

@@ -137,23 +82,27 @@ func (su *defaultStatusUpdater) updatePod(
13782
// +kubebuilder:rbac:groups="scheduling.run.ai",resources=podgroups/status,verbs=create;delete;update;patch;get;list;watch
13883

13984
func (su *defaultStatusUpdater) updatePodGroup(
140-
ctx context.Context, _ updatePayloadKey, patchData []byte, subResources []string, updateStatus bool, object runtime.Object,
85+
ctx context.Context, key updatePayloadKey, updateData *inflightUpdate,
14186
) {
142-
podGroup := object.(*enginev2alpha2.PodGroup)
87+
podGroup := updateData.object.(*enginev2alpha2.PodGroup)
14388

14489
var err error
145-
if updateStatus {
90+
if updateData.updateStatus {
14691
_, err = su.kubeaischedClient.SchedulingV2alpha2().PodGroups(podGroup.Namespace).UpdateStatus(
14792
ctx, podGroup, metav1.UpdateOptions{},
14893
)
14994
}
150-
if len(patchData) > 0 {
95+
if len(updateData.patchData) > 0 {
15196
_, err = su.kubeaischedClient.SchedulingV2alpha2().PodGroups(podGroup.Namespace).Patch(
152-
ctx, podGroup.Name, types.JSONPatchType, patchData, metav1.PatchOptions{}, subResources...,
97+
ctx, podGroup.Name, types.JSONPatchType, updateData.patchData, metav1.PatchOptions{}, updateData.subResources...,
15398
)
15499
}
155100
if err != nil {
156101
log.StatusUpdaterLogger.Errorf("Failed to update pod group %s/%s: %v", podGroup.Namespace, podGroup.Name, err)
102+
} else {
103+
// Move the update to the applied cache
104+
su.appliedPodGroupUpdates.Store(key, updateData)
105+
su.inFlightPodGroups.Delete(key)
157106
}
158107
}
159108

pkg/scheduler/cache/status_updater/concurrency_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestConcurrency(t *testing.T) {
3737
RunSpecs(t, "Status Updater Concurrency Suite")
3838
}
3939

40-
var _ = Describe("Status Updater Concurrency", func() {
40+
var _ = Describe("Status Updater Concurrency - large scale: increase queue size", func() {
4141
var (
4242
kubeClient *fake.Clientset
4343
kubeAiSchedClient *kubeaischedfake.Clientset

pkg/scheduler/cache/status_updater/default_status_updater.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ type defaultStatusUpdater struct {
6868

6969
inFlightPodGroups sync.Map
7070
inFlightPods sync.Map
71+
72+
appliedPodGroupUpdates sync.Map
7173
}
7274

7375
// +kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch;delete;list;get;watch
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package status_updater
5+
6+
import (
7+
"strconv"
8+
9+
enginev2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
10+
commonconstants "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
11+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/utils"
12+
)
13+
14+
type podGroupStatusSyncResult string
15+
16+
const (
17+
snapshotStatusIsOlder podGroupStatusSyncResult = "snapshotStatusIsOlder"
18+
equalStatuses podGroupStatusSyncResult = "equalStatuses"
19+
updateRequestIsOlder podGroupStatusSyncResult = "updateRequestIsOlder"
20+
)
21+
22+
func (su *defaultStatusUpdater) SyncPodGroupsWithPendingUpdates(podGroups []*enginev2alpha2.PodGroup) {
23+
usedKeys := make(map[updatePayloadKey]bool, len(podGroups))
24+
for i := range podGroups {
25+
key := su.keyForPodGroupPayload(podGroups[i].Name, podGroups[i].Namespace, podGroups[i].UID)
26+
usedKeys[key] = true
27+
pgLatestUpdate, inFlightUpdateFound, appliedUpdateFound := su.getLatestPgUpdate(key)
28+
if !inFlightUpdateFound && !appliedUpdateFound {
29+
continue
30+
}
31+
podGroup := pgLatestUpdate.object.(*enginev2alpha2.PodGroup)
32+
podGroupsyncResults := su.syncPodGroup(podGroup, podGroups[i])
33+
// Delete the inflight update if it was applied + the pod group in the lister matches the inFlight
34+
if podGroupsyncResults != snapshotStatusIsOlder && appliedUpdateFound {
35+
su.appliedPodGroupUpdates.Delete(key)
36+
} else if podGroupsyncResults == updateRequestIsOlder && inFlightUpdateFound {
37+
su.inFlightPodGroups.Delete(key)
38+
}
39+
}
40+
41+
// Cleanup podGroups that don't comeup anymore
42+
su.cleanUpdatesForNonSeenPodGroups(usedKeys)
43+
}
44+
45+
func (su *defaultStatusUpdater) getLatestPgUpdate(key updatePayloadKey) (*inflightUpdate, bool, bool) {
46+
inflightPgUpdate, inFlightUpdateFound := su.inFlightPodGroups.Load(key)
47+
appliedPgUpdate, appliedUpdateFound := su.appliedPodGroupUpdates.Load(key)
48+
var pgLatestUpdate *inflightUpdate
49+
if inFlightUpdateFound {
50+
pgLatestUpdate = inflightPgUpdate.(*inflightUpdate)
51+
} else if appliedUpdateFound {
52+
pgLatestUpdate = appliedPgUpdate.(*inflightUpdate)
53+
}
54+
return pgLatestUpdate, inFlightUpdateFound, appliedUpdateFound
55+
}
56+
57+
func (su *defaultStatusUpdater) cleanUpdatesForNonSeenPodGroups(usedKeys map[updatePayloadKey]bool) {
58+
su.inFlightPodGroups.Range(func(key any, _ any) bool {
59+
if _, found := usedKeys[key.(updatePayloadKey)]; !found {
60+
su.inFlightPodGroups.Delete(key)
61+
}
62+
return true
63+
})
64+
su.appliedPodGroupUpdates.Range(func(key any, _ any) bool {
65+
if _, found := usedKeys[key.(updatePayloadKey)]; !found {
66+
su.appliedPodGroupUpdates.Delete(key)
67+
}
68+
return true
69+
})
70+
}
71+
72+
func (su *defaultStatusUpdater) syncPodGroup(inFlightPodGroup, snapshotPodGroup *enginev2alpha2.PodGroup) podGroupStatusSyncResult {
73+
staleTimeStampUpdated := false
74+
if snapshotPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp] == inFlightPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp] {
75+
staleTimeStampUpdated = true
76+
} else {
77+
if snapshotPodGroup.Annotations == nil {
78+
snapshotPodGroup.Annotations = make(map[string]string)
79+
}
80+
snapshotPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp] = inFlightPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp]
81+
}
82+
83+
statusComparison := compareSchedulingConditions(inFlightPodGroup, snapshotPodGroup)
84+
85+
if statusComparison == equalStatuses || statusComparison == snapshotStatusIsOlder {
86+
snapshotPodGroup.Status.SchedulingConditions = inFlightPodGroup.Status.SchedulingConditions
87+
}
88+
if statusComparison == equalStatuses && !staleTimeStampUpdated {
89+
statusComparison = snapshotStatusIsOlder
90+
}
91+
92+
return statusComparison
93+
}
94+
95+
func compareSchedulingConditions(inFlightPodGroup, snapshotPodGroup *enginev2alpha2.PodGroup) podGroupStatusSyncResult {
96+
lastSchedulingCondition := utils.GetLastSchedulingCondition(inFlightPodGroup)
97+
currentLastSchedulingCondition := utils.GetLastSchedulingCondition(snapshotPodGroup)
98+
99+
if currentLastSchedulingCondition == nil && lastSchedulingCondition == nil {
100+
return equalStatuses
101+
}
102+
if currentLastSchedulingCondition == nil {
103+
return snapshotStatusIsOlder
104+
}
105+
if lastSchedulingCondition == nil {
106+
return updateRequestIsOlder
107+
}
108+
109+
currentID, currentErr := strconv.Atoi(currentLastSchedulingCondition.TransitionID)
110+
lastID, lastErr := strconv.Atoi(lastSchedulingCondition.TransitionID)
111+
112+
// If an ID is invalid, use the other condition as the "newer" one
113+
if currentErr != nil {
114+
return snapshotStatusIsOlder
115+
} else if lastErr != nil {
116+
return updateRequestIsOlder
117+
}
118+
119+
// Compare valid IDs
120+
switch {
121+
case lastID > currentID || currentID == 0:
122+
return snapshotStatusIsOlder
123+
case lastID == currentID:
124+
return equalStatuses
125+
default: // lastID < currentID
126+
return updateRequestIsOlder
127+
}
128+
}

0 commit comments

Comments
 (0)