Skip to content

Commit 89279c0

Browse files
authored
fix(scheduler): podGroup status update loop on conflict (#676)
1 parent 10cb53d commit 89279c0

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

pkg/scheduler/cache/status_updater/concurrency.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88

99
v1 "k8s.io/api/core/v1"
10+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1011
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1112
"k8s.io/apimachinery/pkg/runtime"
1213
"k8s.io/apimachinery/pkg/types"
@@ -102,9 +103,16 @@ func (su *defaultStatusUpdater) updatePodGroup(
102103
}
103104

104105
if statusErr != nil || patchErr != nil {
106+
105107
if statusErr != nil {
106108
log.StatusUpdaterLogger.V(1).Errorf("Failed to update pod group status %s/%s: %v",
107109
podGroup.Namespace, podGroup.Name, statusErr)
110+
if apierrors.IsConflict(statusErr) {
111+
// Don't retry on a conflict: the object's resource version is stale, so re-sending this same object can never succeed.
112+
// If a pod group status update is required (e.g. a scheduling condition) a new status update with an updated object
113+
// will be enqueued in the next scheduling cycle.
114+
return
115+
}
108116
}
109117
if patchErr != nil {
110118
log.StatusUpdaterLogger.V(1).Errorf("Failed to patch pod group %s/%s: %v",

pkg/scheduler/cache/status_updater/concurrency_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,29 @@
44
package status_updater
55

66
import (
7+
"context"
8+
"errors"
79
"strconv"
810
"sync"
911
"testing"
12+
"time"
1013

1114
. "github.com/onsi/ginkgo/v2"
1215
. "github.com/onsi/gomega"
1316

1417
v1 "k8s.io/api/core/v1"
18+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1519
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1620
"k8s.io/apimachinery/pkg/runtime"
21+
"k8s.io/apimachinery/pkg/runtime/schema"
1722
"k8s.io/client-go/kubernetes/fake"
1823
fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake"
1924
faketesting "k8s.io/client-go/testing"
2025
"k8s.io/client-go/tools/record"
2126

2227
kubeaischedfake "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned/fake"
28+
fakeschedulingv2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned/typed/scheduling/v2alpha2/fake"
29+
enginev2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
2330
)
2431

2532
const (
@@ -76,4 +83,91 @@ var _ = Describe("Status Updater Concurrency - large scale: increase queue size"
7683
close(signalCh)
7784
wg.Wait()
7885
})
86+
87+
It("updatePodGroup - No retry after conflict error", func() {
	// Spec: when UpdateStatus on a PodGroup fails with a Conflict error,
	// updatePodGroup must neither record the update as applied nor leave a
	// retry on the update queue.
	updateStatusCalls := 0
	patchCalls := 0

	// Both reactors are registered on the same fake scheduling client.
	fakeSchedulingClient := kubeAiSchedClient.SchedulingV2alpha2().(*fakeschedulingv2alpha2.FakeSchedulingV2alpha2)

	// Set up reactor to return conflict error on UpdateStatus calls.
	// Updates that do not target the "status" subresource fall through to
	// the next reactor (handled == false).
	fakeSchedulingClient.PrependReactor(
		"update", "podgroups", func(action faketesting.Action) (handled bool, ret runtime.Object, err error) {
			if updateAction, ok := action.(faketesting.UpdateAction); ok {
				if updateAction.GetSubresource() == "status" {
					updateStatusCalls++
					// Return a conflict error to simulate resource version mismatch
					return true, nil, apierrors.NewConflict(
						schema.GroupResource{Group: "scheduling.run.ai", Resource: "podgroups"},
						"test-pg",
						errors.New("the object has been modified; please apply your changes to the latest version"),
					)
				}
			}
			return false, nil, nil
		},
	)

	// Track patch calls separately; the reactor only counts and never
	// handles the action itself.
	fakeSchedulingClient.PrependReactor(
		"patch", "podgroups", func(action faketesting.Action) (handled bool, ret runtime.Object, err error) {
			patchCalls++
			return false, nil, nil
		},
	)

	job := &enginev2alpha2.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-pg",
			Namespace: "test-ns",
			UID:       "test-uid",
		},
		Status: enginev2alpha2.PodGroupStatus{
			SchedulingConditions: []enginev2alpha2.SchedulingCondition{
				{
					TransitionID: "1",
					Type:         enginev2alpha2.UnschedulableOnNodePool,
					NodePool:     "test",
					Reason:       "test",
					Message:      "test",
					Status:       v1.ConditionTrue,
				},
			},
		},
	}

	key := statusUpdater.keyForPodGroupPayload(job.Name, job.Namespace, job.UID)
	updateData := &inflightUpdate{
		object:       job,
		patchData:    nil, // No patch data, only status update
		updateStatus: true,
		subResources: nil,
	}

	// Store the inflight update
	statusUpdater.inFlightPodGroups.Store(key, updateData)

	// Keep a handle on the stop channel and close it when the spec ends.
	// Run returns immediately here, so whatever it starts is presumably tied
	// to this channel; an anonymous channel that is never closed would leave
	// that work running after the spec finishes.
	stopCh := make(chan struct{})
	defer close(stopCh)
	statusUpdater.Run(stopCh)

	// Call updatePodGroup directly
	ctx := context.Background()
	statusUpdater.updatePodGroup(ctx, key, updateData)

	// Verify UpdateStatus was called once
	Expect(updateStatusCalls).To(Equal(1), "UpdateStatus should be called once")

	// Verify Patch was not called (no patchData provided)
	Expect(patchCalls).To(Equal(0), "Patch should not be called when no patchData is provided")

	// Verify it's not in the applied cache (since the update failed with conflict)
	_, appliedExists := statusUpdater.appliedPodGroupUpdates.Load(key)
	Expect(appliedExists).To(BeFalse(), "Update should not be in applied cache after conflict error")

	// The key behavior: verify the queue is empty (no retry was queued).
	// When a conflict error occurs, the function returns early without
	// re-enqueueing the update.
	// NOTE(review): if the workers started by Run also receive from
	// updateQueueOut, a queued retry could be consumed before this select
	// observes it - confirm against Run's implementation.
	select {
	case <-statusUpdater.updateQueueOut:
		Fail("Update queue should be empty - no retry should be queued for conflict errors")
	case <-time.After(100 * time.Millisecond):
		// Expected - queue is empty, meaning no retry was scheduled
	}
})
79173
})

0 commit comments

Comments
 (0)