Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 5 additions & 89 deletions go.sum

Large diffs are not rendered by default.

68 changes: 68 additions & 0 deletions pkg/device/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,71 @@ func TestAddPod(t *testing.T) {
})
}
}

func TestUpdatePod(t *testing.T) {
podManager := NewPodManager()

originalPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
UID: k8stypes.UID("uid1"),
},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
}
podManager.pods["uid1"] = &PodInfo{
Pod: originalPod,
NodeID: "node1",
Devices: PodDevices{"device1": {{}}},
}

for _, ts := range []struct {
name string
updatedPod *corev1.Pod
expectInCache bool
expectNodeID string
expectDevices PodDevices
expectDelTimestamp bool
}{
{
name: "update terminating pod preserves NodeID and Devices",
updatedPod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
UID: k8stypes.UID("uid1"),
DeletionTimestamp: func() *metav1.Time { t := metav1.Now(); return &t }(),
},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
},
expectInCache: true,
expectNodeID: "node1",
expectDevices: PodDevices{"device1": {{}}},
expectDelTimestamp: true,
},
{
name: "update non-existent pod is a no-op",
updatedPod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "ghost-pod",
UID: k8stypes.UID("uid-ghost"),
},
},
expectInCache: false,
},
} {
t.Run(ts.name, func(t *testing.T) {
podManager.UpdatePod(ts.updatedPod)

pi, ok := podManager.pods[ts.updatedPod.UID]
assert.Equal(t, ts.expectInCache, ok)

if ts.expectInCache {
assert.Equal(t, ts.expectNodeID, pi.NodeID, "NodeID must be preserved")
assert.Equal(t, ts.expectDevices, pi.Devices, "Devices must be preserved")
assert.Equal(t, ts.expectDelTimestamp, pi.DeletionTimestamp != nil, "DeletionTimestamp should be updated")
}
})
}
}
13 changes: 13 additions & 0 deletions pkg/device/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ func (m *PodManager) AddPod(pod *corev1.Pod, nodeID string, devices PodDevices)
return !exists
}

func (m *PodManager) UpdatePod(pod *corev1.Pod) {
m.mutex.Lock()
defer m.mutex.Unlock()

if pi, exists := m.pods[pod.UID]; exists {
pi.Pod = pod
klog.V(5).InfoS("Pod object updated in cache (terminating state)",
"pod", klog.KRef(pod.Namespace, pod.Name),
"deletionTimestamp", pod.DeletionTimestamp,
)
}
}

func (m *PodManager) DelPod(pod *corev1.Pod) {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (s *Scheduler) onAddPod(obj any) {
s.podManager.DelPod(pod)
return
}
if util.IsPodTerminating(pod) {
klog.V(5).InfoS("Pod is terminating but holding locks, preserving cache", "pod", pod.Name)
s.podManager.UpdatePod(pod)
return
}
podDev, _ := device.DecodePodDevices(device.SupportDevices, pod.Annotations)
if s.podManager.AddPod(pod, nodeID, podDev) {
s.quotaManager.AddUsage(pod, podDev)
Expand Down
49 changes: 49 additions & 0 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package scheduler
import (
"context"
"fmt"
"maps"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1265,3 +1266,51 @@ func Test_ListNodes_Concurrent(t *testing.T) {
close(stopCh)
<-done
}

func Test_Scheduler_Issue1368_TerminatingPodRetainsCache(t *testing.T) {
s := NewScheduler()

podDevces := device.PodDevices{
nvidia.NvidiaGPUDevice: device.PodSingleDevice{
[]device.ContainerDevice{
{Idx: 0, UUID: "GPU0", Usedmem: 1000, Usedcores: 10},
},
},
}

basePod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "1368-test-uid",
Name: "test-terminating-pod",
Namespace: "default",
Annotations: map[string]string{
util.AssignedNodeAnnotations: "node1",
},
},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
}

encodedAnnotations := device.EncodePodDevices(device.SupportDevices, podDevces)
maps.Copy(basePod.Annotations, encodedAnnotations)
s.onAddPod(basePod)

_, ok := s.podManager.GetPod(basePod)
assert.Equal(t, true, ok, "Pod should be in cache after initial addition")

terminatingPod := basePod.DeepCopy()
now := metav1.Now()
terminatingPod.DeletionTimestamp = &now

s.onAddPod(terminatingPod)

_, ok = s.podManager.GetPod(terminatingPod)
assert.Equal(t, true, ok, "BUGFIX #1368: Pod should STILL be in cache while terminating (DeletionTimestamp != nil)")

terminatedPod := terminatingPod.DeepCopy()
terminatedPod.Status.Phase = corev1.PodSucceeded

s.onAddPod(terminatedPod)

_, ok = s.podManager.GetPod(terminatedPod)
assert.Equal(t, false, ok, "Pod should be removed from cache after reaching a terminal phase (Succeeded/Failed)")
}
4 changes: 4 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ func IsPodInTerminatedState(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded
}

func IsPodTerminating(pod *corev1.Pod) bool {
return pod.DeletionTimestamp != nil
}

func AllContainersCreated(pod *corev1.Pod) bool {
return len(pod.Status.ContainerStatuses) >= len(pod.Spec.Containers)
}
33 changes: 33 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,3 +609,36 @@ func TestPatchPodLabels(t *testing.T) {
})
}
}

func Test_IsPodTerminating(t *testing.T) {
now := metav1.Now()
tests := []struct {
name string
args *corev1.Pod
want bool
}{
{
name: "pod with deletion timestamp (terminating)",
args: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
DeletionTimestamp: &now,
},
},
want: true,
},
{
name: "pod without deletion timestamp (normal)",
args: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{},
},
want: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := IsPodTerminating(test.args)
assert.Equal(t, test.want, got)
})
}
}
Loading