Skip to content

Commit d41ed17

Browse files
authored
Pre creating binding request, delete any pending status updates for the pod (#186)
1 parent f17a4a3 commit d41ed17

File tree

5 files changed

+24
-6
lines changed

5 files changed

+24
-6
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
66

7-
## [Unreleased]
7+
## [v0.4.9]
8+
9+
### Fixed
10+
- Fixed pod status scheduled race condition between the scheduler and the pod binding
811

912
## [v0.4.8]
1013

pkg/scheduler/cache/cache.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ func (sc *SchedulerCache) WaitForWorkers(stopCh <-chan struct{}) {
223223
func (sc *SchedulerCache) Bind(taskInfo *pod_info.PodInfo, hostname string) error {
224224
startTime := time.Now()
225225
defer metrics.UpdateTaskBindDuration(startTime)
226+
sc.StatusUpdater.PreBind(taskInfo.Pod)
226227

227228
log.InfraLogger.V(3).Infof(
228229
"Creating bind request for task <%v/%v> to node <%v> gpuGroup: <%v>, requires: <%v> GPUs",

pkg/scheduler/cache/status_updater/concurrency.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (su *defaultStatusUpdater) Run(stopCh <-chan struct{}) {
2828
func (su *defaultStatusUpdater) SyncPodGroupsWithPendingUpdates(podGroups []*enginev2alpha2.PodGroup) {
2929
usedKeys := make(map[updatePayloadKey]bool, len(podGroups))
3030
for i := range podGroups {
31-
key := su.keyForPayload(podGroups[i].Name, podGroups[i].Namespace, podGroups[i].UID)
31+
key := su.keyForPodGroupPayload(podGroups[i].Name, podGroups[i].Namespace, podGroups[i].UID)
3232
usedKeys[key] = true
3333
inflightUpdateAny, found := su.inFlightPodGroups.Load(key)
3434
if !found {
@@ -77,10 +77,18 @@ func (su *defaultStatusUpdater) syncPodGroup(inFlightPodGroup, snapshotPodGroup
7777
return staleTimeStampUpdated && updatedSchedulingCondition
7878
}
7979

80-
func (su *defaultStatusUpdater) keyForPayload(name, namespace string, uid types.UID) updatePayloadKey {
80+
func (su *defaultStatusUpdater) keyForPodGroupPayload(name, namespace string, uid types.UID) updatePayloadKey {
8181
return updatePayloadKey(types.NamespacedName{Name: name, Namespace: namespace}.String() + "_" + string(uid))
8282
}
8383

84+
func (su *defaultStatusUpdater) keyForPodStatusPayload(name, namespace string, uid types.UID) updatePayloadKey {
85+
return updatePayloadKey(types.NamespacedName{Name: name, Namespace: namespace}.String() + "_" + string(uid) + "-Status")
86+
}
87+
88+
func (su *defaultStatusUpdater) keyForPodLabelsPayload(name, namespace string, uid types.UID) updatePayloadKey {
89+
return updatePayloadKey(types.NamespacedName{Name: name, Namespace: namespace}.String() + "_" + string(uid) + "-Labels")
90+
}
91+
8492
func (su *defaultStatusUpdater) processPayload(ctx context.Context, payload *updatePayload) {
8593
updateData, found := su.loadInflighUpdate(payload)
8694
if !found {

pkg/scheduler/cache/status_updater/default_status_updater.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ func (su *defaultStatusUpdater) Bound(
142142
return bindError
143143
}
144144

145+
func (su *defaultStatusUpdater) PreBind(pod *v1.Pod) {
146+
// Delete any pending status updates for this pod - after this binding, they will become no longer relevant
147+
su.inFlightPods.Delete(su.keyForPodStatusPayload(pod.Name, pod.Namespace, pod.UID))
148+
}
149+
145150
func (su *defaultStatusUpdater) Pipelined(pod *v1.Pod, message string) {
146151
su.recorder.Eventf(pod, v1.EventTypeNormal, "Pipelined", message)
147152
}
@@ -163,7 +168,7 @@ func (su *defaultStatusUpdater) PatchPodLabels(pod *v1.Pod, labels map[string]an
163168

164169
su.pushToUpdateQueue(
165170
&updatePayload{
166-
key: su.keyForPayload(pod.Name, pod.Namespace, pod.UID) + "-Labels",
171+
key: su.keyForPodLabelsPayload(pod.Name, pod.Namespace, pod.UID),
167172
objectType: podType,
168173
},
169174
&inflightUpdate{
@@ -201,7 +206,7 @@ func (su *defaultStatusUpdater) RecordJobStatusEvent(job *podgroup_info.PodGroup
201206
if len(patchData) > 0 || updatePodgroupStatus {
202207
su.pushToUpdateQueue(
203208
&updatePayload{
204-
key: su.keyForPayload(job.PodGroup.Name, job.PodGroup.Namespace, job.PodGroup.UID),
209+
key: su.keyForPodGroupPayload(job.PodGroup.Name, job.PodGroup.Namespace, job.PodGroup.UID),
205210
objectType: podGroupType,
206211
},
207212
&inflightUpdate{
@@ -278,7 +283,7 @@ func (su *defaultStatusUpdater) updatePodCondition(pod *v1.Pod, condition *v1.Po
278283

279284
su.pushToUpdateQueue(
280285
&updatePayload{
281-
key: su.keyForPayload(pod.Name, pod.Namespace, pod.UID) + "-Status",
286+
key: su.keyForPodStatusPayload(pod.Name, pod.Namespace, pod.UID),
282287
objectType: podType,
283288
},
284289
&inflightUpdate{

pkg/scheduler/cache/status_updater/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type PodGroupsSync interface {
1818
type Interface interface {
1919
PodGroupsSync
2020
Evicted(evictedPodGroup *enginev2alpha2.PodGroup, evictionMetadata eviction_info.EvictionMetadata, message string)
21+
PreBind(pod *v1.Pod)
2122
Bound(pod *v1.Pod, hostname string, bindError error, nodePoolName string) error
2223
Pipelined(pod *v1.Pod, message string)
2324
PatchPodLabels(pod *v1.Pod, labels map[string]interface{})

0 commit comments

Comments
 (0)