4 changes: 2 additions & 2 deletions Makefile
@@ -33,7 +33,7 @@ $(SERVICE_NAMES):
$(MAKE) docker-build-generic SERVICE_NAME=$@

.PHONY: validate
validate: generate manifests clients gen-license generate-mocks
validate: generate manifests clients gen-license generate-mocks lint
git diff --exit-code

.PHONY: generate-mocks
@@ -96,4 +96,4 @@ KUSTOMIZE_INSTALL_SCRIPT ?= "https://raw.githubusercontent.com/kubernetes-sigs/k
.PHONY: kustomize
kustomize: $(KUSTOMIZE)
$(KUSTOMIZE): $(LOCALBIN)
test -s $(LOCALBIN)/kustomize || { curl -Ss $(KUSTOMIZE_INSTALL_SCRIPT) --output install_kustomize.sh && bash install_kustomize.sh $(subst v,,$(KUSTOMIZE_VERSION)) $(LOCALBIN); rm install_kustomize.sh; }
test -s $(LOCALBIN)/kustomize || { curl -Ss $(KUSTOMIZE_INSTALL_SCRIPT) --output install_kustomize.sh && bash install_kustomize.sh $(subst v,,$(KUSTOMIZE_VERSION)) $(LOCALBIN); rm install_kustomize.sh; }
@@ -7,9 +7,9 @@ import (
"context"
"testing"

"github.com/NVIDIA/KAI-scheduler/pkg/podgrouper/podgrouper/plugins"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/NVIDIA/KAI-scheduler/pkg/podgrouper/podgrouper/plugins"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 changes: 12 additions & 12 deletions pkg/scheduler/api/cluster_info.go
@@ -22,18 +22,18 @@ import (

// ClusterInfo is a snapshot of cluster by cache.
type ClusterInfo struct {
Pods []*v1.Pod
PodGroupInfos map[common_info.PodGroupID]*podgroup_info.PodGroupInfo
Nodes map[string]*node_info.NodeInfo
BindRequests bindrequest_info.BindRequestMap
BindRequestsForDeletedNodes []*bindrequest_info.BindRequestInfo
Queues map[common_info.QueueID]*queue_info.QueueInfo
Departments map[common_info.QueueID]*queue_info.QueueInfo
StorageClaims map[storageclaim_info.Key]*storageclaim_info.StorageClaimInfo
StorageCapacities map[common_info.StorageCapacityID]*storagecapacity_info.StorageCapacityInfo
CSIDrivers map[common_info.CSIDriverID]*csidriver_info.CSIDriverInfo
StorageClasses map[common_info.StorageClassID]*storageclass_info.StorageClassInfo
ConfigMaps map[common_info.ConfigMapID]*configmap_info.ConfigMapInfo
Pods []*v1.Pod `json:"pods,omitempty"`
PodGroupInfos map[common_info.PodGroupID]*podgroup_info.PodGroupInfo `json:"podGroupInfos,omitempty"`
Nodes map[string]*node_info.NodeInfo `json:"nodes,omitempty"`
BindRequests bindrequest_info.BindRequestMap `json:"bindRequests,omitempty"`
BindRequestsForDeletedNodes []*bindrequest_info.BindRequestInfo `json:"bindRequestsForDeletedNodes,omitempty"`
Queues map[common_info.QueueID]*queue_info.QueueInfo `json:"queues,omitempty"`
Departments map[common_info.QueueID]*queue_info.QueueInfo `json:"departments,omitempty"`
StorageClaims map[storageclaim_info.Key]*storageclaim_info.StorageClaimInfo `json:"storageClaims,omitempty"`
StorageCapacities map[common_info.StorageCapacityID]*storagecapacity_info.StorageCapacityInfo `json:"storageCapacities,omitempty"`
CSIDrivers map[common_info.CSIDriverID]*csidriver_info.CSIDriverInfo `json:"csiDrivers,omitempty"`
StorageClasses map[common_info.StorageClassID]*storageclass_info.StorageClassInfo `json:"storageClasses,omitempty"`
ConfigMaps map[common_info.ConfigMapID]*configmap_info.ConfigMapInfo `json:"configMaps,omitempty"`
}

func NewClusterInfo() *ClusterInfo {
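The only change in this file is the added `json` tags. A minimal sketch, using a stand-in struct rather than the real `ClusterInfo`, of how `omitempty` keeps a marshaled snapshot compact when most of these maps are empty:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Snapshot stands in for a snapshot type such as ClusterInfo:
// exported fields with `json:"...,omitempty"` tags.
type Snapshot struct {
	Pods   []string       `json:"pods,omitempty"`
	Nodes  map[string]int `json:"nodes,omitempty"`
	Queues map[string]int `json:"queues,omitempty"`
}

func main() {
	// Only Nodes is populated; Pods and Queues stay nil.
	s := Snapshot{Nodes: map[string]int{"node-a": 2}}

	out, err := json.Marshal(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"nodes":{"node-a":2}}
}
```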
46 changes: 23 additions & 23 deletions pkg/scheduler/api/common_info/errors.go
@@ -67,19 +67,19 @@ func NewFitErrorInsufficientResource(
var shortMessages []string
var detailedMessages []string

if len(resourceRequested.MigResources()) > 0 {
for migProfile, quant := range resourceRequested.MigResources() {
if len(resourceRequested.MIGResources) > 0 {
for migProfile, quant := range resourceRequested.MIGResources {
availableMigProfilesQuant := int64(0)
capacityMigProfilesQuant := int64(0)
if _, found := availableResource.ScalarResources()[migProfile]; found {
availableMigProfilesQuant = availableResource.ScalarResources()[migProfile]
capacityMigProfilesQuant = capacityResource.ScalarResources()[migProfile]
if _, found := availableResource.ScalarResources[migProfile]; found {
availableMigProfilesQuant = availableResource.ScalarResources[migProfile]
capacityMigProfilesQuant = capacityResource.ScalarResources[migProfile]
}
if availableMigProfilesQuant < quant {
detailedMessages = append(detailedMessages, k8s_internal.NewInsufficientResourceErrorScalarResources(
migProfile,
quant,
usedResource.ScalarResources()[migProfile],
usedResource.ScalarResources[migProfile],
capacityMigProfilesQuant,
gangSchedulingJob))
shortMessages = append(shortMessages, fmt.Sprintf("node(s) didn't have enough of mig profile: %s",
@@ -88,13 +88,13 @@ }
}
} else {
requestedGPUs := resourceRequested.GPUs()
availableGPUs := availableResource.GPUs()
availableGPUs := availableResource.GPUs
if requestedGPUs > availableGPUs {
detailedMessages = append(detailedMessages, k8s_internal.NewInsufficientResourceError(
"GPUs",
generateRequestedGpuString(resourceRequested),
strconv.FormatFloat(usedResource.GPUs(), 'g', 3, 64),
strconv.FormatFloat(capacityResource.GPUs(), 'g', 3, 64),
strconv.FormatFloat(usedResource.GPUs, 'g', 3, 64),
strconv.FormatFloat(capacityResource.GPUs, 'g', 3, 64),
gangSchedulingJob))
shortMessages = append(shortMessages, "node(s) didn't have enough resources: GPUs")
}
@@ -106,40 +106,40 @@ }
}
}

requestedCPUs := int64(resourceRequested.Cpu())
availableCPUs := int64(availableResource.Cpu())
requestedCPUs := int64(resourceRequested.CPUMilliCores)
availableCPUs := int64(availableResource.CPUMilliCores)
if requestedCPUs > availableCPUs {
detailedMessages = append(detailedMessages, k8s_internal.NewInsufficientResourceError(
"CPU cores",
humanize.FtoaWithDigits(resourceRequested.Cpu()/resource_info.MilliCPUToCores, 3),
humanize.FtoaWithDigits(usedResource.Cpu()/resource_info.MilliCPUToCores, 3),
humanize.FtoaWithDigits(capacityResource.Cpu()/resource_info.MilliCPUToCores, 3),
humanize.FtoaWithDigits(resourceRequested.CPUMilliCores/resource_info.MilliCPUToCores, 3),
humanize.FtoaWithDigits(usedResource.CPUMilliCores/resource_info.MilliCPUToCores, 3),
humanize.FtoaWithDigits(capacityResource.CPUMilliCores/resource_info.MilliCPUToCores, 3),
gangSchedulingJob))
shortMessages = append(shortMessages, "node(s) didn't have enough resources: CPU cores")
}

if resourceRequested.Memory() > availableResource.Memory() {
if resourceRequested.MemoryBytes > availableResource.MemoryBytes {
detailedMessages = append(detailedMessages, k8s_internal.NewInsufficientResourceError(
"memory",
humanize.FtoaWithDigits(resourceRequested.Memory()/resource_info.MemoryToGB, 3),
humanize.FtoaWithDigits(usedResource.Memory()/resource_info.MemoryToGB, 3),
humanize.FtoaWithDigits(capacityResource.Memory()/resource_info.MemoryToGB, 3),
humanize.FtoaWithDigits(resourceRequested.MemoryBytes/resource_info.MemoryToGB, 3),
humanize.FtoaWithDigits(usedResource.MemoryBytes/resource_info.MemoryToGB, 3),
humanize.FtoaWithDigits(capacityResource.MemoryBytes/resource_info.MemoryToGB, 3),
gangSchedulingJob))
shortMessages = append(shortMessages, "node(s) didn't have enough resources: memory")
}

for requestedResourceName, requestedResourceQuant := range resourceRequested.ScalarResources() {
for requestedResourceName, requestedResourceQuant := range resourceRequested.ScalarResources {
availableResourceQuant := int64(0)
capacityResourceQuant := int64(0)
if _, found := availableResource.ScalarResources()[requestedResourceName]; found {
availableResourceQuant = availableResource.ScalarResources()[requestedResourceName]
capacityResourceQuant = capacityResource.ScalarResources()[requestedResourceName]
if _, found := availableResource.ScalarResources[requestedResourceName]; found {
availableResourceQuant = availableResource.ScalarResources[requestedResourceName]
capacityResourceQuant = capacityResource.ScalarResources[requestedResourceName]
}
if availableResourceQuant < requestedResourceQuant {
detailedMessages = append(detailedMessages, k8s_internal.NewInsufficientResourceErrorScalarResources(
requestedResourceName,
requestedResourceQuant,
usedResource.ScalarResources()[requestedResourceName], capacityResourceQuant,
usedResource.ScalarResources[requestedResourceName], capacityResourceQuant,
gangSchedulingJob))
shortMessages = append(shortMessages, fmt.Sprintf("node(s) didn't have enough resources: %s",
requestedResourceName))
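The renames in this file (`GPUs()` → `GPUs`, `Cpu()` → `CPUMilliCores`, `Memory()` → `MemoryBytes`, `MigResources()`/`ScalarResources()` → `MIGResources`/`ScalarResources`) swap accessor methods for exported fields, presumably so these resource values can be serialized along with the tagged snapshot structs. A sketch of the resulting access pattern, using a hypothetical resource type rather than the real `resource_info` package:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// Resource is a hypothetical stand-in with exported fields; unlike
// accessor methods, these fields are visible to encoding/json.
type Resource struct {
	GPUs            float64          `json:"gpus,omitempty"`
	CPUMilliCores   float64          `json:"cpuMilliCores,omitempty"`
	MemoryBytes     float64          `json:"memoryBytes,omitempty"`
	ScalarResources map[string]int64 `json:"scalarResources,omitempty"`
}

func main() {
	avail := Resource{
		GPUs:            2,
		CPUMilliCores:   4000,
		MemoryBytes:     8e9,
		ScalarResources: map[string]int64{"nvidia.com/mig-1g.5gb": 1},
	}

	// Plain field reads replace the old avail.GPUs() style calls.
	fmt.Println("gpus:", strconv.FormatFloat(avail.GPUs, 'g', 3, 64))
	fmt.Println("cores:", avail.CPUMilliCores/1000) // milli-CPU -> cores

	// The value now also round-trips through JSON.
	out, _ := json.Marshal(avail)
	fmt.Println(string(out))
}
```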
6 changes: 3 additions & 3 deletions pkg/scheduler/api/configmap_info/configmap_info.go
@@ -11,9 +11,9 @@ import (
)

type ConfigMapInfo struct {
UID common_info.ConfigMapID
Name string
Namespace string
UID common_info.ConfigMapID `json:"uid,omitempty"`
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
}

func NewConfigMapInfo(configMap *v1.ConfigMap) *ConfigMapInfo {
4 changes: 2 additions & 2 deletions pkg/scheduler/api/csidriver_info/csidriver_info.go
@@ -6,6 +6,6 @@ package csidriver_info
import "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"

type CSIDriverInfo struct {
ID common_info.CSIDriverID
CapacityEnabled bool
ID common_info.CSIDriverID `json:"id,omitempty"`
CapacityEnabled bool `json:"capacityEnabled,omitempty"`
}
14 changes: 7 additions & 7 deletions pkg/scheduler/api/node_info/gpu_sharing_node_info.go
@@ -60,8 +60,8 @@ func getAcceptedTaskResourceWithoutSharedGPU(task *pod_info.PodInfo) *resource_i
requestedResourceWithoutSharedGPU := resource_info.EmptyResource()
requestedResourceWithoutSharedGPU.BaseResource = *task.AcceptedResource.BaseResource.Clone()
requestedResourceWithoutSharedGPU.SetGPUs(task.AcceptedResource.GPUs())
maps.Copy(requestedResourceWithoutSharedGPU.ScalarResources(), task.AcceptedResource.MigResources())
maps.Copy(requestedResourceWithoutSharedGPU.ScalarResources(), task.AcceptedResource.ScalarResources())
maps.Copy(requestedResourceWithoutSharedGPU.ScalarResources, task.AcceptedResource.MIGResources)
maps.Copy(requestedResourceWithoutSharedGPU.ScalarResources, task.AcceptedResource.ScalarResources)
if task.IsSharedGPUAllocation() {
requestedResourceWithoutSharedGPU.SetGPUs(0)
}
@@ -106,7 +106,7 @@ func (ni *NodeInfo) addSharedTaskResourcesPerPodGroup(task *pod_info.PodInfo, gp
ni.Releasing.AddGPUs(1)
ni.markSharedGpuAsReleasing(gpuGroup)
}
if int(ni.GetNumberOfGPUsInNode()) < int(ni.Idle.GPUs())+ni.getNumberOfUsedGPUs() {
if int(ni.GetNumberOfGPUsInNode()) < int(ni.Idle.GPUs)+ni.getNumberOfUsedGPUs() {
ni.Idle.SubGPUs(1)
}
}
@@ -122,7 +122,7 @@

if ni.UsedSharedGPUsMemory[gpuGroup] <= ni.GetResourceGpuMemory(task.ResReq) {
// no other fractional was allocated here yet
if int(ni.GetNumberOfGPUsInNode()) < int(ni.Idle.GPUs())+ni.getNumberOfUsedGPUs() {
if int(ni.GetNumberOfGPUsInNode()) < int(ni.Idle.GPUs)+ni.getNumberOfUsedGPUs() {
ni.Idle.SubGPUs(1)
}
}
@@ -181,7 +181,7 @@ func (ni *NodeInfo) removeSharedTaskResourcesPerPodGroup(task *pod_info.PodInfo,

if ni.UsedSharedGPUsMemory[gpuGroup] <= 0 {
// is this the last releasing task for this gpu
if int(ni.GetNumberOfGPUsInNode()) >= int(ni.Idle.GPUs())+ni.getNumberOfUsedGPUs() {
if int(ni.GetNumberOfGPUsInNode()) >= int(ni.Idle.GPUs)+ni.getNumberOfUsedGPUs() {
ni.Idle.AddGPUs(1)
}
if ni.isSharedGpuMarkedAsReleasing(gpuGroup) {
@@ -211,7 +211,7 @@

if ni.UsedSharedGPUsMemory[gpuGroup] <= 0 {
// no other fractional was allocated here yet
if int(ni.GetNumberOfGPUsInNode()) >= int(ni.Idle.GPUs())+ni.getNumberOfUsedGPUs() {
if int(ni.GetNumberOfGPUsInNode()) >= int(ni.Idle.GPUs)+ni.getNumberOfUsedGPUs() {
ni.Idle.AddGPUs(1)
}
}
@@ -271,7 +271,7 @@ func (ni *NodeInfo) getNumberOfUsedSharedGPUs() int {
}

func (ni *NodeInfo) getNumberOfUsedGPUs() int {
return int(ni.Used.GPUs()) + ni.getNumberOfUsedSharedGPUs()
return int(ni.Used.GPUs) + ni.getNumberOfUsedSharedGPUs()
}

func (ni *NodeInfo) GetNumberOfAllocatedSharedGPUs() int {
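With `ScalarResources` and `MIGResources` exposed as map fields, `maps.Copy` in `getAcceptedTaskResourceWithoutSharedGPU` now merges into the map directly. A small illustration with plain string keys (the real key type is a Kubernetes resource name):

```go
package main

import (
	"fmt"
	"maps"
)

func main() {
	// Destination map, e.g. the scalar resources accumulated so far.
	scalar := map[string]int64{"example.com/foo": 1}

	// Source map, e.g. requested MIG profiles (hypothetical values).
	mig := map[string]int64{"nvidia.com/mig-1g.5gb": 2}

	// maps.Copy (Go 1.21+) writes every key/value from mig into scalar,
	// overwriting existing keys; because maps are reference types, the
	// exported field and this merge share the same underlying map.
	maps.Copy(scalar, mig)

	fmt.Println(scalar) // map[example.com/foo:1 nvidia.com/mig-1g.5gb:2]
}
```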
54 changes: 27 additions & 27 deletions pkg/scheduler/api/node_info/node_info.go
@@ -49,30 +49,30 @@ const (

// NodeInfo is node level aggregated information.
type NodeInfo struct {
Name string
Node *v1.Node
Name string `json:"name,omitempty"`
Node *v1.Node `json:"node,omitempty"`

// The releasing resource on that node (excluding shared GPUs)
Releasing *resource_info.Resource
Releasing *resource_info.Resource `json:"releasing,omitempty"`
// The idle resource on that node (excluding shared GPUs)
Idle *resource_info.Resource
Idle *resource_info.Resource `json:"idle,omitempty"`
// The used resource on that node, including running and terminating
// pods (excluding shared GPUs)
Used *resource_info.Resource
Used *resource_info.Resource `json:"used,omitempty"`

Allocatable *resource_info.Resource
Allocatable *resource_info.Resource `json:"allocatable,omitempty"`

AccessibleStorageCapacities map[common_info.StorageClassID][]*sc_info.StorageCapacityInfo
AccessibleStorageCapacities map[common_info.StorageClassID][]*sc_info.StorageCapacityInfo `json:"accessibleStorageCapacities,omitempty"`

PodInfos map[common_info.PodID]*pod_info.PodInfo
MaxTaskNum int
MemoryOfEveryGpuOnNode int64
GpuMemorySynced bool
LegacyMIGTasks map[common_info.PodID]string
PodInfos map[common_info.PodID]*pod_info.PodInfo `json:"podInfos,omitempty"`
MaxTaskNum int `json:"maxTaskNum,omitempty"`
MemoryOfEveryGpuOnNode int64 `json:"memoryOfEveryGpuOnNode,omitempty"`
GpuMemorySynced bool `json:"gpuMemorySynced,omitempty"`
LegacyMIGTasks map[common_info.PodID]string `json:"legacyMigTasks,omitempty"`

PodAffinityInfo pod_affinity.NodePodAffinityInfo
PodAffinityInfo pod_affinity.NodePodAffinityInfo `json:"podAffinityInfo,omitempty"`

GpuSharingNodeInfo
GpuSharingNodeInfo `json:"gpuSharingNodeInfo,omitempty"`
}

func NewNodeInfo(node *v1.Node, podAffinityInfo pod_affinity.NodePodAffinityInfo) *NodeInfo {
@@ -103,7 +103,7 @@ func NewNodeInfo(node *v1.Node, podAffinityInfo pod_affinity.NodePodAffinityInfo
nodeInfo.MaxTaskNum = int(numTasks.Value())

capacity := resource_info.ResourceFromResourceList(node.Status.Capacity)
if capacity.GPUs() != nodeInfo.Allocatable.GPUs() {
if capacity.GPUs != nodeInfo.Allocatable.GPUs {
log.InfraLogger.V(2).Warnf(
"For node %s, the capacity and allocatable are different. Capacity %v, Allocatable %v",
node.Name, capacity.DetailedString(), nodeInfo.Allocatable.DetailedString())
@@ -307,7 +307,7 @@ func (ni *NodeInfo) isTaskAllocatableOnNonAllocatedResources(
if !ni.isValidGpuPortion(task.ResReq) {
return false
}
nodeIdleOrReleasingWholeGpus := int64(math.Floor(nodeNonAllocatedResources.GPUs()))
nodeIdleOrReleasingWholeGpus := int64(math.Floor(nodeNonAllocatedResources.GPUs))
nodeNonAllocatedResourcesMatchingSharedGpus := ni.fractionTaskGpusAllocatableDeviceCount(task)
if nodeIdleOrReleasingWholeGpus+nodeNonAllocatedResourcesMatchingSharedGpus >= task.ResReq.GetNumOfGpuDevices() {
return true
@@ -515,9 +515,9 @@ func (ni *NodeInfo) String() string {

func (ni *NodeInfo) GetSumOfIdleGPUs() (float64, int64) {
sumOfSharedGPUs, sumOfSharedGPUsMemory := ni.getSumOfAvailableSharedGPUs()
idleGPUs := ni.Idle.GPUs()
idleGPUs := ni.Idle.GPUs

for resourceName, qty := range ni.Idle.ScalarResources() {
for resourceName, qty := range ni.Idle.ScalarResources {
if !isMigResource(resourceName.String()) {
continue
}
@@ -534,9 +534,9 @@ func (ni *NodeInfo) GetSumOfIdleGPUs() (float64, int64) {

func (ni *NodeInfo) GetSumOfReleasingGPUs() (float64, int64) {
sumOfSharedGPUs, sumOfSharedGPUsMemory := ni.getSumOfReleasingSharedGPUs()
releasingGPUs := ni.Releasing.GPUs()
releasingGPUs := ni.Releasing.GPUs

for resourceName, qty := range ni.Releasing.ScalarResources() {
for resourceName, qty := range ni.Releasing.ScalarResources {
if !isMigResource(resourceName.String()) {
continue
}
@@ -569,7 +569,7 @@ func (ni *NodeInfo) GetNumberOfGPUsInNode() int64 {
numberOfGPUs, err := ni.getNodeGpuCountLabelValue()
if err != nil {
log.InfraLogger.V(6).Infof("Node: <%v> had no annotations of nvidia.com/gpu.count", ni.Name)
return int64(ni.Allocatable.GPUs())
return int64(ni.Allocatable.GPUs)
}
return int64(numberOfGPUs)
}
@@ -629,7 +629,7 @@ func (ni *NodeInfo) IsCPUOnlyNode() bool {
if ni.IsMIGEnabled() {
return false
}
return ni.Allocatable.GPUs() == 0
return ni.Allocatable.GPUs == 0
}

func (ni *NodeInfo) IsMIGEnabled() bool {
@@ -638,7 +638,7 @@ func (ni *NodeInfo) IsMIGEnabled() bool {
isMig, err := strconv.ParseBool(enabled)
return err == nil && isMig
}
for nodeResource := range ni.Allocatable.ScalarResources() {
for nodeResource := range ni.Allocatable.ScalarResources {
if isMigResource(nodeResource.String()) {
return true
}
@@ -663,13 +663,13 @@ func (ni *NodeInfo) GetMigStrategy() MigStrategy {

func (ni *NodeInfo) GetRequiredInitQuota(pi *pod_info.PodInfo) *podgroup_info.JobRequirement {
quota := podgroup_info.JobRequirement{}
if len(pi.ResReq.MigResources()) != 0 {
if len(pi.ResReq.MIGResources) != 0 {
quota.GPU = pi.ResReq.GetSumGPUs()
} else {
quota.GPU = ni.getGpuMemoryFractionalOnNode(ni.GetResourceGpuMemory(pi.ResReq))
}
quota.MilliCPU = pi.ResReq.Cpu()
quota.Memory = pi.ResReq.Memory()
quota.MilliCPU = pi.ResReq.CPUMilliCores
quota.Memory = pi.ResReq.MemoryBytes
return &quota
}

@@ -682,7 +682,7 @@ func (ni *NodeInfo) setAcceptedResources(pi *pod_info.PodInfo) {
if pi.IsMigCandidate() {
pi.ResourceReceivedType = pod_info.ReceivedTypeMigInstance
pi.AcceptedResource.GpuResourceRequirement =
*resource_info.NewGpuResourceRequirementWithMig(pi.ResReq.MigResources())
*resource_info.NewGpuResourceRequirementWithMig(pi.ResReq.MIGResources)
} else if pi.IsFractionCandidate() {
pi.ResourceReceivedType = pod_info.ReceivedTypeFraction
pi.AcceptedResource.GpuResourceRequirement = *resource_info.NewGpuResourceRequirementWithMultiFraction(
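One behavioral note on the `NodeInfo` struct above: `GpuSharingNodeInfo` is an embedded field, and tagging it `json:"gpuSharingNodeInfo,omitempty"` makes it marshal as a named nested object instead of having its fields promoted inline. A sketch with stand-in types, not the real `NodeInfo`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Inner struct {
	UsedShared int `json:"usedShared,omitempty"`
}

// Tagged embeds Inner with a json tag: it marshals as a nested object.
type Tagged struct {
	Name  string `json:"name,omitempty"`
	Inner `json:"inner,omitempty"`
}

// Untagged embeds Inner without a tag: its fields are promoted inline.
type Untagged struct {
	Name string `json:"name,omitempty"`
	Inner
}

func main() {
	t, _ := json.Marshal(Tagged{Name: "node-a", Inner: Inner{UsedShared: 1}})
	u, _ := json.Marshal(Untagged{Name: "node-a", Inner: Inner{UsedShared: 1}})
	fmt.Println(string(t)) // {"name":"node-a","inner":{"usedShared":1}}
	fmt.Println(string(u)) // {"name":"node-a","usedShared":1}
}
```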