
Commit 347ed69

Addressed another major round of comments

1 parent faa2849, commit 347ed69

File tree: 16 files changed, +105 −117 lines changed


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Lines changed: 0 additions & 4 deletions

@@ -576,10 +576,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
       OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
       OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),
 
-      // Kubernetes only
-      OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
-        confKey = "spark.kubernetes.namespace"),
-
       // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
         confKey = "spark.executor.cores"),

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
Lines changed: 0 additions & 10 deletions

@@ -81,9 +81,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var submissionToRequestStatusFor: String = null
   var useRest: Boolean = true // used internally
 
-  // Kubernetes only
-  var kubernetesNamespace: String = null
-
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()

@@ -202,10 +199,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
     principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
 
-    kubernetesNamespace = Option(kubernetesNamespace)
-      .orElse(sparkProperties.get("spark.kubernetes.namespace"))
-      .orNull
-
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && !isR && primaryResource != null) {
       val uri = new URI(primaryResource)

@@ -461,9 +454,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case KEYTAB =>
        keytab = value
 
-      case KUBERNETES_NAMESPACE =>
-        kubernetesNamespace = value
-
      case HELP =>
        printUsageAndExit(0)
 

core/src/main/scala/org/apache/spark/internal/config/package.scala
Lines changed: 4 additions & 0 deletions

@@ -41,6 +41,10 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createWithDefaultString("1g")
 
+  private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+
   private[spark] val EVENT_LOG_COMPRESS =
     ConfigBuilder("spark.eventLog.compress")
       .booleanConf
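
Note (not part of this commit): the new entry is optional, so callers must supply a fallback. A minimal sketch of how a consumer could apply the default documented later in this commit, max(10% of driver memory, 384 MiB); the helper name is hypothetical and the value is read here as a plain MiB number for simplicity.

    import org.apache.spark.SparkConf

    // Hypothetical helper: use spark.driver.memoryOverhead (MiB) when set,
    // otherwise fall back to max(10% of driver memory, 384 MiB).
    def driverMemoryOverheadMiB(conf: SparkConf, driverMemoryMiB: Long): Long = {
      conf.getOption("spark.driver.memoryOverhead")
        .map(_.toLong)
        .getOrElse(math.max((driverMemoryMiB * 0.10).toLong, 384L))
    }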

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
Lines changed: 2 additions & 2 deletions

@@ -394,8 +394,8 @@ class SparkSubmitSuite
       "--master", "k8s://host:port",
       "--executor-memory", "5g",
       "--class", "org.SomeClass",
-      "--kubernetes-namespace", "foo",
       "--driver-memory", "4g",
+      "--conf", "spark.kubernetes.namespace=spark",
       "--conf", "spark.kubernetes.driver.docker.image=bar",
       "/home/thejar.jar",
       "arg1")

@@ -410,7 +410,7 @@ class SparkSubmitSuite
     classpath should have length (0)
     conf.get("spark.executor.memory") should be ("5g")
     conf.get("spark.driver.memory") should be ("4g")
-    conf.get("spark.kubernetes.namespace") should be ("foo")
+    conf.get("spark.kubernetes.namespace") should be ("spark")
     conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
   }
 

docs/configuration.md
Lines changed: 9 additions & 0 deletions

@@ -157,6 +157,15 @@ of the most common options to set are:
   or in your default properties file.
   </td>
 </tr>
+<tr>
+  <td><code>spark.driver.memoryOverhead</code></td>
+  <td>driverMemory * 0.10, with minimum of 384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is
+    memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+    This tends to grow with the container size (typically 6-10%).
+  </td>
+</tr>
 <tr>
   <td><code>spark.executor.memory</code></td>
   <td>1g</td>
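
As a usage illustration (values chosen arbitrarily, not taken from this commit), the new option is set like any other memory property, for example alongside spark.driver.memory:

    import org.apache.spark.SparkConf

    // Example only: reserve 512 MiB of off-heap overhead for a 4g driver in cluster mode.
    val conf = new SparkConf()
      .set("spark.driver.memory", "4g")
      .set("spark.driver.memoryOverhead", "512")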

docs/running-on-yarn.md
Lines changed: 1 addition & 8 deletions

@@ -234,18 +234,11 @@ To use a custom metrics.properties for the application master and executors, upd
   The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
   </td>
 </tr>
-<tr>
-  <td><code>spark.yarn.driver.memoryOverhead</code></td>
-  <td>driverMemory * 0.10, with minimum of 384</td>
-  <td>
-    The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
-  </td>
-</tr>
 <tr>
   <td><code>spark.yarn.am.memoryOverhead</code></td>
   <td>AM memory * 0.10, with minimum of 384</td>
   <td>
-    Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
+    Same as <code>spark.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
   </td>
 </tr>
 <tr>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
Lines changed: 2 additions & 10 deletions

@@ -79,19 +79,10 @@ private[spark] object Config extends Logging {
 
   val KUBERNETES_EXECUTOR_LIMIT_CORES =
     ConfigBuilder("spark.kubernetes.executor.limit.cores")
-      .doc("Specify the hard cpu limit for a single executor pod")
+      .doc("Specify the hard cpu limit for each executor pod")
       .stringConf
       .createOptional
 
-  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
-    ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
-      .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " +
-        "driver submission server. This is memory that accounts for things like VM overheads, " +
-        "interned strings, other native overheads, etc. This tends to grow with the driver's " +
-        "memory size (typically 6-10%).")
-      .bytesConf(ByteUnit.MiB)
-      .createOptional
-
   // Note that while we set a default for this when we start up the
   // scheduler, the specific default value is dynamically determined
   // based on the executor memory.

@@ -150,6 +141,7 @@ private[spark] object Config extends Logging {
     ConfigBuilder("spark.kubernetes.report.interval")
       .doc("Interval between reports of the current app status in cluster mode.")
       .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")
 
   private[spark] val JARS_DOWNLOAD_LOCATION =
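
The added checkValue rejects non-positive report intervals when the configuration is parsed; a standalone sketch of the equivalent validation (not the ConfigBuilder internals, function name is made up):

    // Equivalent standalone check: the report interval, in milliseconds, must be
    // strictly positive, otherwise the configuration is rejected with this message.
    def validateReportInterval(intervalMs: Long): Long = {
      require(intervalMs > 0, "Logging interval must be a positive time value.")
      intervalMs
    }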

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
Lines changed: 1 addition & 1 deletion

@@ -199,7 +199,7 @@ private[spark] object Client extends SparkApplication {
     val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     // The master URL has been checked for validity already in SparkSubmit.
     val master = sparkConf.get("spark.master")
-    val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter(_ => waitForAppCompletion)
+    val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
 
     val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
       kubernetesAppId, loggingInterval)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
Lines changed: 11 additions & 4 deletions

@@ -16,6 +16,10 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
+import java.util.UUID
+
+import com.google.common.primitives.Longs
+
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.ConfigurationUtils

@@ -43,8 +47,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
   // label values are considerably restrictive, e.g. must be no longer than 63 characters in
   // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
   // requires finding "all pods for this application" should use the kubernetesAppId.
-  private val kubernetesResourceNamePrefix =
-    s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+  private val kubernetesResourceNamePrefix = {
+    val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime))
+    s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
+  }
+
   private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
   private val jarsDownloadPath = submissionSparkConf.get(JARS_DOWNLOAD_LOCATION)
   private val filesDownloadPath = submissionSparkConf.get(FILES_DOWNLOAD_LOCATION)

@@ -91,7 +98,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       }
       mayBeResource
     } else {
-      Option.empty
+      None
     }
 
     val sparkJars = submissionSparkConf.getOption("spark.jars")

@@ -109,7 +116,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
         jarsDownloadPath,
         filesDownloadPath))
     } else {
-      Option.empty
+      None
     }
 
     Seq(
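
For illustration (app name and launch time below are invented), the new prefix generation turns the launch timestamp into a deterministic name-based (version 3) UUID via UUID.nameUUIDFromBytes over the 8-byte encoding of the long, so the same launch time always yields the same resource-name prefix instead of embedding the raw timestamp:

    import java.util.UUID
    import com.google.common.primitives.Longs

    // Illustrative values only.
    val launchTime: Long = 1514764800000L
    val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime))
    val prefix = s"spark-pi-$uuid".toLowerCase.replaceAll("\\.", "-")
    // prefix is stable across runs that share the same launchTime.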

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala
Lines changed: 18 additions & 18 deletions

@@ -28,20 +28,11 @@ private[spark] object KubernetesFileUtils {
    * - File URIs with scheme local:// resolve to just the path of the URI.
    * - Otherwise, the URIs are returned as-is.
    */
-  def resolveSubmittedUris(
+  def resolveFileUris(
       fileUris: Iterable[String],
       fileDownloadPath: String): Iterable[String] = {
     fileUris.map { uri =>
-      val fileUri = Utils.resolveURI(uri)
-      val fileScheme = Option(fileUri.getScheme).getOrElse("file")
-      fileScheme match {
-        case "file" =>
-          val fileName = new File(fileUri.getPath).getName
-          s"$fileDownloadPath/$fileName"
-        case "local" =>
-          fileUri.getPath
-        case _ => uri
-      }
+      resolveFileUri(uri, fileDownloadPath, false)
     }
   }
 

@@ -52,17 +43,26 @@ private[spark] object KubernetesFileUtils {
    */
   def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
     fileUris.map { uri =>
-      resolveFilePath(uri, fileDownloadPath)
+      resolveFileUri(uri, fileDownloadPath, true)
     }
   }
 
-  private def resolveFilePath(uri: String, fileDownloadPath: String): String = {
+  private def resolveFileUri(
+      uri: String,
+      fileDownloadPath: String,
+      assumesDownloaded: Boolean): String = {
     val fileUri = Utils.resolveURI(uri)
-    if (Option(fileUri.getScheme).getOrElse("file") == "local") {
-      fileUri.getPath
-    } else {
-      val fileName = new File(fileUri.getPath).getName
-      s"$fileDownloadPath/$fileName"
+    val fileScheme = Option(fileUri.getScheme).getOrElse("file")
+    fileScheme match {
+      case "local" =>
+        fileUri.getPath
+      case _ =>
+        if (assumesDownloaded || fileScheme == "file") {
+          val fileName = new File(fileUri.getPath).getName
+          s"$fileDownloadPath/$fileName"
+        } else {
+          uri
+        }
     }
   }
 }
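
To make the behavior of the unified helper concrete, here is a self-contained approximation using java.net.URI in place of Spark's Utils.resolveURI; the function name and sample paths are invented for illustration:

    import java.io.File
    import java.net.URI

    def resolveFileUriSketch(uri: String, downloadPath: String, assumesDownloaded: Boolean): String = {
      val parsed = new URI(uri)
      Option(parsed.getScheme).getOrElse("file") match {
        case "local" => parsed.getPath                              // local:// -> path inside the container
        case scheme if assumesDownloaded || scheme == "file" =>
          s"$downloadPath/${new File(parsed.getPath).getName}"      // rewritten to the download directory
        case _ => uri                                               // other remote URIs left untouched
      }
    }

    // resolveFileUriSketch("local:///opt/app/dep.jar", "/var/data", false) == "/opt/app/dep.jar"
    // resolveFileUriSketch("file:///tmp/dep.jar", "/var/data", false)      == "/var/data/dep.jar"
    // resolveFileUriSketch("hdfs://nn/dep.jar", "/var/data", false)        == "hdfs://nn/dep.jar"
    // resolveFileUriSketch("hdfs://nn/dep.jar", "/var/data", true)         == "/var/data/dep.jar"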
