
Commit d1f23a4

Implement clusterDomain propagation to discover file
Signed-off-by: Yuki Iwai <[email protected]>
1 parent d0e359f commit d1f23a4
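
This commit threads the controller's cluster domain into the generated discover_hosts.sh so that the emitted hostnames become fully qualified. For example (values taken directly from the new test cases below), for an MPIJob named test-job in namespace tenant-a with the cluster domain set to cluster.local and launcher-as-worker enabled, the script stored in the ConfigMap becomes:

    #!/bin/sh
    echo test-job-launcher.test-job.tenant-a.svc.cluster.local
    echo test-job-worker-0.test-job.tenant-a.svc.cluster.local

When no cluster domain is configured, the entries keep the previous short form (e.g. test-job-worker-0.test-job.tenant-a.svc), so the default behavior is unchanged.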

2 files changed: +260 -15 lines changed
pkg/controller/mpi_job_controller.go

Lines changed: 8 additions & 4 deletions
@@ -851,7 +851,7 @@ func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev
     if err != nil {
         return nil, err
     }
-    updateDiscoverHostsInConfigMap(newCM, mpiJob, podList)
+    updateDiscoverHostsInConfigMap(newCM, mpiJob, podList, c.clusterDomain)

     cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix)
     // If the ConfigMap doesn't exist, we'll create it.
@@ -1333,23 +1333,27 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32, clusterDomain s
 }

 // updateDiscoverHostsInConfigMap updates the ConfigMap if the content of `discover_hosts.sh` changes.
-func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob, runningPods []*corev1.Pod) {
+func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob, runningPods []*corev1.Pod, clusterDomain string) {
     // Sort the slice of Pods to make sure the order of entries in `discover_hosts.sh` is maintained.
     sort.Slice(runningPods, func(i, j int) bool {
         return runningPods[i].Name < runningPods[j].Name
     })

     var buffer bytes.Buffer
     buffer.WriteString("#!/bin/sh\n")
+    domainFormat := "%s.%s.%s.svc"
+    if len(clusterDomain) > 0 {
+        domainFormat += fmt.Sprintf(".%s", clusterDomain)
+    }

     // We don't check if launcher is running here, launcher should always be there or the job failed
     if runLauncherAsWorker(mpiJob) {
         name := mpiJob.Name + launcherSuffix
-        buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace))
+        buffer.WriteString(fmt.Sprintf("echo %s\n", fmt.Sprintf(domainFormat, name, mpiJob.Name, mpiJob.Namespace)))
     }

     for _, p := range runningPods {
-        buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, mpiJob.Name, p.Namespace))
+        buffer.WriteString(fmt.Sprintf("echo %s\n", fmt.Sprintf(domainFormat, p.Name, mpiJob.Name, p.Namespace)))
     }

     configMap.Data[discoverHostsScriptName] = buffer.String()
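
For reference, a minimal standalone sketch of the new format-string logic; buildDomainFormat is a hypothetical helper for illustration only, not part of the controller:

    package main

    import "fmt"

    // buildDomainFormat mirrors the construction above: the base format
    // "%s.%s.%s.svc" expands to "<pod>.<job service>.<namespace>.svc",
    // and a non-empty cluster domain is appended to produce a full FQDN.
    func buildDomainFormat(clusterDomain string) string {
        domainFormat := "%s.%s.%s.svc"
        if len(clusterDomain) > 0 {
            domainFormat += fmt.Sprintf(".%s", clusterDomain)
        }
        return domainFormat
    }

    func main() {
        // Without a cluster domain, hostnames rely on the resolver search path.
        fmt.Printf(buildDomainFormat("")+"\n", "test-job-worker-0", "test-job", "default")
        // Output: test-job-worker-0.test-job.default.svc

        // With a cluster domain, hostnames are fully qualified.
        fmt.Printf(buildDomainFormat("cluster.local")+"\n", "test-job-worker-0", "test-job", "default")
        // Output: test-job-worker-0.test-job.default.svc.cluster.local
    }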

pkg/controller/mpi_job_controller_test.go

Lines changed: 252 additions & 11 deletions
@@ -546,7 +546,7 @@ func TestAllResourcesCreated(t *testing.T) {
     scheme.Scheme.Default(mpiJobCopy)
     f.expectCreateServiceAction(newJobService(mpiJobCopy))
     cfgMap := newConfigMap(mpiJobCopy, 5, "")
-    updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
+    updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
     f.expectCreateConfigMapAction(cfgMap)
     secret, err := newSSHAuthSecret(mpiJobCopy)
     if err != nil {
@@ -709,7 +709,7 @@ func TestConfigMapNotControlledByUs(t *testing.T) {
     f.setUpService(newJobService(mpiJob))

     configMap := newConfigMap(mpiJob, replicas, "")
-    updateDiscoverHostsInConfigMap(configMap, mpiJob, nil)
+    updateDiscoverHostsInConfigMap(configMap, mpiJob, nil, "")
     configMap.OwnerReferences = nil
     f.setUpConfigMap(configMap)

@@ -755,7 +755,7 @@ func TestLauncherServiceNotControlledByUs(t *testing.T) {
         t.Fatalf("Creating SSH auth Secret: %v", err)
     }
     f.setUpSecret(secret)
-    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
     f.setUpConfigMap(configMap)
     fmjc := f.newFakeMPIJobController()
     for i := 0; i < int(replicas); i++ {
@@ -778,7 +778,7 @@ func TestSecretNotControlledByUs(t *testing.T) {
     mpiJobCopy := mpiJob.DeepCopy()
     scheme.Scheme.Default(mpiJobCopy)
     configMap := newConfigMap(mpiJobCopy, replicas, "")
-    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
     f.setUpConfigMap(configMap)
     f.setUpService(newJobService(mpiJobCopy))

@@ -861,7 +861,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
     scheme.Scheme.Default(mpiJob)
     f.expectCreateServiceAction(newJobService(mpiJob))
     cfgMap := newConfigMap(mpiJob, replicas, "")
-    updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
+    updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
     f.expectCreateConfigMapAction(cfgMap)
     secret, err := newSSHAuthSecret(mpiJob)
     if err != nil {
@@ -932,7 +932,7 @@ func TestSuspendedRunningMPIJob(t *testing.T) {
     f.setUpService(newJobService(mpiJob))

     cfgMap := newConfigMap(mpiJob, replicas, "")
-    updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList)
+    updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList, "")
     f.setUpConfigMap(cfgMap)
     secret, err := newSSHAuthSecret(mpiJob)
     if err != nil {
@@ -1004,7 +1004,7 @@ func TestResumeMPIJob(t *testing.T) {
     scheme.Scheme.Default(mpiJob)
     f.expectCreateServiceAction(newJobService(mpiJob))
     cfgMap := newConfigMap(mpiJob, replicas, "")
-    updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil)
+    updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil, "")
     f.setUpConfigMap(cfgMap)
     secret, err := newSSHAuthSecret(mpiJob)
     if err != nil {
@@ -1056,7 +1056,7 @@ func TestWorkerNotControlledByUs(t *testing.T) {
     mpiJobCopy := mpiJob.DeepCopy()
     scheme.Scheme.Default(mpiJobCopy)
     configMap := newConfigMap(mpiJobCopy, replicas, "")
-    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
     f.setUpConfigMap(configMap)
     f.setUpService(newJobService(mpiJobCopy))
     secret, err := newSSHAuthSecret(mpiJobCopy)
@@ -1087,7 +1087,7 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) {
     mpiJobCopy := mpiJob.DeepCopy()
     scheme.Scheme.Default(mpiJobCopy)
     configMap := newConfigMap(mpiJobCopy, replicas, "")
-    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil)
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, nil, "")
     f.setUpConfigMap(configMap)
     f.setUpService(newJobService(mpiJobCopy))
     secret, err := newSSHAuthSecret(mpiJobCopy)
@@ -1162,7 +1162,7 @@ func TestLauncherActiveWorkerReady(t *testing.T) {
     }

     configMap := newConfigMap(mpiJobCopy, replicas, "")
-    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList)
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList, "")
     f.setUpConfigMap(configMap)

     mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{
@@ -1216,7 +1216,7 @@ func TestWorkerReady(t *testing.T) {
     }

     configMap := newConfigMap(mpiJobCopy, replicas, "")
-    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList)
+    updateDiscoverHostsInConfigMap(configMap, mpiJobCopy, runningPodList, "")
     f.setUpConfigMap(configMap)

     expLauncher := fmjc.newLauncherJob(mpiJobCopy)
@@ -1981,6 +1981,247 @@ func TestNewConfigMap(t *testing.T) {
     }
 }

+func TestUpdateDiscoverHostsInConfigMap(t *testing.T) {
+    testCases := map[string]struct {
+        mpiJob        *kubeflow.MPIJob
+        runningPods   []*corev1.Pod
+        clusterDomain string
+        wantConfigMap *corev1.ConfigMap
+    }{
+        "no cluster domain, launcher as worker disabled, no running pods": {
+            mpiJob: &kubeflow.MPIJob{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job",
+                    Namespace: "default",
+                },
+                Spec: kubeflow.MPIJobSpec{
+                    MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+                },
+            },
+            runningPods:   []*corev1.Pod{},
+            clusterDomain: "",
+            wantConfigMap: &corev1.ConfigMap{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job-config",
+                    Namespace: "default",
+                },
+                Data: map[string]string{
+                    "discover_hosts.sh": "#!/bin/sh\n",
+                },
+            },
+        },
+        "no cluster domain, launcher as worker disabled, with running pods": {
+            mpiJob: &kubeflow.MPIJob{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job",
+                    Namespace: "default",
+                },
+                Spec: kubeflow.MPIJobSpec{
+                    MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+                },
+            },
+            runningPods: []*corev1.Pod{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-0",
+                        Namespace: "default",
+                    },
+                },
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-1",
+                        Namespace: "default",
+                    },
+                },
+            },
+            clusterDomain: "",
+            wantConfigMap: &corev1.ConfigMap{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job-config",
+                    Namespace: "default",
+                },
+                Data: map[string]string{
+                    "discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\n",
+                },
+            },
+        },
+        "no cluster domain, launcher as worker enabled, no running pods": {
+            mpiJob: &kubeflow.MPIJob{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job",
+                    Namespace: "default",
+                },
+                Spec: kubeflow.MPIJobSpec{
+                    MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+                    RunLauncherAsWorker: ptr.To(true),
+                },
+            },
+            runningPods:   []*corev1.Pod{},
+            clusterDomain: "",
+            wantConfigMap: &corev1.ConfigMap{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job-config",
+                    Namespace: "default",
+                },
+                Data: map[string]string{
+                    "discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.default.svc\n",
+                },
+            },
+        },
+        "no cluster domain, launcher as worker enabled, with running pods": {
+            mpiJob: &kubeflow.MPIJob{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job",
+                    Namespace: "default",
+                },
+                Spec: kubeflow.MPIJobSpec{
+                    MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+                    RunLauncherAsWorker: ptr.To(true),
+                },
+            },
+            runningPods: []*corev1.Pod{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-0",
+                        Namespace: "default",
+                    },
+                },
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-1",
+                        Namespace: "default",
+                    },
+                },
+            },
+            clusterDomain: "",
+            wantConfigMap: &corev1.ConfigMap{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job-config",
+                    Namespace: "default",
+                },
+                Data: map[string]string{
+                    "discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.default.svc\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\n",
+                },
+            },
+        },
+        "with cluster domain, launcher as worker disabled, with running pods": {
+            mpiJob: &kubeflow.MPIJob{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job",
+                    Namespace: "tenant-a",
+                },
+                Spec: kubeflow.MPIJobSpec{
+                    MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+                },
+            },
+            runningPods: []*corev1.Pod{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-0",
+                        Namespace: "tenant-a",
+                    },
+                },
+            },
+            clusterDomain: "cluster.local",
+            wantConfigMap: &corev1.ConfigMap{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job-config",
+                    Namespace: "tenant-a",
+                },
+                Data: map[string]string{
+                    "discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.tenant-a.svc.cluster.local\n",
+                },
+            },
+        },
+        "with cluster domain, launcher as worker enabled, with running pods": {
+            mpiJob: &kubeflow.MPIJob{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job",
+                    Namespace: "tenant-a",
+                },
+                Spec: kubeflow.MPIJobSpec{
+                    MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
+                    RunLauncherAsWorker: ptr.To(true),
+                },
+            },
+            runningPods: []*corev1.Pod{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-0",
+                        Namespace: "tenant-a",
+                    },
+                },
+            },
+            clusterDomain: "cluster.local",
+            wantConfigMap: &corev1.ConfigMap{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job-config",
+                    Namespace: "tenant-a",
+                },
+                Data: map[string]string{
+                    "discover_hosts.sh": "#!/bin/sh\necho test-job-launcher.test-job.tenant-a.svc.cluster.local\necho test-job-worker-0.test-job.tenant-a.svc.cluster.local\n",
+                },
+            },
+        },
+        "pods are sorted by name": {
+            mpiJob: &kubeflow.MPIJob{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job",
+                    Namespace: "default",
+                },
+                Spec: kubeflow.MPIJobSpec{
+                    MPIImplementation: kubeflow.MPIImplementationOpenMPI,
+                },
+            },
+            runningPods: []*corev1.Pod{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-2",
+                        Namespace: "default",
+                    },
+                },
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-0",
+                        Namespace: "default",
+                    },
+                },
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "test-job-worker-1",
+                        Namespace: "default",
+                    },
+                },
+            },
+            clusterDomain: "",
+            wantConfigMap: &corev1.ConfigMap{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      "test-job-config",
+                    Namespace: "default",
+                },
+                Data: map[string]string{
+                    "discover_hosts.sh": "#!/bin/sh\necho test-job-worker-0.test-job.default.svc\necho test-job-worker-1.test-job.default.svc\necho test-job-worker-2.test-job.default.svc\n",
+                },
+            },
+        },
+    }
+    for name, tc := range testCases {
+        t.Run(name, func(t *testing.T) {
+            configMap := &corev1.ConfigMap{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      tc.mpiJob.Name + "-config",
+                    Namespace: tc.mpiJob.Namespace,
+                },
+                Data: make(map[string]string),
+            }
+            updateDiscoverHostsInConfigMap(configMap, tc.mpiJob, tc.runningPods, tc.clusterDomain)
+            if diff := cmp.Diff(tc.wantConfigMap, configMap); len(diff) != 0 {
+                t.Errorf("Unexpected ConfigMap (-want,+got):\n%s", diff)
+            }
+        })
+    }
+}
+
 func joinEnvVars(evs ...interface{}) []corev1.EnvVar {
     var result []corev1.EnvVar
     for _, ev := range evs {