Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 0e8ca01

Browse files
committed
Addressed one more round of comments
1 parent 347ed69 commit 0e8ca01

File tree

7 files changed

+49
-25
lines changed

7 files changed

+49
-25
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,9 @@ private[spark] object SparkConf extends Logging {
668668
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
669669
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
670670
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
671-
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3"))
671+
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
672+
"spark.driver.memoryOverhead" -> Seq(
673+
AlternateConfig("spark.yarn.driver.memoryOverhead", "2.3"))
672674
)
673675

674676
/**

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2751,16 +2751,29 @@ private[spark] object Utils extends Logging {
27512751
require(rawMasterURL.startsWith("k8s://"),
27522752
"Kubernetes master URL must start with k8s://.")
27532753
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
2754-
if (masterWithoutK8sPrefix.startsWith("https://")) {
2755-
masterWithoutK8sPrefix
2756-
} else if (masterWithoutK8sPrefix.startsWith("http://")) {
2757-
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
2758-
masterWithoutK8sPrefix
2759-
} else {
2754+
2755+
// To handle master URLs, e.g., k8s://host:port.
2756+
if (!masterWithoutK8sPrefix.contains("://")) {
27602757
val resolvedURL = s"https://$masterWithoutK8sPrefix"
27612758
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
27622759
s"URL is $resolvedURL.")
2763-
resolvedURL
2760+
return resolvedURL
2761+
}
2762+
2763+
val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
2764+
masterScheme.toLowerCase match {
2765+
case "https" =>
2766+
masterWithoutK8sPrefix
2767+
case "http" =>
2768+
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
2769+
masterWithoutK8sPrefix
2770+
case null =>
2771+
val resolvedURL = s"https://$masterWithoutK8sPrefix"
2772+
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
2773+
s"URL is $resolvedURL.")
2774+
resolvedURL
2775+
case _ =>
2776+
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)
27642777
}
27652778
}
27662779
}

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,7 @@ class SparkSubmitSuite
408408
childArgsMap.get("--arg") should be (Some("arg1"))
409409
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
410410
classpath should have length (0)
411+
conf.get("spark.master") should be ("https://host:port")
411412
conf.get("spark.executor.memory") should be ("5g")
412413
conf.get("spark.driver.memory") should be ("4g")
413414
conf.get("spark.kubernetes.namespace") should be ("spark")

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,9 +1153,19 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
11531153
val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
11541154
assert(k8sMasterURLHttp == "http://host:port")
11551155

1156+
val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
1157+
assert(k8sMasterURLWithoutScheme == "https://127.0.0.1:8443")
1158+
1159+
val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1")
1160+
assert(k8sMasterURLWithoutScheme2 == "https://127.0.0.1")
1161+
11561162
intercept[IllegalArgumentException] {
11571163
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
11581164
}
1165+
1166+
intercept[IllegalArgumentException] {
1167+
Utils.checkAndGetK8sMasterUrl("k8s://foo://host:port")
1168+
}
11591169
}
11601170
}
11611171

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,6 @@ class SparkSubmitOptionParser {
7676
protected final String PRINCIPAL = "--principal";
7777
protected final String QUEUE = "--queue";
7878

79-
// Kubernetes-only options.
80-
protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";
81-
8279
/**
8380
* This is the canonical list of spark-submit options. Each entry in the array contains the
8481
* different aliases for the same option; the first element of each entry is the "official"
@@ -117,8 +114,7 @@ class SparkSubmitOptionParser {
117114
{ QUEUE },
118115
{ REPOSITORIES },
119116
{ STATUS },
120-
{ TOTAL_EXECUTOR_CORES },
121-
{ KUBERNETES_NAMESPACE },
117+
{ TOTAL_EXECUTOR_CORES }
122118
};
123119

124120
/**

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import java.util.{Collections, UUID}
2020

2121
import scala.collection.JavaConverters._
2222
import scala.collection.mutable
23+
import scala.util.control.NonFatal
2324

2425
import io.fabric8.kubernetes.api.model._
2526
import io.fabric8.kubernetes.client.KubernetesClient
@@ -137,10 +138,10 @@ private[spark] class Client(
137138
.build()
138139

139140
Utils.tryWithResource(
140-
kubernetesClient
141-
.pods()
142-
.withName(resolvedDriverPod.getMetadata.getName)
143-
.watch(loggingPodStatusWatcher)) { _ =>
141+
kubernetesClient
142+
.pods()
143+
.withName(resolvedDriverPod.getMetadata.getName)
144+
.watch(loggingPodStatusWatcher)) { _ =>
144145
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
145146
try {
146147
if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
@@ -149,7 +150,7 @@ private[spark] class Client(
149150
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
150151
}
151152
} catch {
152-
case e: Throwable =>
153+
case NonFatal(e) =>
153154
kubernetesClient.pods().delete(createdDriverPod)
154155
throw e
155156
}
@@ -193,6 +194,10 @@ private[spark] object Client extends SparkApplication {
193194

194195
private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
195196
val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
197+
// For constructing the app ID, we can't use the Spark application name, as the app ID is going
198+
// to be added as a label to group resources belonging to the same application. Label values are
199+
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
200+
// a unique app ID (captured by spark.app.id) in the format below.
196201
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
197202
val launchTime = System.currentTimeMillis()
198203
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
4141
appArgs: Array[String],
4242
submissionSparkConf: SparkConf) {
4343

44-
// The resource name prefix is derived from the application name, making it easy to connect the
45-
// names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
46-
// application the user submitted. However, we can't use the application name in the label, as
47-
// label values are considerably restrictive, e.g. must be no longer than 63 characters in
48-
// length. So we generate a separate identifier for the app ID itself, and bookkeeping that
49-
// requires finding "all pods for this application" should use the kubernetesAppId.
44+
// The resource name prefix is derived from the Spark application name, making it easy to connect
45+
// the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
46+
// application the user submitted.
5047
private val kubernetesResourceNamePrefix = {
51-
val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime))
48+
val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
5249
s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
5350
}
5451

0 commit comments

Comments (0)