Skip to content

Commit d344404

Browse files
authored
scheduler: Add LastStartTimestamp to PodGroup (#153)
* add podgroup starttime annotation
* call it LastStartTimeStamp instead
* set LastStartTimestamp after successful commit on allocate
* unit test to ensure start time is set
* add proper unit test for lastStartTime validation for elastic
1 parent 0b5e34d commit d344404

File tree

10 files changed

+141
-32
lines changed

10 files changed

+141
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
99
### Added
1010
- Added support for [k8s pod scheduling gates](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-scheduling-readiness/)
1111
- nodeSelector, affinity and tolerations configurable with global value definitions
12+
- Scheduler now adds a "LastStartTimestamp" to podgroup on allocation
1213

1314
### Changed
1415
- Queue order function now takes into account potential victims, resulting in better reclaim scenarios.

pkg/common/constants/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const (
2323
RunaiGpuLimit = "runai-gpu-limit"
2424
MpsAnnotation = "mps"
2525
StalePodgroupTimeStamp = "kai.scheduler/stale-podgroup-timestamp"
26+
LastStartTimeStamp = "kai.scheduler/last-start-timestamp"
2627

2728
// Labels
2829
NodePoolNameLabel = "runai/node-pool"

pkg/scheduler/actions/allocate/allocate.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package allocate
55

66
import (
7+
"time"
8+
79
"golang.org/x/exp/maps"
810

911
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/common"
@@ -41,19 +43,24 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
4143
for !jobsOrderByQueues.IsEmpty() {
4244
job := jobsOrderByQueues.PopNextJob()
4345
stmt := ssn.Statement()
44-
if attemptToAllocateJob(ssn, stmt, job) {
46+
alreadyAllocated := job.GetNumAllocatedTasks() > 0
47+
if ok, pipelined := attemptToAllocateJob(ssn, stmt, job); ok {
4548
metrics.IncPodgroupScheduledByAction()
4649
err := stmt.Commit()
50+
if err == nil && !pipelined && !alreadyAllocated {
51+
setLastStartTimestamp(job)
52+
}
4753
if err == nil && podgroup_info.HasTasksToAllocate(job, true) {
4854
jobsOrderByQueues.PushJob(job)
55+
continue
4956
}
5057
} else {
5158
stmt.Discard()
5259
}
5360
}
5461
}
5562

56-
func attemptToAllocateJob(ssn *framework.Session, stmt *framework.Statement, job *podgroup_info.PodGroupInfo) bool {
63+
func attemptToAllocateJob(ssn *framework.Session, stmt *framework.Statement, job *podgroup_info.PodGroupInfo) (allocated, pipelined bool) {
5764
queue := ssn.Queues[job.Queue]
5865

5966
resReq := podgroup_info.GetTasksToAllocateInitResource(job, ssn.TaskOrderFn, true)
@@ -64,9 +71,9 @@ func attemptToAllocateJob(ssn *framework.Session, stmt *framework.Statement, job
6471
if !common.AllocateJob(ssn, stmt, nodes, job, false) {
6572
log.InfraLogger.V(3).Infof("Could not allocate resources for job: <%v/%v> of queue <%v>",
6673
job.Namespace, job.Name, job.Queue)
67-
return false
74+
return false, false
6875
}
69-
76+
pipelined = false
7077
if job.ShouldPipelineJob() {
7178
log.InfraLogger.V(3).Infof(
7279
"Some tasks were pipelined, setting all job to be pipelined for job: <%v/%v>",
@@ -76,12 +83,18 @@ func attemptToAllocateJob(ssn *framework.Session, stmt *framework.Statement, job
7683
log.InfraLogger.Errorf(
7784
"Failed to covert tasks from allocated to pipelined for job: <%v/%v>, error: <%v>",
7885
job.Namespace, job.Name, err)
79-
return false
86+
return false, false
8087
}
88+
pipelined = true
8189
} else {
8290
log.InfraLogger.V(3).Infof("Succesfully allocated resources for job: <%v/%v>",
8391
job.Namespace, job.Name)
8492
}
8593

86-
return true
94+
return true, pipelined
95+
}
96+
97+
func setLastStartTimestamp(job *podgroup_info.PodGroupInfo) {
98+
timeNow := time.Now()
99+
job.LastStartTimestamp = &timeNow
87100
}

pkg/scheduler/actions/allocate/allocateElastic_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package allocate_test
55

66
import (
77
"testing"
8+
"time"
89

910
. "go.uber.org/mock/gomock"
1011
"k8s.io/utils/pointer"
@@ -280,9 +281,10 @@ func getElasticTestsMetadata() []integration_tests_utils.TestTopologyMetadata {
280281
},
281282
TaskExpectedResults: map[string]test_utils.TestExpectedResultBasic{
282283
"pending_job0-0": {
283-
NodeName: "node0",
284-
GPUsRequired: 1,
285-
Status: pod_status.Running,
284+
NodeName: "node0",
285+
GPUsRequired: 1,
286+
Status: pod_status.Running,
287+
LastStartTimestampOlderThan: pointer.Duration(time.Second * 30),
286288
},
287289
"pending_job0-1": {
288290
NodeName: "node0",

pkg/scheduler/api/podgroup_info/job_info.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ type PodGroupInfo struct {
6666

6767
Allocated *resource_info.Resource
6868

69-
CreationTimestamp metav1.Time
70-
PodGroup *enginev2alpha2.PodGroup
71-
PodGroupUID types.UID
69+
CreationTimestamp metav1.Time
70+
LastStartTimestamp *time.Time
71+
PodGroup *enginev2alpha2.PodGroup
72+
PodGroupUID types.UID
7273

7374
// TODO(k82cn): keep backward compatibility, removed it when v1alpha1 finalized.
7475
PDB *policyv1.PodDisruptionBudget
@@ -100,6 +101,8 @@ func NewPodGroupInfo(uid common_info.PodGroupID, tasks ...*pod_info.PodInfo) *Po
100101
TimeStamp: nil,
101102
Stale: false,
102103
},
104+
105+
LastStartTimestamp: nil,
103106
activeAllocatedCount: ptr.To(0),
104107
}
105108

@@ -140,6 +143,16 @@ func (podGroupInfo *PodGroupInfo) SetPodGroup(pg *enginev2alpha2.PodGroup) {
140143
}
141144
}
142145

146+
if pg.Annotations[commonconstants.LastStartTimeStamp] != "" {
147+
startTime, err := time.Parse(time.RFC3339, pg.Annotations[commonconstants.LastStartTimeStamp])
148+
if err != nil {
149+
log.InfraLogger.V(7).Warnf("Failed to parse start timestamp for podgroup <%s/%s> err: %v",
150+
podGroupInfo.Namespace, podGroupInfo.Name, err)
151+
} else {
152+
podGroupInfo.LastStartTimestamp = &startTime
153+
}
154+
}
155+
143156
log.InfraLogger.V(7).Infof(
144157
"SetPodGroup. podGroupName=<%s>, PodGroupUID=<%s> podGroupInfo.PodGroupIndex=<%d>",
145158
podGroupInfo.Name, podGroupInfo.PodGroupUID)
@@ -273,6 +286,16 @@ func (podGroupInfo *PodGroupInfo) GetNumActiveUsedTasks() int {
273286
return numTasks
274287
}
275288

289+
func (podGroupInfo *PodGroupInfo) GetNumAllocatedTasks() int {
290+
numTasks := 0
291+
for _, task := range podGroupInfo.PodInfos {
292+
if pod_status.AllocatedStatus(task.Status) {
293+
numTasks++
294+
}
295+
}
296+
return numTasks
297+
}
298+
276299
func (podGroupInfo *PodGroupInfo) GetPendingTasks() []*pod_info.PodInfo {
277300
var pendingTasks []*pod_info.PodInfo
278301
for _, task := range podGroupInfo.PodInfos {

pkg/scheduler/cache/status_updater/concurrency.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@ func (su *defaultStatusUpdater) syncPodGroup(inFlightPodGroup, snapshotPodGroup
6060
}
6161
snapshotPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp] = inFlightPodGroup.Annotations[commonconstants.StalePodgroupTimeStamp]
6262
}
63+
lastStartTimestampUpdated := false
64+
if snapshotPodGroup.Annotations[commonconstants.LastStartTimeStamp] == inFlightPodGroup.Annotations[commonconstants.LastStartTimeStamp] {
65+
lastStartTimestampUpdated = true
66+
} else {
67+
if snapshotPodGroup.Annotations == nil {
68+
snapshotPodGroup.Annotations = make(map[string]string)
69+
}
70+
snapshotPodGroup.Annotations[commonconstants.LastStartTimeStamp] = inFlightPodGroup.Annotations[commonconstants.LastStartTimeStamp]
71+
}
6372

6473
updatedSchedulingCondition := false
6574
lastSchedulingCondition := utils.GetLastSchedulingCondition(inFlightPodGroup)
@@ -74,7 +83,7 @@ func (su *defaultStatusUpdater) syncPodGroup(inFlightPodGroup, snapshotPodGroup
7483
if !updatedSchedulingCondition {
7584
snapshotPodGroup.Status.SchedulingConditions = inFlightPodGroup.Status.SchedulingConditions
7685
}
77-
return staleTimeStampUpdated && updatedSchedulingCondition
86+
return lastStartTimestampUpdated && staleTimeStampUpdated && updatedSchedulingCondition
7887
}
7988

8089
func (su *defaultStatusUpdater) keyForPayload(name, namespace string, uid types.UID) updatePayloadKey {

pkg/scheduler/cache/status_updater/concurrency_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ var _ = Describe("Status Updater Concurrency", func() {
105105
if jobIndex, _ := strconv.Atoi(strings.Split(job.Name, "-")[0]); jobIndex%2 == 0 {
106106
job.StalenessInfo.TimeStamp = ptr.To(time.Now())
107107
job.StalenessInfo.Stale = true
108+
job.LastStartTimestamp = ptr.To(time.Now())
108109
}
109110
Expect(statusUpdater.RecordJobStatusEvent(job)).To(Succeed())
110111
}

pkg/scheduler/cache/status_updater/default_status_updater.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ func (su *defaultStatusUpdater) PatchPodLabels(pod *v1.Pod, labels map[string]an
176176
func (su *defaultStatusUpdater) RecordJobStatusEvent(job *podgroup_info.PodGroupInfo) error {
177177
var err error
178178
var patchData []byte
179-
if err, patchData = su.updatePodGroupStaleTimeStamp(job.PodGroup, job.StalenessInfo.TimeStamp); err != nil {
180-
log.InfraLogger.V(7).Warnf("Failed to update podgroup stale time. error:: %s", err)
179+
if patchData, err = su.updatePodGroupAnnotations(job); err != nil {
180+
log.InfraLogger.V(7).Warnf("Failed to update podgroup annotations, error: %s", err)
181181
}
182182
if job.StalenessInfo.Stale {
183183
su.recordStaleJobEvent(job)
@@ -317,23 +317,23 @@ func (su *defaultStatusUpdater) recordUnschedulablePodsEvents(job *podgroup_info
317317
return errors.Join(errs...)
318318
}
319319

320-
func (su *defaultStatusUpdater) updatePodGroupStaleTimeStamp(podGroup *enginev2alpha2.PodGroup, staleTimeStamp *time.Time) (error, []byte) {
321-
old := podGroup.DeepCopy()
322-
updated := setPodGroupStaleTimeStamp(podGroup, staleTimeStamp)
323-
324-
if !updated {
320+
func (su *defaultStatusUpdater) updatePodGroupAnnotations(job *podgroup_info.PodGroupInfo) ([]byte, error) {
321+
old := job.PodGroup.DeepCopy()
322+
updatedStaleTime := setPodGroupStaleTimeStamp(job.PodGroup, job.StalenessInfo.TimeStamp)
323+
updatedStartTime := setPodGroupLastStartTimeStamp(job.PodGroup, job.LastStartTimestamp)
324+
if !updatedStaleTime && !updatedStartTime {
325325
return nil, nil
326326
}
327327

328-
patchData, err := getPodGroupPatch(old, podGroup)
328+
patchData, err := getPodGroupPatch(old, job.PodGroup)
329329
if err != nil {
330-
return err, nil
330+
return nil, err
331331
}
332332

333333
if patchData == nil {
334334
return nil, nil
335335
}
336-
return nil, patchData
336+
return patchData, nil
337337
}
338338

339339
func (su *defaultStatusUpdater) recordUnschedulablePodGroup(job *podgroup_info.PodGroupInfo) bool {
@@ -393,6 +393,34 @@ func setPodGroupStaleTimeStamp(podGroup *enginev2alpha2.PodGroup, staleTimeStamp
393393
return true
394394
}
395395

396+
func setPodGroupLastStartTimeStamp(podGroup *enginev2alpha2.PodGroup, startTimeStamp *time.Time) bool {
397+
if podGroup.Annotations == nil {
398+
podGroup.Annotations = make(map[string]string)
399+
}
400+
401+
if startTimeStamp == nil {
402+
if _, found := podGroup.Annotations[commonconstants.LastStartTimeStamp]; !found {
403+
return false
404+
}
405+
406+
delete(podGroup.Annotations, commonconstants.LastStartTimeStamp)
407+
return true
408+
}
409+
410+
currTimeStamp, found := podGroup.Annotations[commonconstants.LastStartTimeStamp]
411+
if !found {
412+
podGroup.Annotations[commonconstants.LastStartTimeStamp] = startTimeStamp.UTC().Format(time.RFC3339)
413+
return true
414+
}
415+
416+
if currTimeStamp == startTimeStamp.Format(time.RFC3339) {
417+
return false
418+
}
419+
420+
podGroup.Annotations[commonconstants.LastStartTimeStamp] = startTimeStamp.Format(time.RFC3339)
421+
return true
422+
}
423+
396424
func setPodGroupSchedulingCondition(podGroup *enginev2alpha2.PodGroup, schedulingCondition *enginev2alpha2.SchedulingCondition) bool {
397425
currentSchedulingConditionIndex := utils.GetSchedulingConditionIndex(podGroup, schedulingCondition.NodePool)
398426
lastSchedulingCondition := utils.GetLastSchedulingCondition(podGroup)

pkg/scheduler/test_utils/jobs_fake/jobs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ func BuildJobInfo(
135135
staleTime := time.Now().Add(-1 * *staleDuration)
136136
result.StalenessInfo.TimeStamp = &staleTime
137137
}
138+
if result.LastStartTimestamp == nil && result.GetNumAllocatedTasks() > 0 {
139+
startTime := time.Now().Add(-1 * time.Minute * 1)
140+
result.LastStartTimestamp = &startTime
141+
}
138142
return result
139143
}
140144

pkg/scheduler/test_utils/test_utils.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"strconv"
1111
"testing"
12+
"time"
1213

1314
// lint:ignore ST1001 we want to use gomock here
1415
. "go.uber.org/mock/gomock"
@@ -96,14 +97,15 @@ type TestSessionConfig struct {
9697
}
9798

9899
type TestExpectedResultBasic struct {
99-
NodeName string
100-
GPUsRequired float64
101-
GPUsAccepted float64
102-
MilliCpuRequired float64
103-
MemoryRequired float64
104-
Status pod_status.PodStatus
105-
GPUGroups []string
106-
DontValidateGPUGroup bool
100+
NodeName string
101+
GPUsRequired float64
102+
GPUsAccepted float64
103+
MilliCpuRequired float64
104+
MemoryRequired float64
105+
Status pod_status.PodStatus
106+
GPUGroups []string
107+
DontValidateGPUGroup bool
108+
LastStartTimestampOlderThan *time.Duration
107109
}
108110

109111
type TestExpectedNodesResources struct {
@@ -163,6 +165,19 @@ func MatchExpectedAndRealTasks(t *testing.T, testNumber int, testMetadata TestTo
163165
}
164166
}
165167
}
168+
169+
if pod_status.AllocatedStatus(jobExpectedResult.Status) {
170+
if job.LastStartTimestamp == nil {
171+
t.Errorf("Test number: %d, name: %v, has failed. Task name: %v, actual last start timestamp is not set expecting pod_status.%v", testNumber, testMetadata.Name, jobName, jobExpectedResult.Status.String())
172+
} else if jobExpectedResult.LastStartTimestampOlderThan != nil {
173+
now := time.Now()
174+
if now.Sub(*job.LastStartTimestamp) < *jobExpectedResult.LastStartTimestampOlderThan {
175+
t.Errorf("Test number: %d, name: %v, has failed. Task name: %v, actual last start timestamp is not older than %v", testNumber, testMetadata.Name, jobName,
176+
*jobExpectedResult.LastStartTimestampOlderThan)
177+
}
178+
}
179+
}
180+
166181
if sumOfAcceptedGpus != jobExpectedResult.GPUsAccepted && jobExpectedResult.GPUsAccepted != 0 {
167182
t.Errorf("Test number: %d, name: %v, has failed. Task name: %v, actual accept GPUs: %v, was expecting GPUs: %v", testNumber, testMetadata.Name, jobName, sumOfAcceptedGpus, jobExpectedResult.GPUsAccepted)
168183
}
@@ -178,7 +193,7 @@ func MatchExpectedAndRealTasks(t *testing.T, testNumber int, testMetadata TestTo
178193
}
179194

180195
if len(testMetadata.TaskExpectedResults) > 0 {
181-
for jobId := range ssn.PodGroupInfos {
196+
for jobId, job := range ssn.PodGroupInfos {
182197
for taskId, task := range ssn.PodGroupInfos[jobId].PodInfos {
183198
taskExpectedResult, found := testMetadata.TaskExpectedResults[string(taskId)]
184199
if !found {
@@ -225,6 +240,18 @@ func MatchExpectedAndRealTasks(t *testing.T, testNumber int, testMetadata TestTo
225240
taskExpectedResult.MemoryRequired)
226241
}
227242

243+
if pod_status.AllocatedStatus(taskExpectedResult.Status) {
244+
if job.LastStartTimestamp == nil {
245+
t.Errorf("Test number: %d, name: %v, has failed. Task name: %v, actual last start timestamp is not set expecting pod_status.%v", testNumber, testMetadata.Name, taskId, taskExpectedResult.Status.String())
246+
} else if taskExpectedResult.LastStartTimestampOlderThan != nil {
247+
now := time.Now()
248+
if now.Sub(*job.LastStartTimestamp) < *taskExpectedResult.LastStartTimestampOlderThan {
249+
t.Errorf("Test number: %d, name: %v, has failed. Task name: %v, actual last start timestamp is not older than %v", testNumber, testMetadata.Name, taskId,
250+
*taskExpectedResult.LastStartTimestampOlderThan)
251+
}
252+
}
253+
}
254+
228255
// verify fractional GPUs index
229256
if pod_status.IsActiveUsedStatus(task.Status) &&
230257
!taskExpectedResult.DontValidateGPUGroup &&

0 commit comments

Comments (0)