This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Changes from all commits
37 commits
999ec13
[SPARK-22570][SQL] Avoid to create a lot of global variables by using…
kiszk Nov 30, 2017
6ac57fd
[SPARK-21417][SQL] Infer join conditions using propagated constraints
Nov 30, 2017
bcceab6
[SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user …
wangyum Nov 30, 2017
f5f8e84
[SPARK-22614] Dataset API: repartitionByRange(...)
adrian-ionescu Nov 30, 2017
7e5f669
[SPARK-22428][DOC] Add spark application garbage collector configurat…
gaborgsomogyi Dec 1, 2017
7da1f57
[SPARK-22373] Bump Janino dependency version to fix thread safety issue…
Victsm Dec 1, 2017
dc36542
[SPARK-22653] executorAddress registered in CoarseGrainedSchedulerBac…
tgravescs Dec 1, 2017
16adaf6
[SPARK-22601][SQL] Data load is getting displayed successful on provi…
sujith71955 Dec 1, 2017
9d06a9e
[SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in c…
mpetruska Dec 1, 2017
ee10ca7
[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus
zsxwing Dec 1, 2017
aa4cf2b
[SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients fo…
HyukjinKwon Dec 2, 2017
d2cf95a
[SPARK-22634][BUILD] Update Bouncy Castle to 1.58
srowen Dec 2, 2017
f23dddf
[SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based o…
dongjoon-hyun Dec 3, 2017
2c16267
[SPARK-22669][SQL] Avoid unnecessary function calls in code generation
mgaido91 Dec 3, 2017
dff440f
[SPARK-22626][SQL] deals with wrong Hive's statistics (zero rowCount)
wangyum Dec 3, 2017
4131ad0
[SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior changes in mig…
wangyum Dec 4, 2017
3927bb9
[SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions
mgaido91 Dec 4, 2017
f81401e
[SPARK-22162] Executors and the driver should use consistent JobIDs i…
Dec 4, 2017
e1dd03e
[SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.
Dec 4, 2017
dcaac45
Spark on Kubernetes - basic submission client
liyinan926 Nov 10, 2017
27c67ff
Addressed first round of review comments
liyinan926 Nov 27, 2017
6d597d0
Made Client implement the SparkApplication trait
liyinan926 Nov 28, 2017
5b9fa39
Addressed the second round of comments
liyinan926 Nov 28, 2017
5ccadb5
Added missing step for supporting local:// dependencies and addressed…
liyinan926 Nov 30, 2017
12f2797
Fixed Scala style check errors
liyinan926 Nov 30, 2017
c35fe48
Addressed another round of comments
liyinan926 Dec 4, 2017
faa2849
Rebased on master and added a constant val for the Client class
liyinan926 Dec 4, 2017
347ed69
Addressed another major round of comments
liyinan926 Dec 5, 2017
0e8ca01
Addressed one more round of comments
liyinan926 Dec 5, 2017
3a0b8e3
Removed mentioning of kubernetes-namespace
liyinan926 Dec 6, 2017
83d0b9c
Fixed a couple of bugs found during manual tests
liyinan926 Dec 7, 2017
44c40b1
Guard against client mode in SparkContext
liyinan926 Dec 8, 2017
67bc847
Added libc6-compat into the base docker image
liyinan926 Dec 8, 2017
7d2b303
Addressed latest comments
liyinan926 Dec 8, 2017
caf2206
Addressed docs comments
liyinan926 Dec 9, 2017
2e7810b
Fixed a comment
liyinan926 Dec 11, 2017
cbcd30e
Addressed latest comments
liyinan926 Dec 11, 2017
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
102 changes: 51 additions & 51 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -19,17 +19,19 @@ package org.apache.spark

import java.lang.{Byte => JByte}
import java.net.{Authenticator, PasswordAuthentication}
import java.nio.charset.StandardCharsets.UTF_8
import java.security.{KeyStore, SecureRandom}
import java.security.cert.X509Certificate
import javax.net.ssl._

import com.google.common.hash.HashCodes
import com.google.common.io.Files
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.sasl.SecretKeyHolder
import org.apache.spark.util.Utils

@@ -225,7 +227,6 @@ private[spark] class SecurityManager(
setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));

private val secretKey = generateSecretKey()
logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
"; ui acls " + (if (aclsOn) "enabled" else "disabled") +
"; users with view permissions: " + viewAcls.toString() +
@@ -416,50 +417,6 @@ private[spark] class SecurityManager(

def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey

/**
* Generates or looks up the secret key.
*
* The way the key is stored depends on the Spark deployment mode. Yarn
* uses the Hadoop UGI.
*
* For non-Yarn deployments, If the config variable is not set
* we throw an exception.
*/
private def generateSecretKey(): String = {
if (!isAuthenticationEnabled) {
null
} else if (SparkHadoopUtil.get.isYarnMode) {
// In YARN mode, the secure cookie will be created by the driver and stashed in the
// user's credentials, where executors can get it. The check for an array of size 0
// is because of the test code in YarnSparkHadoopUtilSuite.
val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
if (secretKey == null || secretKey.length == 0) {
logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
val rnd = new SecureRandom()
val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
val secret = new Array[Byte](length)
rnd.nextBytes(secret)

val cookie = HashCodes.fromBytes(secret).toString()
SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
cookie
} else {
new Text(secretKey).toString
}
} else {
// user must have set spark.authenticate.secret config
// For Master/Worker, auth secret is in conf; for Executors, it is in env variable
Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
.orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
case Some(value) => value
case None =>
throw new IllegalArgumentException(
"Error: a secret key must be specified via the " +
SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
}
}
}

/**
* Check to see if Acls for the UI are enabled
* @return true if UI authentication is enabled, otherwise false
@@ -542,7 +499,51 @@ private[spark] class SecurityManager(
* Gets the secret key.
* @return the secret key as a String if authentication is enabled, otherwise returns null
*/
def getSecretKey(): String = secretKey
def getSecretKey(): String = {
if (isAuthenticationEnabled) {
val creds = UserGroupInformation.getCurrentUser().getCredentials()
Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
.map { bytes => new String(bytes, UTF_8) }
.orElse(Option(sparkConf.getenv(ENV_AUTH_SECRET)))
.orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
.getOrElse {
throw new IllegalArgumentException(
s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config")
}
} else {
null
}
}

/**
* Initialize the authentication secret.
*
* If authentication is disabled, do nothing.
*
* In YARN mode, generate a new secret and store it in the current user's credentials.
*
* In other modes, assert that the auth secret is set in the configuration.
*/
def initializeAuth(): Unit = {
if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
return
}

if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
return
}

val rnd = new SecureRandom()
val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
val secretBytes = new Array[Byte](length)
rnd.nextBytes(secretBytes)

val creds = new Credentials()
creds.addSecretKey(SECRET_LOOKUP_KEY, secretBytes)
UserGroupInformation.getCurrentUser().addCredentials(creds)
}

// Default SecurityManager only has a single secret key, so ignore appId.
override def getSaslUser(appId: String): String = getSaslUser()
@@ -551,13 +552,12 @@

private[spark] object SecurityManager {

val SPARK_AUTH_CONF: String = "spark.authenticate"
val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
// This is used to set auth secret to an executor's env variable. It should have the same
// value as SPARK_AUTH_SECRET_CONF set in SparkConf
val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"

// key used to store the spark secret in the Hadoop UGI
val SECRET_LOOKUP_KEY = "sparkCookie"

val SECRET_LOOKUP_KEY = new Text("sparkCookie")
}
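
For orientation (not part of the diff): the driver now calls initializeAuth(), which on YARN generates a random secret and stores it under the sparkCookie key in the current user's Hadoop credentials, and otherwise requires spark.authenticate.secret to be set up front; getSecretKey() then resolves the secret from the UGI credentials, the _SPARK_AUTH_SECRET environment variable, or the config, in that order. A minimal standalone Scala sketch of that lookup order (AuthSecretSketch and resolveAuthSecret are illustrative names, not Spark API):

object AuthSecretSketch {
  import java.nio.charset.StandardCharsets.UTF_8
  import org.apache.hadoop.io.Text
  import org.apache.hadoop.security.UserGroupInformation
  import org.apache.spark.SparkConf

  // Mirrors the lookup order of the patched SecurityManager.getSecretKey().
  def resolveAuthSecret(conf: SparkConf): String = {
    val lookupKey = new Text("sparkCookie")                        // SECRET_LOOKUP_KEY in the diff
    Option(UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(lookupKey))
      .map(bytes => new String(bytes, UTF_8))                      // YARN: stored by initializeAuth()
      .orElse(sys.env.get("_SPARK_AUTH_SECRET"))                   // executors: ENV_AUTH_SECRET
      .orElse(conf.getOption("spark.authenticate.secret"))         // SPARK_AUTH_SECRET_CONF
      .getOrElse(throw new IllegalArgumentException(
        "A secret key must be specified via the spark.authenticate.secret config"))
  }
}

Note that the cached secretKey field is removed, so the secret is re-read from the current credentials, environment, and config on every call.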
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -668,7 +668,11 @@ private[spark] object SparkConf extends Logging {
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3"))
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
DRIVER_MEMORY_OVERHEAD.key -> Seq(
AlternateConfig("spark.yarn.driver.memoryOverhead", "2.3")),
EXECUTOR_MEMORY_OVERHEAD.key -> Seq(
AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3"))
)

/**
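
The two new alternate-config entries turn the old YARN-only memory overhead keys into deprecated aliases of the new cluster-agnostic ones. A rough usage sketch in spark-shell / script style, assuming DRIVER_MEMORY_OVERHEAD and EXECUTOR_MEMORY_OVERHEAD resolve to spark.driver.memoryOverhead and spark.executor.memoryOverhead:

import org.apache.spark.SparkConf

// An application still setting the pre-2.3 YARN-only key.
val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "1024")   // Spark logs a deprecation warning here

// Reading the new key falls back to the value stored under the deprecated alias.
val overhead = conf.get("spark.executor.memoryOverhead", "not set")
// overhead == "1024"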
4 changes: 0 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -18,7 +18,6 @@
package org.apache.spark

import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
@@ -413,8 +412,6 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")

_listenerBus = new LiveListenerBus(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
@@ -1955,7 +1952,6 @@ class SparkContext(config: SparkConf) extends Logging {
// `SparkContext` is stopped.
localProperties.remove()
// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
}
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -234,6 +234,10 @@ object SparkEnv extends Logging {
}

val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
securityManager.initializeAuth()
}

ioEncryptionKey.foreach { _ =>
if (!securityManager.isEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -217,8 +217,13 @@ object Client {
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}
// scalastyle:on println
new ClientApp().start(args, new SparkConf())
}
}

val conf = new SparkConf()
private[spark] class ClientApp extends SparkApplication {

override def start(args: Array[String], conf: SparkConf): Unit = {
val driverArgs = new ClientArguments(args)

if (!conf.contains("spark.rpc.askTimeout")) {
@@ -235,4 +240,5 @@

rpcEnv.awaitTermination()
}

}
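
Client.main() now delegates to ClientApp, an implementation of the SparkApplication trait, the same entry-point abstraction the new Kubernetes submission client plugs into. A hypothetical sketch of another implementation of that trait (MyClusterApp and its package placement are illustrative only; spark-submit is expected to instantiate such a class and call start() with the parsed arguments and a fully populated SparkConf):

package org.apache.spark.deploy   // hypothetical placement so private[spark] access compiles

import org.apache.spark.SparkConf

// Illustrative cluster-manager submission client built on the SparkApplication
// entry point, instead of a static main() that reads settings from system properties.
private[spark] class MyClusterApp extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val master = conf.get("spark.master")
    // ... construct and submit the driver for this cluster manager ...
    println(s"Submitting [${args.mkString(" ")}] to $master")
  }
}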
48 changes: 6 additions & 42 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -75,9 +75,7 @@ class SparkHadoopUtil extends Logging {
}

def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
for (token <- source.getTokens.asScala) {
dest.addToken(token)
}
dest.addCredentials(source.getCredentials())
}

/**
@@ -120,16 +118,9 @@ class SparkHadoopUtil extends Logging {
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
* cluster.
*/
def addCredentials(conf: JobConf) {}

def isYarnMode(): Boolean = { false }

def addSecretKeyToUserCredentials(key: String, secret: String) {}

def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }

def getCurrentUserCredentials(): Credentials = {
UserGroupInformation.getCurrentUser().getCredentials()
def addCredentials(conf: JobConf): Unit = {
val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}

def addCurrentUserCredentials(creds: Credentials): Unit = {
@@ -328,17 +319,6 @@ class SparkHadoopUtil extends Logging {
}
}

/**
* Start a thread to periodically update the current user's credentials with new credentials so
* that access to secured service does not fail.
*/
private[spark] def startCredentialUpdater(conf: SparkConf) {}

/**
* Stop the thread that does the credential updates.
*/
private[spark] def stopCredentialUpdater() {}

/**
* Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
* This is to prevent the DFSClient from using an old cached token to connect to the NameNode.
@@ -441,14 +421,7 @@ class SparkHadoopUtil extends Logging {

object SparkHadoopUtil {

private lazy val hadoop = new SparkHadoopUtil
private lazy val yarn = try {
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
private lazy val instance = new SparkHadoopUtil

val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"

@@ -462,16 +435,7 @@
*/
private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000

def get: SparkHadoopUtil = {
// Check each time to support changing to/from YARN
val yarnMode = java.lang.Boolean.parseBoolean(
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
yarn
} else {
hadoop
}
}
def get: SparkHadoopUtil = instance

/**
* Given an expiration date (e.g. for Hadoop Delegation Tokens) return a the date
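
With SPARK_YARN_MODE gone, SparkHadoopUtil.get is now a plain singleton instead of a YARN/non-YARN switch, and addCredentials(JobConf) merges the current user's full Hadoop Credentials. A small hypothetical usage sketch (CredsSketch, secureJobConf, and the package placement are illustrative only):

package org.apache.spark.deploy   // hypothetical placement; SparkHadoopUtil may be private[spark]

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext

object CredsSketch {
  // Merge whatever delegation tokens and secret keys the current UGI holds into a
  // JobConf before handing it to a Hadoop InputFormat/OutputFormat.
  def secureJobConf(sc: SparkContext): JobConf = {
    val jobConf = new JobConf(sc.hadoopConfiguration)
    SparkHadoopUtil.get.addCredentials(jobConf)   // now a straight Credentials merge
    jobConf
  }
}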