This repository was archived by the owner on Jan 9, 2020. It is now read-only.
File tree Expand file tree Collapse file tree 6 files changed +41
-18
lines changed
main/scala/org/apache/spark
test/scala/org/apache/spark
launcher/src/main/java/org/apache/spark/launcher
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit Expand file tree Collapse file tree 6 files changed +41
-18
lines changed Original file line number Diff line number Diff line change @@ -668,7 +668,9 @@ private[spark] object SparkConf extends Logging {
668
668
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM .key -> Seq (
669
669
AlternateConfig (" spark.reducer.maxReqSizeShuffleToMem" , " 2.3" )),
670
670
LISTENER_BUS_EVENT_QUEUE_CAPACITY .key -> Seq (
671
- AlternateConfig (" spark.scheduler.listenerbus.eventqueue.size" , " 2.3" ))
671
+ AlternateConfig (" spark.scheduler.listenerbus.eventqueue.size" , " 2.3" )),
672
+ " spark.driver.memoryOverhead" -> Seq (
673
+ AlternateConfig (" spark.yarn.driver.memoryOverhead" , " 2.3" ))
672
674
)
673
675
674
676
/**
Original file line number Diff line number Diff line change @@ -2751,16 +2751,29 @@ private[spark] object Utils extends Logging {
2751
2751
require(rawMasterURL.startsWith(" k8s://" ),
2752
2752
" Kubernetes master URL must start with k8s://." )
2753
2753
val masterWithoutK8sPrefix = rawMasterURL.substring(" k8s://" .length)
2754
- if (masterWithoutK8sPrefix.startsWith(" https://" )) {
2755
- masterWithoutK8sPrefix
2756
- } else if (masterWithoutK8sPrefix.startsWith(" http://" )) {
2757
- logWarning(" Kubernetes master URL uses HTTP instead of HTTPS." )
2758
- masterWithoutK8sPrefix
2759
- } else {
2754
+
2755
+ // To handle master URLs, e.g., k8s://host:port.
2756
+ if (! masterWithoutK8sPrefix.contains(" ://" )) {
2760
2757
val resolvedURL = s " https:// $masterWithoutK8sPrefix"
2761
2758
logInfo(" No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
2762
2759
s " URL is $resolvedURL. " )
2763
- resolvedURL
2760
+ return resolvedURL
2761
+ }
2762
+
2763
+ val masterScheme = new URI (masterWithoutK8sPrefix).getScheme
2764
+ masterScheme.toLowerCase match {
2765
+ case " https" =>
2766
+ masterWithoutK8sPrefix
2767
+ case " http" =>
2768
+ logWarning(" Kubernetes master URL uses HTTP instead of HTTPS." )
2769
+ masterWithoutK8sPrefix
2770
+ case null =>
2771
+ val resolvedURL = s " https:// $masterWithoutK8sPrefix"
2772
+ logInfo(" No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
2773
+ s " URL is $resolvedURL. " )
2774
+ resolvedURL
2775
+ case _ =>
2776
+ throw new IllegalArgumentException (" Invalid Kubernetes master scheme: " + masterScheme)
2764
2777
}
2765
2778
}
2766
2779
}
Original file line number Diff line number Diff line change @@ -408,6 +408,7 @@ class SparkSubmitSuite
408
408
childArgsMap.get(" --arg" ) should be (Some (" arg1" ))
409
409
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS )
410
410
classpath should have length (0 )
411
+ conf.get(" spark.master" ) should be (" https://host:port" )
411
412
conf.get(" spark.executor.memory" ) should be (" 5g" )
412
413
conf.get(" spark.driver.memory" ) should be (" 4g" )
413
414
conf.get(" spark.kubernetes.namespace" ) should be (" spark" )
Original file line number Diff line number Diff line change @@ -1153,9 +1153,19 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
1153
1153
val k8sMasterURLHttp = Utils .checkAndGetK8sMasterUrl(" k8s://http://host:port" )
1154
1154
assert(k8sMasterURLHttp == " http://host:port" )
1155
1155
1156
+ val k8sMasterURLWithoutScheme = Utils .checkAndGetK8sMasterUrl(" k8s://127.0.0.1:8443" )
1157
+ assert(k8sMasterURLWithoutScheme == " https://127.0.0.1:8443" )
1158
+
1159
+ val k8sMasterURLWithoutScheme2 = Utils .checkAndGetK8sMasterUrl(" k8s://127.0.0.1" )
1160
+ assert(k8sMasterURLWithoutScheme2 == " https://127.0.0.1" )
1161
+
1156
1162
intercept[IllegalArgumentException ] {
1157
1163
Utils .checkAndGetK8sMasterUrl(" k8s:https://host:port" )
1158
1164
}
1165
+
1166
+ intercept[IllegalArgumentException ] {
1167
+ Utils .checkAndGetK8sMasterUrl(" k8s://foo://host:port" )
1168
+ }
1159
1169
}
1160
1170
}
1161
1171
Original file line number Diff line number Diff line change @@ -76,9 +76,6 @@ class SparkSubmitOptionParser {
76
76
protected final String PRINCIPAL = "--principal" ;
77
77
protected final String QUEUE = "--queue" ;
78
78
79
- // Kubernetes-only options.
80
- protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace" ;
81
-
82
79
/**
83
80
* This is the canonical list of spark-submit options. Each entry in the array contains the
84
81
* different aliases for the same option; the first element of each entry is the "official"
@@ -117,8 +114,7 @@ class SparkSubmitOptionParser {
117
114
{ QUEUE },
118
115
{ REPOSITORIES },
119
116
{ STATUS },
120
- { TOTAL_EXECUTOR_CORES },
121
- { KUBERNETES_NAMESPACE },
117
+ { TOTAL_EXECUTOR_CORES }
122
118
};
123
119
124
120
/**
Original file line number Diff line number Diff line change @@ -20,6 +20,7 @@ import java.util.{Collections, UUID}
20
20
21
21
import scala .collection .JavaConverters ._
22
22
import scala .collection .mutable
23
+ import scala .util .control .NonFatal
23
24
24
25
import io .fabric8 .kubernetes .api .model ._
25
26
import io .fabric8 .kubernetes .client .KubernetesClient
@@ -137,10 +138,10 @@ private[spark] class Client(
137
138
.build()
138
139
139
140
Utils .tryWithResource(
140
- kubernetesClient
141
- .pods()
142
- .withName(resolvedDriverPod.getMetadata.getName)
143
- .watch(loggingPodStatusWatcher)) { _ =>
141
+ kubernetesClient
142
+ .pods()
143
+ .withName(resolvedDriverPod.getMetadata.getName)
144
+ .watch(loggingPodStatusWatcher)) { _ =>
144
145
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
145
146
try {
146
147
if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
@@ -149,7 +150,7 @@ private[spark] class Client(
149
150
kubernetesClient.resourceList(otherKubernetesResources : _* ).createOrReplace()
150
151
}
151
152
} catch {
152
- case e : Throwable =>
153
+ case NonFatal (e) =>
153
154
kubernetesClient.pods().delete(createdDriverPod)
154
155
throw e
155
156
}
You can’t perform that action at this time.
0 commit comments