From 3d788842ad6820a74429ebde7b45bc3d01334152 Mon Sep 17 00:00:00 2001
From: Ilan Filonenko
Date: Thu, 25 Oct 2018 18:10:58 -0700
Subject: [PATCH 1/2] WIP for token renewal

---
 .../HadoopDelegationTokenManager.scala        | 12 ++--
 .../org/apache/spark/deploy/k8s/Config.scala  |  8 +++
 ...bernetesHadoopDelegationTokenManager.scala | 66 ++++++++++++++++++-
 3 files changed, 79 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 8f78649287749..3477797d3fb23 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -70,8 +70,8 @@ private[spark] class HadoopDelegationTokenManager(
     "spark.yarn.security.credentials.%s.enabled")
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
 
-  private val principal = sparkConf.get(PRINCIPAL).orNull
-  private val keytab = sparkConf.get(KEYTAB).orNull
+  protected val principal = sparkConf.get(PRINCIPAL).orNull
+  protected val keytab = sparkConf.get(KEYTAB).orNull
 
   if (principal != null) {
     require(keytab != null, "Kerberos principal specified without a keytab.")
@@ -82,8 +82,8 @@ private[spark] class HadoopDelegationTokenManager(
   logDebug("Using the following builtin delegation token providers: " +
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
-  private var renewalExecutor: ScheduledExecutorService = _
-  private val driverRef = new AtomicReference[RpcEndpointRef]()
+  protected var renewalExecutor: ScheduledExecutorService = _
+  protected val driverRef = new AtomicReference[RpcEndpointRef]()
 
   protected def setDriverRef(ref: RpcEndpointRef): Unit = {
     driverRef.set(ref)
@@ -262,7 +262,7 @@ private[spark] class HadoopDelegationTokenManager(
    *
    * @return Credentials containing the new tokens.
    */
-  private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
+  protected def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
     ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
       override def run(): Credentials = {
         val creds = new Credentials()
@@ -280,7 +280,7 @@ private[spark] class HadoopDelegationTokenManager(
     })
   }
 
-  private def doLogin(): UserGroupInformation = {
+  protected def doLogin(): UserGroupInformation = {
    logInfo(s"Attempting to login to KDC using principal: $principal")
    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    logInfo("Successfully logged into KDC.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index c2ad80c4755a6..7f849a5106dd0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -262,6 +262,14 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_KERBEROS_DT_SECRET_RENEWAL =
+    ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.renewal")
+      .doc("Enables the driver to watch the secret specified by " +
+        "spark.kubernetes.kerberos.tokenSecret.name for updates, so that fresh " +
+        "tokens can be propagated to the executors.")
+      .booleanConf
+      .createWithDefault(false)
+
   val APP_RESOURCE_TYPE =
     ConfigBuilder("spark.kubernetes.resource.type")
       .doc("This sets the resource type internally")
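Note on usage: the two tokenSecret settings are meant to work together. A
minimal sketch of the driver-side configuration, assuming an external process
keeps a secret named "spark-dt-secret" (an example value, not part of this
patch) populated with fresh delegation tokens:

    import org.apache.spark.SparkConf

    // Name the secret to watch, and turn the watch on. Without a
    // principal/keytab, token refresh then relies entirely on whatever
    // external service writes to that secret.
    val conf = new SparkConf()
      .set("spark.kubernetes.kerberos.tokenSecret.name", "spark-dt-secret")
      .set("spark.kubernetes.kerberos.tokenSecret.renewal", "true")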
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
index 3e98d5811d83f..f66be4d8de1c8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
@@ -17,11 +17,23 @@
 
 package org.apache.spark.deploy.k8s.security
 
+import java.io.File
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Secret
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.config.KERBEROS_RELOGIN_PERIOD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
@@ -34,4 +46,56 @@ private[spark] class KubernetesHadoopDelegationTokenManager(
 
   def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
   def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
+
+  if (principal != null) {
+    require(keytab != null, "Kerberos principal specified without a keytab.")
+    require(new File(keytab).isFile, s"Cannot find keytab at $keytab.")
+  }
+
+  private val isTokenRenewalEnabled =
+    _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_RENEWAL)
+
+  if (isTokenRenewalEnabled) {
+    require(_sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME).isDefined,
+      "Must specify the token secret which the driver must watch for updates")
+  }
+
+  private var watcher: Watcher[Secret] = _
+
+  /**
+   * As in HadoopDelegationTokenManager, this starts the token renewer.
+   * Upon start, if a principal has been configured, the renewer will:
+   *
+   * - log in the configured principal, and set up a task to keep that user's ticket renewed
+   * - obtain delegation tokens from all available providers
+   * - schedule a periodic task to update the tokens when needed.
+   *
+   * If the principal is NOT configured, a long-running application can still be
+   * serviced by enabling the KUBERNETES_KERBEROS_DT_SECRET_RENEWAL config and relying
+   * on an external service to populate a secret with valid delegation tokens that the
+   * application will then use. The driver leverages a secret watcher to detect updates
+   * to that secret, retrieve the secret's refreshed contents, and send the new tokens
+   * to the executors before the current ones expire.
+   *
+   * @param driver If provided, the driver endpoint to which newly generated tokens are sent.
+   *               The same ref will also receive future token updates unless overridden later.
+   * @return The newly logged in user, or null
+   */
+  override def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
+    if (isTokenRenewalEnabled) {
+      watcher = new Watcher[Secret] {
+        override def onClose(cause: KubernetesClientException): Unit =
+          logInfo("Ending the watch of DT Secret")
+
+        override def eventReceived(action: Watcher.Action, resource: Secret): Unit = {
+          action match {
+            case Action.ADDED | Action.MODIFIED =>
+              logInfo("Secret update")
+              // TODO: Figure out what to do with secret here
+          }
+        }
+      }
+      null
+    } else {
+      super.start(driver)
+    }
+  }
 }
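The WIP skeleton above constructs a Watcher but does not yet register it
against a secret; patch 2 below does that. For readers unfamiliar with the
fabric8 watch contract this relies on, here is a minimal, self-contained
sketch (the secret name and namespace are example values); note the exhaustive
action match, which avoids a MatchError when DELETED or ERROR events arrive:

    import io.fabric8.kubernetes.api.model.Secret
    import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}
    import io.fabric8.kubernetes.client.Watcher.Action

    object SecretWatchSketch {
      def main(args: Array[String]): Unit = {
        val client = new DefaultKubernetesClient()
        // Registers callbacks; eventReceived fires on every change to the secret.
        val watch = client.secrets()
          .inNamespace("default")
          .withName("spark-dt-secret")
          .watch(new Watcher[Secret] {
            override def eventReceived(action: Action, secret: Secret): Unit = action match {
              case Action.ADDED | Action.MODIFIED =>
                println(s"Secret ${secret.getMetadata.getName} was updated")
              case _ => // ignore DELETED and ERROR in this sketch
            }
            override def onClose(cause: KubernetesClientException): Unit =
              println("Watch closed")
          })
        // ... eventually: watch.close(); client.close()
      }
    }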
From 9ad06b6c045e109f30c22d2d2e70881dc68bef69 Mon Sep 17 00:00:00 2001
From: Ilan Filonenko
Date: Fri, 26 Oct 2018 12:41:21 -0700
Subject: [PATCH 2/2] Enable token renewal

---
 .../HadoopDelegationTokenManager.scala        |  1 -
 .../CoarseGrainedSchedulerBackend.scala       |  1 -
 .../apache/spark/deploy/k8s/Constants.scala   |  1 +
 .../spark/deploy/k8s/KubernetesConf.scala     |  2 +-
 ...bernetesHadoopDelegationTokenManager.scala | 47 +++++++++++++-----
 .../KubernetesClusterSchedulerBackend.scala   |  9 ++++
 6 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 3477797d3fb23..35c014dca324d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -107,7 +107,6 @@ private[spark] class HadoopDelegationTokenManager(
    */
   def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
     driver.foreach(setDriverRef)
-
     if (principal != null) {
       renewalExecutor =
         ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index df72eec102cc4..5247eb613af1a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -706,7 +706,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * fully constructed), only if security is enabled in the Hadoop configuration.
    */
   protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None
-
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 172a9054bb4f2..e9706ba199320 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -101,6 +101,7 @@ private[spark] object Constants {
   val KERBEROS_SPARK_USER_NAME =
     "spark.kubernetes.kerberos.spark-user-name"
   val KERBEROS_SECRET_KEY = "hadoop-tokens"
+  val SECRET_DATA_ITEM_PREFIX_TOKENS = "spark.kubernetes.dt-"
 
   // Hadoop credentials secrets for the Spark app.
   val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 066547dcbb408..26c898e44df69 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -78,7 +78,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
 
   def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
 
   def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
-    new KubernetesHadoopDelegationTokenManager(conf, hConf)
+    new KubernetesHadoopDelegationTokenManager(conf, hConf, None)
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
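SECRET_DATA_ITEM_PREFIX_TOKENS establishes a naming convention for the
secret's data items: each refresh is stored under a new key beginning with
"spark.kubernetes.dt-", and the watcher in the diff below picks the
lexicographically largest key as the newest tokens. A sketch of the producing
side under that assumption, using a fixed-width millisecond timestamp suffix
so lexicographic order matches creation order (the secret name and namespace
are example values, and obtaining the tokens is elided):

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    import io.fabric8.kubernetes.api.model.SecretBuilder
    import io.fabric8.kubernetes.client.DefaultKubernetesClient
    import org.apache.commons.codec.binary.Base64
    import org.apache.hadoop.security.Credentials

    object TokenPublisherSketch {
      // Serializes freshly obtained Credentials and publishes them as a new
      // data item of the watched secret.
      def publish(creds: Credentials): Unit = {
        val buffer = new ByteArrayOutputStream()
        val out = new DataOutputStream(buffer)
        creds.writeTokenStorageToStream(out)
        out.flush()

        val client = new DefaultKubernetesClient()
        val secret = new SecretBuilder()
          .withNewMetadata().withName("spark-dt-secret").endMetadata()
          // Millisecond timestamps are fixed-width, so string order == age order.
          .addToData(s"spark.kubernetes.dt-${System.currentTimeMillis()}",
            Base64.encodeBase64String(buffer.toByteArray))
          .build()
        client.secrets().inNamespace("default").createOrReplace(secret)
        client.close()
      }
    }

Note that createOrReplace rewrites the secret wholesale, dropping older data
items; a real service might instead patch additional items into the existing
secret, which is what the sorted-by-key selection below seems to anticipate.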
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
index f66be4d8de1c8..93d988c418e1b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
@@ -17,30 +17,34 @@
 
 package org.apache.spark.deploy.k8s.security
 
+import java.io.{ByteArrayInputStream, DataInputStream}
 import java.io.File
+
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.Secret
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.Watcher.Action
+import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.SECRET_DATA_ITEM_PREFIX_TOKENS
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.internal.config.KERBEROS_RELOGIN_PERIOD
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
-import org.apache.spark.util.ThreadUtils
 
 /**
  * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
  */
 private[spark] class KubernetesHadoopDelegationTokenManager(
     _sparkConf: SparkConf,
-    _hadoopConf: Configuration)
+    _hadoopConf: Configuration,
+    kubernetesClient: Option[KubernetesClient])
   extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
 
   def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
@@ -53,12 +57,19 @@ private[spark] class KubernetesHadoopDelegationTokenManager(
   private val isTokenRenewalEnabled =
     _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_RENEWAL)
+  private val dtSecretName = _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
 
   if (isTokenRenewalEnabled) {
-    require(_sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME).isDefined,
+    require(dtSecretName.isDefined,
       "Must specify the token secret which the driver must watch for updates")
   }
 
-  private var watcher: Watcher[Secret] = _
+  private def deserialize(credentials: Credentials, data: Array[Byte]): Unit = {
+    val byteStream = new ByteArrayInputStream(data)
+    val dataStream = new DataInputStream(byteStream)
+    credentials.readTokenStorageStream(dataStream)
+  }
+
+  private var watch: Watch = _
 
   /**
    * As in HadoopDelegationTokenManager, this starts the token renewer.
@@ -80,19 +91,35 @@ private[spark] class KubernetesHadoopDelegationTokenManager(
    * @return The newly logged in user, or null
    */
   override def start(driver: Option[RpcEndpointRef] = None): UserGroupInformation = {
-    if (isTokenRenewalEnabled) {
-      watcher = new Watcher[Secret] {
+    driver.foreach(super.setDriverRef)
+    val driverOpt = driverRef.get()
+    if (isTokenRenewalEnabled &&
+      kubernetesClient.isDefined && driver.isDefined && driverOpt != null) {
+      watch = kubernetesClient.get
+        .secrets()
+        .withName(dtSecretName.get)
+        .watch(new Watcher[Secret] {
         override def onClose(cause: KubernetesClientException): Unit =
           logInfo("Ending the watch of DT Secret")
-
         override def eventReceived(action: Watcher.Action, resource: Secret): Unit = {
           action match {
             case Action.ADDED | Action.MODIFIED =>
               logInfo("Secret update")
-              // TODO: Figure out what to do with secret here
+              val dataItems = resource.getData.asScala.filterKeys(
+                _.startsWith(SECRET_DATA_ITEM_PREFIX_TOKENS)).toSeq.sorted
+              val latestToken = if (dataItems.nonEmpty) Some(dataItems.max) else None
+              latestToken.foreach {
+                case (_, data) =>
+                  val credentials = new Credentials
+                  deserialize(credentials, Base64.decodeBase64(data))
+                  val tokens = SparkHadoopUtil.get.serialize(credentials)
+                  driverOpt.send(UpdateDelegationTokens(tokens))
+              }
+            case _ =>
+              logDebug(s"Ignoring action $action on the watched DT secret")
           }
         }
-      }
+      })
       null
     } else {
       super.start(driver)
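The deserialize helper and the Base64.decodeBase64 call above fix the wire
format: a data item's value is the Kubernetes base64 encoding of Hadoop's
token-storage stream. A round-trip sketch of that encoding, suitable as a
spark-shell snippet or unit-test seed (the Credentials here carry no actual
tokens):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    import org.apache.commons.codec.binary.Base64
    import org.apache.hadoop.security.Credentials

    // Encode: what an external renewer would store as the data item value.
    val creds = new Credentials()
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    creds.writeTokenStorageToStream(out)
    out.flush()
    val dataItemValue = Base64.encodeBase64String(buffer.toByteArray)

    // Decode: what the watcher does before sending UpdateDelegationTokens.
    val decoded = new Credentials()
    decoded.readTokenStorageStream(
      new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(dataItemValue))))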
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c479bbf..81c9d5667616e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -21,7 +21,10 @@ import java.util.concurrent.ExecutorService
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -126,6 +129,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
     new KubernetesDriverEndpoint(rpcEnv, properties)
   }
 
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new KubernetesHadoopDelegationTokenManager(conf,
+      SparkHadoopUtil.get.newConfiguration(conf),
+      Some(kubernetesClient)))
+  }
+
   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends DriverEndpoint(rpcEnv, sparkProperties) {
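With createTokenManager wired in, CoarseGrainedSchedulerBackend constructs and
starts the manager on the driver. For experiments outside the scheduler
backend, equivalent manual wiring might look like the sketch below; it must
live in code compiled inside the org.apache.spark package (the class is
private[spark]), and the client, secret name, and driverEndpointRef are
assumptions, not provided by this patch:

    import io.fabric8.kubernetes.client.DefaultKubernetesClient
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager

    val sparkConf = new SparkConf()
      .set("spark.kubernetes.kerberos.tokenSecret.name", "spark-dt-secret")
      .set("spark.kubernetes.kerberos.tokenSecret.renewal", "true")

    // The watch is only armed when a client, a driver ref, and the renewal
    // flag are all present; otherwise start() falls back to keytab login.
    val manager = new KubernetesHadoopDelegationTokenManager(
      sparkConf,
      SparkHadoopUtil.get.newConfiguration(sparkConf),
      Some(new DefaultKubernetesClient()))
    // manager.start(Some(driverEndpointRef))  // driverEndpointRef: RpcEndpointRef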