From b7c8fadeb403c92c29098abd39e966d79c32f165 Mon Sep 17 00:00:00 2001 From: Xinyuan Lyu Date: Mon, 13 Oct 2025 22:43:29 +0800 Subject: [PATCH] add resource interpreter for PytorchJob Signed-off-by: Xinyuan Lyu remove unused code Signed-off-by: Xinyuan Lyu remove unused func Signed-off-by: Xinyuan Lyu fmt code Signed-off-by: Xinyuan Lyu Handle cases where Master is undefined, and remove the statusReflection hook point. Signed-off-by: Xinyuan Lyu Add multiple statuses and ResourceRequirements in test files. Signed-off-by: Xinyuan Lyu Remove InterpretStatus in test files. Signed-off-by: Xinyuan Lyu Add dependencyInterpretation and test case. Signed-off-by: Xinyuan Lyu --- .../v1/PyTorchJob/customizations.yaml | 269 ++++++++++++++++++ .../v1/PyTorchJob/customizations_tests.yaml | 10 + .../testdata/desired-pytorchjob.yaml | 49 ++++ .../testdata/observed-pytorchjob.yaml | 78 +++++ .../v1/PyTorchJob/testdata/status-file.yaml | 65 +++++ 5 files changed, 471 insertions(+) create mode 100644 pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/customizations.yaml create mode 100644 pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/customizations_tests.yaml create mode 100644 pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/desired-pytorchjob.yaml create mode 100644 pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/observed-pytorchjob.yaml create mode 100644 pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/status-file.yaml diff --git a/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/customizations.yaml b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/customizations.yaml new file mode 100644 index 000000000000..66f46a73b09d --- /dev/null +++ b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/customizations.yaml @@ -0,0 +1,269 @@ +apiVersion: config.karmada.io/v1alpha1 +kind: ResourceInterpreterCustomization +metadata: + name: declarative-configuration-pytorchjob +spec: + target: + apiVersion: kubeflow.org/v1 + kind: PyTorchJob + customizations: + componentResource: + luaScript: | + local kube = require("kube") + + -- Safe fetch of deeply nested table fields. + local function get(obj, path) + local cur = obj + for i = 1, #path do + if cur == nil then + return nil + end + cur = cur[path[i]] + end + return cur + end + + -- Normalize possibly-string numbers with a default. 
+ local function to_num(v, default) + if v == nil or v == '' then + return default + end + local n = tonumber(v) + if n ~= nil then + return n + end + return default + end + + function GetComponents(observedObj) + local components = {} + + -- Master Component + local master_spec = get(observedObj, {"spec", "pytorchReplicaSpecs", "Master"}) + if master_spec ~= nil then + local master_replicas = to_num(master_spec.replicas, 1) + local master_template = master_spec.template + + local master_requires = {} + if master_template ~= nil then + master_requires = kube.accuratePodRequirements(master_template) + end + + local masterComponent = { + name = "master", + replicas = master_replicas, + replicaRequirements = master_requires + } + table.insert(components, masterComponent) + end + + -- Worker Component + local worker_spec = get(observedObj, {"spec", "pytorchReplicaSpecs", "Worker"}) + if worker_spec ~= nil then + local worker_replicas = to_num(worker_spec.replicas, 1) + local worker_template = worker_spec.template + + local worker_requires = {} + if worker_template ~= nil then + worker_requires = kube.accuratePodRequirements(worker_template) + end + + local workerComponent = { + name = "worker", + replicas = worker_replicas, + replicaRequirements = worker_requires + } + table.insert(components, workerComponent) + end + + return components + end + + statusAggregation: + luaScript: > + local function omitEmpty(t) + if t == nil then return nil end + local out = {} + for k, v in pairs(t) do + if type(v) == "table" then + local inner = omitEmpty(v) + if inner ~= nil and next(inner) ~= nil then + out[k] = inner + end + elseif v ~= nil and not (v == 0 or v == "" or v == "0s") then + out[k] = v + end + end + if next(out) ~= nil then + return out + else + return nil + end + end + function AggregateStatus(desiredObj, statusItems) + if desiredObj.status == nil then + desiredObj.status = {} + end + + -- If no member cluster status, initialize default status + if statusItems == nil then + desiredObj.status.startTime = nil + desiredObj.status.completionTime = nil + desiredObj.status.replicaStatuses = {} + desiredObj.status.conditions = {} + return desiredObj + end + + local startTime = nil + local completionTime = nil + local lastReconcileTime = nil + local replicaStatuses = {} + local aggregatedConditions = {} + local successfulClustersNum = 0 + local failedClusters = {} + + -- Initialize Master and Worker status + replicaStatuses.Master = { active = 0, failed = 0, succeeded = 0 } + replicaStatuses.Worker = { active = 0, failed = 0, succeeded = 0 } + + for i = 1, #statusItems do + if statusItems[i].status ~= nil then + -- Aggregate time fields (earliest start time, latest completion time and reconcile time) + if statusItems[i].status.startTime ~= nil then + if startTime == nil or statusItems[i].status.startTime < startTime then + startTime = statusItems[i].status.startTime + end + end + + if statusItems[i].status.completionTime ~= nil then + if completionTime == nil or statusItems[i].status.completionTime > completionTime then + completionTime = statusItems[i].status.completionTime + end + end + + if statusItems[i].status.lastReconcileTime ~= nil then + if lastReconcileTime == nil or statusItems[i].status.lastReconcileTime > lastReconcileTime then + lastReconcileTime = statusItems[i].status.lastReconcileTime + end + end + + -- Aggregate replica status + if statusItems[i].status.replicaStatuses ~= nil then + if statusItems[i].status.replicaStatuses.Master ~= nil then + replicaStatuses.Master.active = 
replicaStatuses.Master.active + (statusItems[i].status.replicaStatuses.Master.active or 0) + replicaStatuses.Master.failed = replicaStatuses.Master.failed + (statusItems[i].status.replicaStatuses.Master.failed or 0) + replicaStatuses.Master.succeeded = replicaStatuses.Master.succeeded + (statusItems[i].status.replicaStatuses.Master.succeeded or 0) + end + + if statusItems[i].status.replicaStatuses.Worker ~= nil then + replicaStatuses.Worker.active = replicaStatuses.Worker.active + (statusItems[i].status.replicaStatuses.Worker.active or 0) + replicaStatuses.Worker.failed = replicaStatuses.Worker.failed + (statusItems[i].status.replicaStatuses.Worker.failed or 0) + replicaStatuses.Worker.succeeded = replicaStatuses.Worker.succeeded + (statusItems[i].status.replicaStatuses.Worker.succeeded or 0) + end + end + + -- Aggregate condition status (merge conditions from all member clusters) + local isFinished = false + local finishedType = "" + if statusItems[i].status.conditions ~= nil then + for _, c in ipairs(statusItems[i].status.conditions) do + -- Like kubernetes native Job, we do not merge conditions from member clusters, + -- but generate a new condition by PytorchJob finish state. + -- table.insert(aggregatedConditions, c) + if (c.type == "Succeeded" or c.type == "Failed") and c.status == "True" then + isFinished = true + finishedType = c.type + end + end + end + + if isFinished then + if finishedType == "Succeeded" then + successfulClustersNum = successfulClustersNum + 1 + elseif finishedType == "Failed" then + table.insert(failedClusters, statusItems[i].clusterName) + end + end + + end + end + + if #failedClusters > 0 then + table.insert(aggregatedConditions, { + type = "Failed", + status = "True", + lastProbeTime = os.date("!%Y-%m-%dT%H:%M:%SZ"), + lastTransitionTime = os.date("!%Y-%m-%dT%H:%M:%SZ"), + reason = "PyTorchJobFailed", + message = "PyTorchJob executed failed in member clusters: " .. 
table.concat(failedClusters, ", ") + }) + end + + if successfulClustersNum == #statusItems and successfulClustersNum > 0 then + table.insert(aggregatedConditions, { + type = "Succeeded", + status = "True", + lastProbeTime = os.date("!%Y-%m-%dT%H:%M:%SZ"), + lastTransitionTime = os.date("!%Y-%m-%dT%H:%M:%SZ"), + reason = "Completed", + message = "PyTorchJob completed successfully" + }) + desiredObj.status.completionTime = completionTime + end + + -- Set aggregated status + desiredObj.status.startTime = startTime + desiredObj.status.lastReconcileTime = lastReconcileTime + desiredObj.status.replicaStatuses = replicaStatuses + desiredObj.status.conditions = aggregatedConditions + local tmp = desiredObj.status + desiredObj.status = omitEmpty(tmp) + + return desiredObj + end + + healthInterpretation: + luaScript: > + function InterpretHealth(observedObj) + if observedObj == nil or + observedObj.status == nil or + observedObj.status.conditions == nil then + return false + end + + -- Determine health based on PyTorchJob status + for i = 1, #observedObj.status.conditions do + local condition = observedObj.status.conditions[i] + if condition.type == "Failed" and condition.status == "True" then + return false + end + end + return true + end + + dependencyInterpretation: + luaScript: > + local kube = require("kube") + function GetDependencies(desiredObj) + local refs = {} + + if desiredObj.spec == nil or desiredObj.spec.pytorchReplicaSpecs == nil then + return refs + end + + -- Iterate PyTorchJob replica types + local replicaTypes = {"Master", "Worker"} + for _, replicaType in ipairs(replicaTypes) do + local spec = desiredObj.spec.pytorchReplicaSpecs[replicaType] + if spec ~= nil and spec.template ~= nil then + local deps = kube.getPodDependencies(spec.template, desiredObj.metadata.namespace) + if deps ~= nil then + for _, dep in ipairs(deps) do + table.insert(refs, dep) + end + end + end + end + + return refs + end diff --git a/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/customizations_tests.yaml b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/customizations_tests.yaml new file mode 100644 index 000000000000..f9a3dfa8ba6b --- /dev/null +++ b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/customizations_tests.yaml @@ -0,0 +1,10 @@ +tests: + - desiredInputPath: testdata/desired-pytorchjob.yaml + statusInputPath: testdata/status-file.yaml + operation: AggregateStatus + - observedInputPath: testdata/observed-pytorchjob.yaml + operation: InterpretHealth + - observedInputPath: testdata/observed-pytorchjob.yaml + operation: InterpretComponent + - desiredInputPath: testdata/desired-pytorchjob.yaml + operation: InterpretDependency diff --git a/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/desired-pytorchjob.yaml b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/desired-pytorchjob.yaml new file mode 100644 index 000000000000..ec9ecc3c522e --- /dev/null +++ b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/desired-pytorchjob.yaml @@ -0,0 +1,49 @@ +apiVersion: kubeflow.org/v1 +kind: PyTorchJob +metadata: + name: pytorch-simple + namespace: kubeflow +spec: + pytorchReplicaSpecs: + Master: + replicas: 2 + restartPolicy: OnFailure + template: + spec: + containers: + - name: pytorch + image: 
docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727 + imagePullPolicy: Always + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + limits: + cpu: 1 + memory: 512Mi + Worker: + replicas: 2 + restartPolicy: OnFailure + template: + spec: + serviceAccountName: pytorch-service-account + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727 + imagePullPolicy: Always + volumeMounts: + - mountPath: "/train" + name: "training" + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + limits: + cpu: 1 + memory: 512Mi + volumes: + - name: "training" + persistentVolumeClaim: + claimName: "training-data" diff --git a/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/observed-pytorchjob.yaml b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/observed-pytorchjob.yaml new file mode 100644 index 000000000000..06ac257b60ab --- /dev/null +++ b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/observed-pytorchjob.yaml @@ -0,0 +1,78 @@ +apiVersion: kubeflow.org/v1 +kind: PyTorchJob +metadata: + name: pytorch-simple + namespace: kubeflow +spec: + pytorchReplicaSpecs: + Master: + replicas: 2 + restartPolicy: OnFailure + template: + spec: + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727 + imagePullPolicy: Always + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + limits: + cpu: 1 + memory: 512Mi + Worker: + replicas: 2 + restartPolicy: OnFailure + template: + spec: + serviceAccountName: pytorch-service-account + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727 + imagePullPolicy: Always + volumeMounts: + - mountPath: "/train" + name: "training" + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + limits: + cpu: 1 + memory: 512Mi + volumes: + - name: "training" + persistentVolumeClaim: + claimName: "training-data" +status: + completionTime: "2025-10-13T11:48:12Z" + conditions: + - lastTransitionTime: "2025-10-13T11:46:42Z" + lastUpdateTime: "2025-10-13T11:46:42Z" + message: PyTorchJob pytorch-simple is created. + reason: PyTorchJobCreated + status: "True" + type: Created + - lastTransitionTime: "2025-10-13T11:47:40Z" + lastUpdateTime: "2025-10-13T11:47:40Z" + message: PyTorchJob pytorch-simple is running. + reason: PyTorchJobRunning + status: "False" + type: Running + - lastTransitionTime: "2025-10-13T11:48:12Z" + lastUpdateTime: "2025-10-13T11:48:12Z" + message: PyTorchJob pytorch-simple is successfully completed. 
+ reason: PyTorchJobSucceeded + status: "True" + type: Succeeded + replicaStatuses: + Master: + selector: training.kubeflow.org/job-name=pytorch-simple,training.kubeflow.org/operator-name=pytorchjob-controller,training.kubeflow.org/replica-type=master + succeeded: 1 + Worker: + selector: training.kubeflow.org/job-name=pytorch-simple,training.kubeflow.org/operator-name=pytorchjob-controller,training.kubeflow.org/replica-type=worker + succeeded: 1 + startTime: "2025-10-13T11:46:42Z" diff --git a/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/status-file.yaml b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/status-file.yaml new file mode 100644 index 000000000000..93aac9b01482 --- /dev/null +++ b/pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/kubeflow.org/v1/PyTorchJob/testdata/status-file.yaml @@ -0,0 +1,65 @@ +applied: true +clusterName: member1 +health: Healthy +status: + completionTime: "2025-10-13T11:48:12Z" + conditions: + - lastTransitionTime: "2025-10-13T11:46:42Z" + lastUpdateTime: "2025-10-13T11:46:42Z" + message: PyTorchJob pytorch-simple is created. + reason: PyTorchJobCreated + status: "True" + type: Created + - lastTransitionTime: "2025-10-13T11:47:40Z" + lastUpdateTime: "2025-10-13T11:47:40Z" + message: PyTorchJob pytorch-simple is running. + reason: PyTorchJobRunning + status: "False" + type: Running + - lastTransitionTime: "2025-10-13T11:48:12Z" + lastUpdateTime: "2025-10-13T11:48:12Z" + message: PyTorchJob pytorch-simple is successfully completed. + reason: PyTorchJobSucceeded + status: "True" + type: Succeeded + replicaStatuses: + Master: + selector: training.kubeflow.org/job-name=pytorch-simple,training.kubeflow.org/operator-name=pytorchjob-controller,training.kubeflow.org/replica-type=master + succeeded: 1 + Worker: + selector: training.kubeflow.org/job-name=pytorch-simple,training.kubeflow.org/operator-name=pytorchjob-controller,training.kubeflow.org/replica-type=worker + succeeded: 1 + startTime: "2025-10-13T11:46:42Z" +--- +applied: true +clusterName: member2 +health: Healthy +status: + completionTime: "2025-10-13T11:48:12Z" + conditions: + - lastTransitionTime: "2025-10-13T11:46:42Z" + lastUpdateTime: "2025-10-13T11:46:42Z" + message: PyTorchJob pytorch-simple is created. + reason: PyTorchJobCreated + status: "True" + type: Created + - lastTransitionTime: "2025-10-13T11:47:40Z" + lastUpdateTime: "2025-10-13T11:47:40Z" + message: PyTorchJob pytorch-simple is running. + reason: PyTorchJobRunning + status: "False" + type: Running + - lastTransitionTime: "2025-10-13T11:48:12Z" + lastUpdateTime: "2025-10-13T11:48:12Z" + message: PyTorchJob pytorch-simple is successfully completed. + reason: PyTorchJobSucceeded + status: "True" + type: Succeeded + replicaStatuses: + Master: + selector: training.kubeflow.org/job-name=pytorch-simple,training.kubeflow.org/operator-name=pytorchjob-controller,training.kubeflow.org/replica-type=master + succeeded: 1 + Worker: + selector: training.kubeflow.org/job-name=pytorch-simple,training.kubeflow.org/operator-name=pytorchjob-controller,training.kubeflow.org/replica-type=worker + succeeded: 1 + startTime: "2025-10-13T11:46:42Z"
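
For reference, given the two member statuses in testdata/status-file.yaml, the AggregateStatus hook above is expected to produce roughly the following status on the federated PyTorchJob (a sketch, not part of the patch: the condition timestamps are generated with os.date at aggregation time, and zero-valued fields are dropped by omitEmpty):

status:
  startTime: "2025-10-13T11:46:42Z"
  completionTime: "2025-10-13T11:48:12Z"
  replicaStatuses:
    Master:
      succeeded: 2
    Worker:
      succeeded: 2
  conditions:
  - type: Succeeded
    status: "True"
    reason: Completed
    message: PyTorchJob completed successfully
    lastProbeTime: "<aggregation time>"
    lastTransitionTime: "<aggregation time>"

Similarly, GetComponents applied to testdata/observed-pytorchjob.yaml should yield a "master" and a "worker" component, each with 2 replicas and per-replica requirements derived by kube.accuratePodRequirements from the pod template limits (cpu: 1, memory: 512Mi).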