Skip to content

Commit 36332ea

Browse files
authored
Do not create error events for successfully scheduled podGroups - v0.5 (#237)
1 parent 192f18c commit 36332ea

File tree

3 files changed

+141
-8
lines changed

3 files changed

+141
-8
lines changed

pkg/scheduler/cache/status_updater/default_status_updater.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,16 +190,19 @@ func (su *defaultStatusUpdater) RecordJobStatusEvent(job *podgroup_info.PodGroup
190190
su.recordStaleJobEvent(job)
191191
}
192192

193-
if !job.IsReadyForScheduling() {
194-
su.recordJobNotReadyEvent(job)
195-
return nil
196-
}
193+
updatePodgroupStatus := false
194+
if job.GetNumPendingTasks() > 0 || job.GetNumGatedTasks() > 0 {
195+
if !job.IsReadyForScheduling() {
196+
su.recordJobNotReadyEvent(job)
197+
return nil
198+
}
197199

198-
if err := su.recordUnschedulablePodsEvents(job); err != nil {
199-
return err
200-
}
200+
if err := su.recordUnschedulablePodsEvents(job); err != nil {
201+
return err
202+
}
201203

202-
updatePodgroupStatus := su.recordUnschedulablePodGroup(job)
204+
updatePodgroupStatus = su.recordUnschedulablePodGroup(job)
205+
}
203206

204207
if len(patchData) > 0 || updatePodgroupStatus {
205208
su.pushToUpdateQueue(

pkg/scheduler/cache/status_updater/default_status_updater_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,26 @@
44
package status_updater
55

66
import (
7+
"sync"
78
"testing"
89
"time"
910

1011
"github.com/stretchr/testify/assert"
1112
v1 "k8s.io/api/core/v1"
1213
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/runtime"
15+
"k8s.io/client-go/kubernetes/fake"
16+
faketesting "k8s.io/client-go/testing"
17+
"k8s.io/client-go/tools/record"
1318
"k8s.io/utils/ptr"
1419

20+
kubeaischedfake "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned/fake"
21+
fakeschedulingv2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/client/clientset/versioned/typed/scheduling/v2alpha2/fake"
1522
enginev2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
1623
commonconstants "github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
24+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status"
25+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake"
26+
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake"
1727
)
1828

1929
type UpdatePodGroupConditionTest struct {
@@ -474,3 +484,122 @@ func getTimePointer(ts string) *time.Time {
474484
}
475485
return &t
476486
}
487+
488+
type SimpleRecorder struct {
489+
events []string
490+
}
491+
492+
func (r *SimpleRecorder) Event(object runtime.Object, eventtype, reason, message string) {
493+
r.events = append(r.events, eventtype+":"+reason+":"+message)
494+
}
495+
496+
func (r *SimpleRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
497+
r.events = append(r.events, eventtype+":"+reason+":"+messageFmt)
498+
}
499+
500+
func (r *SimpleRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
501+
r.events = append(r.events, eventtype+":"+reason+":"+messageFmt)
502+
}
503+
504+
func TestDefaultStatusUpdater_RecordJobStatusEvent(t *testing.T) {
505+
tests := []struct {
506+
name string
507+
job jobs_fake.TestJobBasic
508+
expectedEventActions []string
509+
expectedInFlightPodGroups int
510+
}{
511+
{
512+
name: "Running job",
513+
job: jobs_fake.TestJobBasic{
514+
Name: "test-job",
515+
Namespace: "test-ns",
516+
QueueName: "test-queue",
517+
MinAvailable: ptr.To(int32(1)),
518+
Tasks: []*tasks_fake.TestTaskBasic{
519+
{
520+
Name: "test-task",
521+
State: pod_status.Running,
522+
},
523+
},
524+
},
525+
expectedEventActions: []string{},
526+
expectedInFlightPodGroups: 1,
527+
},
528+
{
529+
name: "No ready job",
530+
job: jobs_fake.TestJobBasic{
531+
Name: "test-job",
532+
Namespace: "test-ns",
533+
QueueName: "test-queue",
534+
MinAvailable: ptr.To(int32(2)),
535+
Tasks: []*tasks_fake.TestTaskBasic{
536+
{
537+
Name: "test-task",
538+
State: pod_status.Pending,
539+
},
540+
},
541+
},
542+
expectedEventActions: []string{"Normal NotReady Job is not ready for scheduling. Waiting for 2 pods, currently 1 exist, 0 are gated"},
543+
expectedInFlightPodGroups: 0,
544+
},
545+
{
546+
name: "Unscheduleable job",
547+
job: jobs_fake.TestJobBasic{
548+
Name: "test-job",
549+
Namespace: "test-ns",
550+
QueueName: "test-queue",
551+
MinAvailable: ptr.To(int32(1)),
552+
Tasks: []*tasks_fake.TestTaskBasic{
553+
{
554+
Name: "test-task",
555+
State: pod_status.Pending,
556+
},
557+
},
558+
},
559+
expectedEventActions: []string{"Warning Unschedulable Unable to schedule pod", "Normal Unschedulable Unable to schedule podgroup"},
560+
expectedInFlightPodGroups: 1,
561+
},
562+
}
563+
for _, test := range tests {
564+
t.Run(test.name, func(t *testing.T) {
565+
kubeClient := fake.NewSimpleClientset()
566+
kubeAiSchedClient := kubeaischedfake.NewSimpleClientset()
567+
recorder := record.NewFakeRecorder(100)
568+
statusUpdater := New(kubeClient, kubeAiSchedClient, recorder, 1, false)
569+
wg := sync.WaitGroup{}
570+
finishUpdatesChan := make(chan struct{})
571+
// wait with pod groups update until signal is given.
572+
kubeAiSchedClient.SchedulingV2alpha2().(*fakeschedulingv2alpha2.FakeSchedulingV2alpha2).PrependReactor(
573+
"update", "podgroups", func(action faketesting.Action) (handled bool, ret runtime.Object, err error) {
574+
<-finishUpdatesChan
575+
wg.Done()
576+
return false, nil, nil
577+
},
578+
)
579+
580+
stopCh := make(chan struct{})
581+
statusUpdater.Run(stopCh)
582+
583+
jobsMap, _, _ := jobs_fake.BuildJobsAndTasksMaps([]*jobs_fake.TestJobBasic{&test.job})
584+
585+
statusUpdater.RecordJobStatusEvent(jobsMap["test-job"])
586+
587+
events := []string{}
588+
close(recorder.Events)
589+
for event := range recorder.Events {
590+
events = append(events, event)
591+
}
592+
assert.Equal(t, test.expectedEventActions, events)
593+
inFlightPodGroups := 0
594+
statusUpdater.inFlightPodGroups.Range(func(key, value any) bool {
595+
inFlightPodGroups += 1
596+
return true
597+
})
598+
assert.Equal(t, test.expectedInFlightPodGroups, inFlightPodGroups)
599+
600+
close(finishUpdatesChan)
601+
wg.Wait()
602+
close(stopCh)
603+
})
604+
}
605+
}

pkg/scheduler/test_utils/jobs_fake/jobs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ func BuildJobInfo(
134134
if staleDuration != nil {
135135
staleTime := time.Now().Add(-1 * *staleDuration)
136136
result.StalenessInfo.TimeStamp = &staleTime
137+
result.StalenessInfo.Stale = true
137138
}
138139
if result.LastStartTimestamp == nil && result.GetNumAllocatedTasks() > 0 {
139140
startTime := time.Now().Add(-1 * time.Minute * 1)

0 commit comments

Comments
 (0)