Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
apiVersion: config.karmada.io/v1alpha1
kind: ResourceInterpreterCustomization
metadata:
name: declarative-configuration-pytorchjob
spec:
target:
apiVersion: kubeflow.org/v1
kind: PyTorchJob
customizations:
componentResource:
luaScript: |
local kube = require("kube")

-- Safe fetch of deeply nested table fields.
local function get(obj, path)
local cur = obj
for i = 1, #path do
if cur == nil then
return nil
end
cur = cur[path[i]]
end
return cur
end

-- Normalize possibly-string numbers with a default.
local function to_num(v, default)
if v == nil or v == '' then
return default
end
local n = tonumber(v)
if n ~= nil then
return n
end
return default
end

function GetComponents(observedObj)
local components = {}

-- Master Component
local master_spec = get(observedObj, {"spec", "pytorchReplicaSpecs", "Master"})
if master_spec ~= nil then
local master_replicas = to_num(master_spec.replicas, 1)
local master_template = master_spec.template

local master_requires = {}
if master_template ~= nil then
master_requires = kube.accuratePodRequirements(master_template)
end

local masterComponent = {
name = "master",
replicas = master_replicas,
replicaRequirements = master_requires
}
table.insert(components, masterComponent)
end

-- Worker Component
local worker_spec = get(observedObj, {"spec", "pytorchReplicaSpecs", "Worker"})
if worker_spec ~= nil then
local worker_replicas = to_num(worker_spec.replicas, 1)
local worker_template = worker_spec.template

local worker_requires = {}
if worker_template ~= nil then
worker_requires = kube.accuratePodRequirements(worker_template)
end

local workerComponent = {
name = "worker",
replicas = worker_replicas,
replicaRequirements = worker_requires
}
table.insert(components, workerComponent)
end

return components
end

statusAggregation:
luaScript: >
local function omitEmpty(t)
if t == nil then return nil end
local out = {}
for k, v in pairs(t) do
if type(v) == "table" then
local inner = omitEmpty(v)
if inner ~= nil and next(inner) ~= nil then
out[k] = inner
end
elseif v ~= nil and not (v == 0 or v == "" or v == "0s") then
out[k] = v
end
end
if next(out) ~= nil then
return out
else
return nil
end
end
function AggregateStatus(desiredObj, statusItems)
if desiredObj.status == nil then
desiredObj.status = {}
end

-- If no member cluster status, initialize default status
if statusItems == nil then
desiredObj.status.startTime = nil
desiredObj.status.completionTime = nil
desiredObj.status.replicaStatuses = {}
desiredObj.status.conditions = {}
return desiredObj
end

local startTime = nil
local completionTime = nil
local lastReconcileTime = nil
local replicaStatuses = {}
local aggregatedConditions = {}
local successfulClustersNum = 0
local failedClusters = {}

-- Initialize Master and Worker status
replicaStatuses.Master = { active = 0, failed = 0, succeeded = 0 }
replicaStatuses.Worker = { active = 0, failed = 0, succeeded = 0 }

for i = 1, #statusItems do
if statusItems[i].status ~= nil then
-- Aggregate time fields (earliest start time, latest completion time and reconcile time)
if statusItems[i].status.startTime ~= nil then
if startTime == nil or statusItems[i].status.startTime < startTime then
startTime = statusItems[i].status.startTime
end
end

if statusItems[i].status.completionTime ~= nil then
if completionTime == nil or statusItems[i].status.completionTime > completionTime then
completionTime = statusItems[i].status.completionTime
end
end

if statusItems[i].status.lastReconcileTime ~= nil then
if lastReconcileTime == nil or statusItems[i].status.lastReconcileTime > lastReconcileTime then
lastReconcileTime = statusItems[i].status.lastReconcileTime
end
end

-- Aggregate replica status
if statusItems[i].status.replicaStatuses ~= nil then
if statusItems[i].status.replicaStatuses.Master ~= nil then
replicaStatuses.Master.active = replicaStatuses.Master.active + (statusItems[i].status.replicaStatuses.Master.active or 0)
replicaStatuses.Master.failed = replicaStatuses.Master.failed + (statusItems[i].status.replicaStatuses.Master.failed or 0)
replicaStatuses.Master.succeeded = replicaStatuses.Master.succeeded + (statusItems[i].status.replicaStatuses.Master.succeeded or 0)
end

if statusItems[i].status.replicaStatuses.Worker ~= nil then
replicaStatuses.Worker.active = replicaStatuses.Worker.active + (statusItems[i].status.replicaStatuses.Worker.active or 0)
replicaStatuses.Worker.failed = replicaStatuses.Worker.failed + (statusItems[i].status.replicaStatuses.Worker.failed or 0)
replicaStatuses.Worker.succeeded = replicaStatuses.Worker.succeeded + (statusItems[i].status.replicaStatuses.Worker.succeeded or 0)
end
end

-- Aggregate condition status (merge conditions from all member clusters)
local isFinished = false
local finishedType = ""
if statusItems[i].status.conditions ~= nil then
for _, c in ipairs(statusItems[i].status.conditions) do
-- Like kubernetes native Job, we do not merge conditions from member clusters,
-- but generate a new condition by PytorchJob finish state.
-- table.insert(aggregatedConditions, c)
if (c.type == "Succeeded" or c.type == "Failed") and c.status == "True" then
isFinished = true
finishedType = c.type
end
end
end

if isFinished then
if finishedType == "Succeeded" then
successfulClustersNum = successfulClustersNum + 1
elseif finishedType == "Failed" then
table.insert(failedClusters, statusItems[i].clusterName)
end
end

end
end

if #failedClusters > 0 then
table.insert(aggregatedConditions, {
type = "Failed",
status = "True",
lastProbeTime = os.date("!%Y-%m-%dT%H:%M:%SZ"),
lastTransitionTime = os.date("!%Y-%m-%dT%H:%M:%SZ"),
reason = "PyTorchJobFailed",
message = "PyTorchJob executed failed in member clusters: " .. table.concat(failedClusters, ", ")
})
end

if successfulClustersNum == #statusItems and successfulClustersNum > 0 then
table.insert(aggregatedConditions, {
type = "Succeeded",
status = "True",
lastProbeTime = os.date("!%Y-%m-%dT%H:%M:%SZ"),
lastTransitionTime = os.date("!%Y-%m-%dT%H:%M:%SZ"),
reason = "Completed",
message = "PyTorchJob completed successfully"
})
desiredObj.status.completionTime = completionTime
end

-- Set aggregated status
desiredObj.status.startTime = startTime
desiredObj.status.lastReconcileTime = lastReconcileTime
desiredObj.status.replicaStatuses = replicaStatuses
desiredObj.status.conditions = aggregatedConditions
local tmp = desiredObj.status
desiredObj.status = omitEmpty(tmp)

return desiredObj
end

healthInterpretation:
luaScript: >
function InterpretHealth(observedObj)
if observedObj == nil or
observedObj.status == nil or
observedObj.status.conditions == nil then
return false
end

-- Determine health based on PyTorchJob status
for i = 1, #observedObj.status.conditions do
local condition = observedObj.status.conditions[i]
if condition.type == "Failed" and condition.status == "True" then
return false
Comment on lines +237 to +238
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no condition with Type as Failed, the final interpretation result is also true, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, so we will return true in L259

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered the situation where the condition has not yet been filled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered the situation where the condition has not yet been filled?

Will observedObj.status.conditions be nil in this situation?

We will return false in L249.

end
end
return true
end

dependencyInterpretation:
luaScript: >
local kube = require("kube")
function GetDependencies(desiredObj)
local refs = {}

if desiredObj.spec == nil or desiredObj.spec.pytorchReplicaSpecs == nil then
return refs
end

-- Iterate PyTorchJob replica types
local replicaTypes = {"Master", "Worker"}
for _, replicaType in ipairs(replicaTypes) do
local spec = desiredObj.spec.pytorchReplicaSpecs[replicaType]
if spec ~= nil and spec.template ~= nil then
local deps = kube.getPodDependencies(spec.template, desiredObj.metadata.namespace)
if deps ~= nil then
for _, dep in ipairs(deps) do
table.insert(refs, dep)
end
end
end
end

return refs
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
tests:
- desiredInputPath: testdata/desired-pytorchjob.yaml
statusInputPath: testdata/status-file.yaml
operation: AggregateStatus
- observedInputPath: testdata/observed-pytorchjob.yaml
operation: InterpretHealth
- observedInputPath: testdata/observed-pytorchjob.yaml
operation: InterpretComponent
- desiredInputPath: testdata/desired-pytorchjob.yaml
operation: InterpretDependency
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-simple
namespace: kubeflow
spec:
pytorchReplicaSpecs:
Master:
replicas: 2
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727
imagePullPolicy: Always
command:
- "python3"
- "/opt/pytorch-mnist/mnist.py"
- "--epochs=1"
resources:
limits:
cpu: 1
memory: 512Mi
Worker:
replicas: 2
restartPolicy: OnFailure
template:
spec:
serviceAccountName: pytorch-service-account
containers:
- name: pytorch
image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727
imagePullPolicy: Always
volumeMounts:
- mountPath: "/train"
name: "training"
command:
- "python3"
- "/opt/pytorch-mnist/mnist.py"
- "--epochs=1"
resources:
limits:
cpu: 1
memory: 512Mi
volumes:
- name: "training"
persistentVolumeClaim:
claimName: "training-data"
Loading