Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev
if err != nil {
return nil, err
}
updateDiscoverHostsInConfigMap(newCM, mpiJob, podList)
updateDiscoverHostsInConfigMap(newCM, mpiJob, podList, c.clusterDomain)

cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix)
// If the ConfigMap doesn't exist, we'll create it.
Expand Down Expand Up @@ -1333,23 +1333,27 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32, clusterDomain s
}

// updateDiscoverHostsInConfigMap updates the ConfigMap if the content of `discover_hosts.sh` changes.
func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob, runningPods []*corev1.Pod) {
func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob, runningPods []*corev1.Pod, clusterDomain string) {
// Sort the slice of Pods to make sure the order of entries in `discover_hosts.sh` is maintained.
sort.Slice(runningPods, func(i, j int) bool {
return runningPods[i].Name < runningPods[j].Name
})

var buffer bytes.Buffer
buffer.WriteString("#!/bin/sh\n")
domainFormat := "%s.%s.%s.svc"
if len(clusterDomain) > 0 {
domainFormat += fmt.Sprintf(".%s", clusterDomain)
}

// We don't check if launcher is running here, launcher should always be there or the job failed
if runLauncherAsWorker(mpiJob) {
name := mpiJob.Name + launcherSuffix
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace))
buffer.WriteString(fmt.Sprintf("echo %s\n", fmt.Sprintf(domainFormat, name, mpiJob.Name, mpiJob.Namespace)))
}

for _, p := range runningPods {
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, mpiJob.Name, p.Namespace))
buffer.WriteString(fmt.Sprintf("echo %s\n", fmt.Sprintf(domainFormat, p.Name, mpiJob.Name, p.Namespace)))
}

configMap.Data[discoverHostsScriptName] = buffer.String()
Expand Down
263 changes: 252 additions & 11 deletions pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func TestAllResourcesCreated(t *testing.T) {
scheme.Scheme.Default(mpiJobCopy)
f.expectCreateServiceAction(newJobService(mpiJobCopy))
cfgMap := newConfigMap(mpiJobCopy, 5, "")
updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
f.expectCreateConfigMapAction(cfgMap)
secret, err := newSSHAuthSecret(mpiJobCopy)
if err != nil {
Expand Down Expand Up @@ -709,7 +709,7 @@ func TestConfigMapNotControlledByUs(t *testing.T) {
f.setUpService(newJobService(mpiJob))

configMap := newConfigMap(mpiJob, replicas, "")
updateDiscoverHostsInConfigMap(configMap, mpiJob, nil)
updateDiscoverHostsInConfigMap(configMap, mpiJob, nil, "")
configMap.OwnerReferences = nil
f.setUpConfigMap(configMap)

Expand Down Expand Up @@ -755,7 +755,7 @@ func TestLauncherServiceNotControlledByUs(t *testing.T) {
t.Fatalf("Creating SSH auth Secret: %v", err)
}
f.setUpSecret(secret)
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
f.setUpConfigMap(configMap)
fmjc := f.newFakeMPIJobController()
for i := 0; i < int(replicas); i++ {
Expand All @@ -778,7 +778,7 @@ func TestSecretNotControlledByUs(t *testing.T) {
mpiJobCopy := mpiJob.DeepCopy()
scheme.Scheme.Default(mpiJobCopy)
configMap := newConfigMap(mpiJobCopy, replicas, "")
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
f.setUpConfigMap(configMap)
f.setUpService(newJobService(mpiJobCopy))

Expand Down Expand Up @@ -861,7 +861,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
scheme.Scheme.Default(mpiJob)
f.expectCreateServiceAction(newJobService(mpiJob))
cfgMap := newConfigMap(mpiJob, replicas, "")
updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
f.expectCreateConfigMapAction(cfgMap)
secret, err := newSSHAuthSecret(mpiJob)
if err != nil {
Expand Down Expand Up @@ -932,7 +932,7 @@ func TestSuspendedRunningMPIJob(t *testing.T) {
f.setUpService(newJobService(mpiJob))

cfgMap := newConfigMap(mpiJob, replicas, "")
updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList)
updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList, "")
f.setUpConfigMap(cfgMap)
secret, err := newSSHAuthSecret(mpiJob)
if err != nil {
Expand Down Expand Up @@ -1004,7 +1004,7 @@ func TestResumeMPIJob(t *testing.T) {
scheme.Scheme.Default(mpiJob)
f.expectCreateServiceAction(newJobService(mpiJob))
cfgMap := newConfigMap(mpiJob, replicas, "")
updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
f.setUpConfigMap(cfgMap)
secret, err := newSSHAuthSecret(mpiJob)
if err != nil {
Expand Down Expand Up @@ -1056,7 +1056,7 @@ func TestWorkerNotControlledByUs(t *testing.T) {
mpiJobCopy := mpiJob.DeepCopy()
scheme.Scheme.Default(mpiJobCopy)
configMap := newConfigMap(mpiJobCopy, replicas, "")
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
f.setUpConfigMap(configMap)
f.setUpService(newJobService(mpiJobCopy))
secret, err := newSSHAuthSecret(mpiJobCopy)
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) {
mpiJobCopy := mpiJob.DeepCopy()
scheme.Scheme.Default(mpiJobCopy)
configMap := newConfigMap(mpiJobCopy, replicas, "")
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
f.setUpConfigMap(configMap)
f.setUpService(newJobService(mpiJobCopy))
secret, err := newSSHAuthSecret(mpiJobCopy)
Expand Down Expand Up @@ -1162,7 +1162,7 @@ func TestLauncherActiveWorkerReady(t *testing.T) {
}

configMap := newConfigMap(mpiJobCopy, replicas, "")
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList)
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList, "")
f.setUpConfigMap(configMap)

mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{
Expand Down Expand Up @@ -1216,7 +1216,7 @@ func TestWorkerReady(t *testing.T) {
}

configMap := newConfigMap(mpiJobCopy, replicas, "")
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList)
updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList, "")
f.setUpConfigMap(configMap)

expLauncher := fmjc.newLauncherJob(mpiJobCopy)
Expand Down Expand Up @@ -1981,6 +1981,247 @@ func TestNewConfigMap(t *testing.T) {
}
}

// TestUpdateDiscoverHostsInConfigMap verifies the generated discover_hosts.sh
// content for every combination of: cluster domain present/absent, launcher
// running as a worker or not, and zero/several running worker Pods. It also
// checks that Pod entries are emitted sorted by name regardless of input order.
func TestUpdateDiscoverHostsInConfigMap(t *testing.T) {
	testCases := map[string]struct {
		mpiJob        *kubeflow.MPIJob
		runningPods   []*corev1.Pod
		clusterDomain string
		wantConfigMap *corev1.ConfigMap
	}{
		// Baseline: only the shebang line is emitted.
		"no cluster domain, launcher as worker disabled, no running pods": {
			mpiJob: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job",
					Namespace: "default",
				},
				Spec: kubeflow.MPIJobSpec{
					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
				},
			},
			runningPods:   []*corev1.Pod{},
			clusterDomain: "",
			wantConfigMap: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job-config",
					Namespace: "default",
				},
				Data: map[string]string{
					"discover_hosts.sh": "#!/bin/sh\n",
				},
			},
		},
		// Worker hosts use the short `.svc` suffix when no cluster domain is set.
		"no cluster domain, launcher as worker disabled, with running pods": {
			mpiJob: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job",
					Namespace: "default",
				},
				Spec: kubeflow.MPIJobSpec{
					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
				},
			},
			runningPods: []*corev1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-0",
						Namespace: "default",
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-1",
						Namespace: "default",
					},
				},
			},
			clusterDomain: "",
			wantConfigMap: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job-config",
					Namespace: "default",
				},
				Data: map[string]string{
					"discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\n",
				},
			},
		},
		// RunLauncherAsWorker adds the launcher host even with no worker Pods.
		"no cluster domain, launcher as worker enabled, no running pods": {
			mpiJob: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job",
					Namespace: "default",
				},
				Spec: kubeflow.MPIJobSpec{
					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
					RunLauncherAsWorker: ptr.To(true),
				},
			},
			runningPods:   []*corev1.Pod{},
			clusterDomain: "",
			wantConfigMap: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job-config",
					Namespace: "default",
				},
				Data: map[string]string{
					"discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.default.svc\n",
				},
			},
		},
		// The launcher host is listed before the (sorted) worker hosts.
		"no cluster domain, launcher as worker enabled, with running pods": {
			mpiJob: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job",
					Namespace: "default",
				},
				Spec: kubeflow.MPIJobSpec{
					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
					RunLauncherAsWorker: ptr.To(true),
				},
			},
			runningPods: []*corev1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-0",
						Namespace: "default",
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-1",
						Namespace: "default",
					},
				},
			},
			clusterDomain: "",
			wantConfigMap: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job-config",
					Namespace: "default",
				},
				Data: map[string]string{
					"discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.default.svc\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\n",
				},
			},
		},
		// A non-empty cluster domain is appended after `.svc`.
		"with cluster domain, launcher as worker disabled, with running pods": {
			mpiJob: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job",
					Namespace: "tenant-a",
				},
				Spec: kubeflow.MPIJobSpec{
					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
				},
			},
			runningPods: []*corev1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-0",
						Namespace: "tenant-a",
					},
				},
			},
			clusterDomain: "cluster.local",
			wantConfigMap: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job-config",
					Namespace: "tenant-a",
				},
				Data: map[string]string{
					"discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.tenant-a.svc.cluster.local\n",
				},
			},
		},
		// Cluster domain applies to the launcher entry as well.
		"with cluster domain, launcher as worker enabled, with running pods": {
			mpiJob: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job",
					Namespace: "tenant-a",
				},
				Spec: kubeflow.MPIJobSpec{
					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
					RunLauncherAsWorker: ptr.To(true),
				},
			},
			runningPods: []*corev1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-0",
						Namespace: "tenant-a",
					},
				},
			},
			clusterDomain: "cluster.local",
			wantConfigMap: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job-config",
					Namespace: "tenant-a",
				},
				Data: map[string]string{
					"discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.tenant-a.svc.cluster.local\necho test-job-worker-0.test-job.tenant-a.svc.cluster.local\n",
				},
			},
		},
		// Input Pods are deliberately out of order; output must be sorted by name.
		"pods are sorted by name": {
			mpiJob: &kubeflow.MPIJob{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job",
					Namespace: "default",
				},
				Spec: kubeflow.MPIJobSpec{
					MPIImplementation: kubeflow.MPIImplementationOpenMPI,
				},
			},
			runningPods: []*corev1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-2",
						Namespace: "default",
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-0",
						Namespace: "default",
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "test-job-worker-1",
						Namespace: "default",
					},
				},
			},
			clusterDomain: "",
			wantConfigMap: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-job-config",
					Namespace: "default",
				},
				Data: map[string]string{
					"discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\necho test-job-worker-2.test-job.default.svc\n",
				},
			},
		},
	}
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			// Start from an empty ConfigMap; the function under test fills in Data.
			configMap := &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name:      tc.mpiJob.Name + "-config",
					Namespace: tc.mpiJob.Namespace,
				},
				Data: make(map[string]string),
			}
			updateDiscoverHostsInConfigMap(configMap, tc.mpiJob, tc.runningPods, tc.clusterDomain)
			if diff := cmp.Diff(tc.wantConfigMap, configMap); len(diff) != 0 {
				t.Errorf("Unexpected ConfigMap (-want,+got):\n%s", diff)
			}
		})
	}
}

func joinEnvVars(evs ...interface{}) []corev1.EnvVar {
var result []corev1.EnvVar
for _, ev := range evs {
Expand Down
Loading