diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index b8da39c8f28b7..45fe0b02c6259 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -70,8 +70,8 @@ private[spark] class HadoopDelegationTokenManager(
     "spark.yarn.security.credentials.%s.enabled")
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
 
-  private val principal = sparkConf.get(PRINCIPAL).orNull
-  private val keytab = sparkConf.get(KEYTAB).orNull
+  protected val principal = sparkConf.get(PRINCIPAL).orNull
+  protected val keytab = sparkConf.get(KEYTAB).orNull
 
   require((principal == null) == (keytab == null),
     "Both principal and keytab must be defined, or neither.")
@@ -81,8 +81,8 @@ private[spark] class HadoopDelegationTokenManager(
   logDebug("Using the following builtin delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
-  private var renewalExecutor: ScheduledExecutorService = _
-  private val driverRef = new AtomicReference[RpcEndpointRef]()
+  protected var renewalExecutor: ScheduledExecutorService = _
+  protected val driverRef = new AtomicReference[RpcEndpointRef]()
 
   protected def setDriverRef(ref: RpcEndpointRef): Unit = {
     driverRef.set(ref)
@@ -262,7 +262,7 @@ private[spark] class HadoopDelegationTokenManager(
    *
    * @return Credentials containing the new tokens.
    */
-  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
+  protected def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
     ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
       override def run(): Credentials = {
         val creds = new Credentials()
@@ -279,7 +279,7 @@ private[spark] class HadoopDelegationTokenManager(
     })
   }
 
-  private def doLogin(): UserGroupInformation = {
+  protected def doLogin(): UserGroupInformation = {
     logInfo(s"Attempting to login to KDC using principal: $principal")
     val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
     logInfo("Successfully logged into KDC.")
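Widening principal, keytab, driverRef, doLogin() and obtainTokensAndScheduleRenewal() from private to protected exists purely so that cluster-manager-specific subclasses can reuse the renewal machinery. A minimal sketch of what this enables; the subclass and its extra logging are hypothetical, not part of this patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenManager

// Hypothetical subclass: now that these members are protected, a cluster
// manager can decorate token creation without duplicating the renewal logic.
private[spark] class LoggingDelegationTokenManager(
    sparkConf: SparkConf,
    hadoopConf: Configuration)
  extends HadoopDelegationTokenManager(sparkConf, hadoopConf) {

  override protected def obtainTokensAndScheduleRenewal(
      ugi: UserGroupInformation): Credentials = {
    // Delegate to the parent, then report what was obtained for the principal.
    val creds = super.obtainTokensAndScheduleRenewal(ugi)
    logInfo(s"Obtained ${creds.numberOfTokens()} delegation tokens for $principal")
    creds
  }
}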
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 02a5fedd6c5b4..8f796eb846f55 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -708,7 +708,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * fully constructed), only if security is enabled in the Hadoop configuration.
    */
   protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None
-
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index c2ad80c4755a6..7f849a5106dd0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -262,6 +262,14 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_KERBEROS_DT_SECRET_RENEWAL =
+    ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.renewal")
+      .doc("Enables the driver to watch the secret specified by " +
+        "spark.kubernetes.kerberos.tokenSecret.name for updates, so that fresh " +
+        "tokens can be propagated to the executors.")
+      .booleanConf
+      .createWithDefault(false)
+
   val APP_RESOURCE_TYPE =
     ConfigBuilder("spark.kubernetes.resource.type")
       .doc("This sets the resource type internally")
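For reference, a user would opt in to the watcher-based renewal with something like the following; the secret name below is a placeholder:

import org.apache.spark.SparkConf

// Hypothetical opt-in: point the driver at the secret an external service
// keeps populated, and enable the watch (it defaults to false).
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos.tokenSecret.name", "spark-dt-secret")
  .set("spark.kubernetes.kerberos.tokenSecret.renewal", "true")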
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 172a9054bb4f2..e9706ba199320 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -101,6 +101,7 @@ private[spark] object Constants {
   val KERBEROS_SPARK_USER_NAME =
     "spark.kubernetes.kerberos.spark-user-name"
   val KERBEROS_SECRET_KEY = "hadoop-tokens"
+  val SECRET_DATA_ITEM_PREFIX_TOKENS = "spark.kubernetes.dt-"
 
   // Hadoop credentials secrets for the Spark app.
   val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 066547dcbb408..26c898e44df69 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -78,7 +78,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
   def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
 
   def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
-    new KubernetesHadoopDelegationTokenManager(conf, hConf)
+    new KubernetesHadoopDelegationTokenManager(conf, hConf, None)
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
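The new SECRET_DATA_ITEM_PREFIX_TOKENS constant is the key prefix the driver's watcher filters on. A sketch of how an external renewal service might publish refreshed tokens under that prefix; the namespace, secret name, timestamp-suffix scheme, and the fabric8 edit() builder usage are assumptions, not part of this patch:

import java.io.{ByteArrayOutputStream, DataOutputStream}

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.security.Credentials

// Hypothetical external renewer: serialize refreshed Credentials to Hadoop's
// token storage format, Base64-encode them, and add them to the watched
// secret under a key carrying the "spark.kubernetes.dt-" prefix.
def publishTokens(creds: Credentials): Unit = {
  val buffer = new ByteArrayOutputStream()
  val out = new DataOutputStream(buffer)
  creds.writeTokenStorageToStream(out)
  out.close()

  val client = new DefaultKubernetesClient()
  try {
    client.secrets()
      .inNamespace("default")
      .withName("spark-dt-secret")
      .edit()
      // Millisecond timestamps sort lexicographically, so the driver's
      // watcher can pick the newest item with a simple max over the keys.
      .addToData(s"spark.kubernetes.dt-${System.currentTimeMillis()}",
        Base64.encodeBase64String(buffer.toByteArray))
      .done()
  } finally {
    client.close()
  }
}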
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
index 3e98d5811d83f..c5c584a3d7a62 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
@@ -17,21 +17,110 @@
 package org.apache.spark.deploy.k8s.security
 
+import java.io.{ByteArrayInputStream, DataInputStream}
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Secret
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.SECRET_DATA_ITEM_PREFIX_TOKENS
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
 
 /**
  * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
  */
 private[spark] class KubernetesHadoopDelegationTokenManager(
     _sparkConf: SparkConf,
-    _hadoopConf: Configuration)
+    _hadoopConf: Configuration,
+    kubernetesClient: Option[KubernetesClient])
   extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
 
   def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
   def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
+
+  if (principal != null) {
+    require(keytab != null, "Kerberos principal specified without a keytab.")
+    require(new File(keytab).isFile, s"Cannot find keytab at $keytab.")
+  }
+
+  private val isTokenRenewalEnabled =
+    _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_RENEWAL)
+
+  private val dtSecretName = _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
+  if (isTokenRenewalEnabled) {
+    require(dtSecretName.isDefined,
+      "Must specify the token secret which the driver must watch for updates")
+  }
+
+  private def deserialize(credentials: Credentials, data: Array[Byte]): Unit = {
+    val byteStream = new ByteArrayInputStream(data)
+    val dataStream = new DataInputStream(byteStream)
+    credentials.readTokenStorageStream(dataStream)
+  }
+
+  private var watch: Watch = _
+
+  /**
+   * As in HadoopDelegationTokenManager, this starts the token renewer.
+   * Upon start, if a principal has been configured, the renewer will:
+   *
+   * - log in as the configured principal, and set up a task to keep that user's ticket renewed
+   * - obtain delegation tokens from all available providers
+   * - schedule a periodic task to update the tokens when needed.
+   *
+   * If the principal is NOT configured, a long-running app can still be serviced by enabling
+   * the KUBERNETES_KERBEROS_DT_SECRET_RENEWAL config and relying on an external service to
+   * populate a secret with valid delegation tokens for the application to use. In that case
+   * the driver sets up a watcher on the secret, so that it can detect updates, read the
+   * secret's contents, and send the refreshed tokens to all executors before the current
+   * ones expire.
+   *
+   * @param driverEndpoint If provided, the driver endpoint to which newly generated tokens
+   *                       are sent. The same ref will also receive future token updates
+   *                       unless overridden later.
+   * @return The newly logged in user, or null when renewal is handled through the secret
+   *         watcher.
+   */
+  override def start(driverEndpoint: Option[RpcEndpointRef] = None): UserGroupInformation = {
+    driverEndpoint.foreach(super.setDriverRef)
+    val driver = driverRef.get()
+    if (isTokenRenewalEnabled &&
+        kubernetesClient.isDefined && driverEndpoint.isDefined && driver != null) {
+      watch = kubernetesClient.get
+        .secrets()
+        .withName(dtSecretName.get)
+        .watch(new Watcher[Secret] {
+          override def onClose(cause: KubernetesClientException): Unit =
+            logInfo("Ending the watch of DT Secret")
+          override def eventReceived(action: Watcher.Action, resource: Secret): Unit = {
+            action match {
+              case Action.ADDED | Action.MODIFIED =>
+                logInfo("Secret update")
+                val dataItems = resource.getData.asScala.filterKeys(
+                  _.startsWith(SECRET_DATA_ITEM_PREFIX_TOKENS)).toSeq.sorted
+                val latestToken = if (dataItems.nonEmpty) Some(dataItems.max) else None
+                latestToken.foreach { case (_, data) =>
+                  val credentials = new Credentials
+                  deserialize(credentials, Base64.decodeBase64(data))
+                  val tokens = SparkHadoopUtil.get.serialize(credentials)
+                  driver.send(UpdateDelegationTokens(tokens))
+                }
+              case _ =>
+                // DELETED and ERROR events carry no new tokens; without this
+                // default case the match would throw a MatchError in the watcher.
+            }
+          }
+        })
+      null
+    } else {
+      super.start(driverEndpoint)
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c479bbf..81c9d5667616e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -21,7 +21,10 @@ import java.util.concurrent.ExecutorService
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -126,6 +129,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
     new KubernetesDriverEndpoint(rpcEnv, properties)
   }
 
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new KubernetesHadoopDelegationTokenManager(conf,
+      SparkHadoopUtil.get.newConfiguration(conf),
+      Some(kubernetesClient)))
+  }
+
   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends DriverEndpoint(rpcEnv, sparkProperties) {
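The whole scheme hinges on the serialization round trip between the external renewer and the watcher's deserialize() above. A self-contained sketch of that round trip; the token contents and alias are dummies:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

// Write a Credentials object the way an external renewer would...
val original = new Credentials()
original.addToken(new Text("dummy-alias"),
  new Token[TokenIdentifier](Array[Byte](1), Array[Byte](2), new Text("DUMMY"), new Text("service")))
val buffer = new ByteArrayOutputStream()
val out = new DataOutputStream(buffer)
original.writeTokenStorageToStream(out)
out.close()
val secretValue = Base64.encodeBase64String(buffer.toByteArray)

// ...and read it back the way the watcher's deserialize() does.
val restored = new Credentials()
restored.readTokenStorageStream(
  new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(secretValue))))
assert(restored.numberOfTokens() == 1)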