
Commit 333f28f ("fixes")
1 parent: 58be82a

11 files changed: +306 −60 lines

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 7 additions & 0 deletions
@@ -356,6 +356,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val EXECUTOR_POD_CONTROLLER_CLASS =
+    ConfigBuilder("spark.kubernetes.executor.podController.class")
+      .doc("Experimental. Specify a class that can handle the creation " +
+        "and deletion of pods")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
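
For context, a minimal sketch of how an application could opt into a custom controller through this option; the class name below is a placeholder (it refers to the illustrative LabellingPodController sketched after the new ExecutorPodController trait further down), not something shipped by this commit:

    import org.apache.spark.SparkConf

    // Hypothetical wiring: point the Kubernetes scheduler backend at a
    // user-supplied ExecutorPodController implementation that is present
    // on the driver classpath.
    val conf = new SparkConf()
      .set("spark.kubernetes.executor.podController.class",
        "org.apache.spark.scheduler.cluster.k8s.LabellingPodController")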
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodController.scala

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+
+
+/**
+ * Responsible for the creation and deletion of Pods at the request of the
+ * ExecutorPodsAllocator, ExecutorPodsLifecycleManager, and the
+ * KubernetesClusterSchedulerBackend. The default implementation,
+ * ExecutorPodControllerImpl, communicates directly with the
+ * KubernetesClient to create and delete Pods. This trait can be
+ * implemented so that pod creation and deletion instead go through a
+ * custom CRD that satisfies your specific SLA and security concerns.
+ */
+private[spark] trait ExecutorPodController {
+
+  def initialize(kubernetesClient: KubernetesClient): Unit
+
+  def addPod(pod: Pod): Unit
+
+  def commitAndGetTotalAllocated(): Int
+
+  def removePod(pod: Pod): Unit
+
+  def removePods(pods: util.List[Pod]): Boolean
+}
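
To make the extension point concrete, here is a minimal sketch of a pluggable controller. The class name LabellingPodController and the label it applies are illustrative assumptions, not part of this commit; note also that because the trait is private[spark], an implementation currently has to live under the org.apache.spark package namespace.

    package org.apache.spark.scheduler.cluster.k8s

    import java.util

    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.KubernetesClient

    import org.apache.spark.SparkConf

    // Hypothetical controller that tags each executor pod before creating it,
    // e.g. so an external operator can later apply tenant-specific policy.
    class LabellingPodController(conf: SparkConf) extends ExecutorPodController {

      private var client: KubernetesClient = _
      private var numAdded = 0

      override def initialize(kubernetesClient: KubernetesClient): Unit = {
        client = kubernetesClient
        numAdded = 0
      }

      override def addPod(pod: Pod): Unit = synchronized {
        Option(pod.getMetadata.getLabels)
          .foreach(_.put("example.com/managed-by", "my-pod-operator"))
        client.pods().create(pod)
        numAdded += 1
      }

      override def commitAndGetTotalAllocated(): Int = synchronized {
        val total = numAdded
        numAdded = 0
        total
      }

      override def removePod(pod: Pod): Unit = {
        client.pods().withName(pod.getMetadata.getName).delete()
      }

      override def removePods(pods: util.List[Pod]): Boolean = {
        client.pods().delete(pods)
      }
    }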
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodControllerImpl.scala

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodControllerImpl(
+    val conf: SparkConf)
+  extends ExecutorPodController {
+
+  private var kubernetesClient: KubernetesClient = _
+
+  private var numAdded: Int = _
+
+  override def initialize(kClient: KubernetesClient): Unit = {
+    kubernetesClient = kClient
+    numAdded = 0
+  }
+  override def addPod(pod: Pod): Unit = {
+    kubernetesClient.pods().create(pod)
+    synchronized {
+      numAdded += 1
+    }
+  }
+
+  override def commitAndGetTotalAllocated(): Int = {
+    synchronized {
+      val totalNumAdded = numAdded
+      numAdded = 0
+      totalNumAdded
+    }
+  }
+
+  override def removePod(pod: Pod): Unit = {
+    // If deletion failed on a previous try, we can try again if resync informs us the pod
+    // is still around.
+    // Delete as best attempt - duplicate deletes will throw an exception but the end state
+    // of getting rid of the pod is what matters.
+    Utils.tryLogNonFatalError {
+      kubernetesClient
+        .pods()
+        .withName(pod.getMetadata.getName)
+        .delete()
+    }
+  }
+
+  override def removePods(pods: util.List[Pod]): Boolean = {
+    kubernetesClient.pods().delete(pods)
+  }
+}
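
A rough sketch of the calling convention that the rest of this commit relies on, with the snapshot and allocation plumbing elided; controller, client, and buildExecutorPod are assumed to exist here and stand in for the real wiring shown in the files below:

    // The cluster manager initializes the controller once with the shared client.
    controller.initialize(client)

    // The allocator hands each newly built executor pod to the controller...
    val requested = Seq(buildExecutorPod(1), buildExecutorPod(2))
    requested.foreach(controller.addPod)

    // ...and later reads off how many pods were handed over since the last check.
    val newlyAllocated = controller.commitAndGetTotalAllocated()  // 2 in this sketch

    // Downscaling and shutdown paths delete through the same controller.
    requested.foreach(controller.removePod)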

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 6 additions & 5 deletions
@@ -35,6 +35,7 @@ private[spark] class ExecutorPodsAllocator(
     executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
+    executorPodController: ExecutorPodController,
     clock: Clock) extends Logging {
 
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
@@ -115,12 +116,12 @@ private[spark] class ExecutorPodsAllocator(
       newlyCreatedExecutors --= timedOut
       if (shouldDeleteExecutors) {
         Utils.tryLogNonFatalError {
-          kubernetesClient
+          val pods = kubernetesClient
             .pods()
             .withLabel(SPARK_APP_ID_LABEL, applicationId)
             .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
-            .delete()
+          executorPodController.removePods(pods.list().getItems)
         }
       }
     }
@@ -170,13 +171,13 @@ private[spark] class ExecutorPodsAllocator(
     if (toDelete.nonEmpty) {
       logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
       Utils.tryLogNonFatalError {
-        kubernetesClient
+        val pods = kubernetesClient
           .pods()
           .withField("status.phase", "Pending")
          .withLabel(SPARK_APP_ID_LABEL, applicationId)
          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
          .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
-          .delete()
+        executorPodController.removePods(pods.list().getItems)
         newlyCreatedExecutors --= toDelete
         knownPendingCount -= knownPendingToDelete.size
       }
@@ -203,7 +204,7 @@ private[spark] class ExecutorPodsAllocator(
         .addToContainers(executorPod.container)
         .endSpec()
         .build()
-      kubernetesClient.pods().create(podWithAttachedContainer)
+      executorPodController.addPod(podWithAttachedContainer)
       newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
       logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
     }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala

Lines changed: 2 additions & 11 deletions
@@ -27,12 +27,12 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorExited
-import org.apache.spark.util.Utils
 
 private[spark] class ExecutorPodsLifecycleManager(
     val conf: SparkConf,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
+    executorPodController: ExecutorPodController,
     // Use a best-effort to track which executors have been removed already. It's not generally
     // job-breaking if we remove executors more than once but it's ideal if we make an attempt
     // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
@@ -125,16 +125,7 @@ private[spark] class ExecutorPodsLifecycleManager(
   }
 
   private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
-    // If deletion failed on a previous try, we can try again if resync informs us the pod
-    // is still around.
-    // Delete as best attempt - duplicate deletes will throw an exception but the end state
-    // of getting rid of the pod is what matters.
-    Utils.tryLogNonFatalError {
-      kubernetesClient
-        .pods()
-        .withName(updatedPod.getMetadata.getName)
-        .delete()
-    }
+    executorPodController.removePod(updatedPod)
   }
 
   private def removeExecutorFromSpark(
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 20 additions & 2 deletions
@@ -22,13 +22,13 @@ import java.util.concurrent.TimeUnit
 import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.client.Config
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
-import org.apache.spark.util.{SystemClock, ThreadUtils}
+import org.apache.spark.util.{SystemClock, ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
 
@@ -98,10 +98,26 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val removedExecutorsCache = CacheBuilder.newBuilder()
       .expireAfterWrite(3, TimeUnit.MINUTES)
       .build[java.lang.Long, java.lang.Long]()
+
+    val executorPodControllers = sc.conf.get(EXECUTOR_POD_CONTROLLER_CLASS)
+      .map(clazz => Utils.loadExtensions(
+        classOf[ExecutorPodController],
+        Seq(clazz), sc.conf))
+      .getOrElse(Seq(new ExecutorPodControllerImpl(sc.conf)))
+
+    if (executorPodControllers.size > 1) {
+      throw new SparkException(
+        s"Multiple executorPodControllers listed: $executorPodControllers")
+    }
+    val executorPodController = executorPodControllers.head
+
+    executorPodController.initialize(kubernetesClient)
+
     val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
       sc.conf,
       kubernetesClient,
       snapshotsStore,
+      executorPodController,
       removedExecutorsCache)
 
     val executorPodsAllocator = new ExecutorPodsAllocator(
@@ -110,6 +126,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       new KubernetesExecutorBuilder(),
       kubernetesClient,
       snapshotsStore,
+      executorPodController,
       new SystemClock())
 
     val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
@@ -129,6 +146,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       snapshotsStore,
       executorPodsAllocator,
       executorPodsLifecycleEventHandler,
+      executorPodController,
       podsWatchEventSource,
       podsPollingEventSource)
   }
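
A note on the loading path above: the configured class is instantiated reflectively via Utils.loadExtensions, so an implementation only needs to be on the driver classpath and expose either a constructor taking a SparkConf or a no-argument one (Spark's extension loader tries the SparkConf constructor first). The size check deliberately rejects a value that resolves to more than one controller, and ExecutorPodControllerImpl is used only when the option is unset.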

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 8 additions & 5 deletions
@@ -40,6 +40,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     snapshotsStore: ExecutorPodsSnapshotsStore,
     podAllocator: ExecutorPodsAllocator,
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
+    executorPodController: ExecutorPodController,
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
@@ -99,11 +100,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
     if (shouldDeleteExecutors) {
       Utils.tryLogNonFatalError {
-        kubernetesClient
+        val pods = kubernetesClient
           .pods()
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-          .delete()
+        executorPodController.removePods(pods.list().getItems)
       }
     }
 
@@ -146,11 +147,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .withLabel(SPARK_APP_ID_LABEL, applicationId())
         .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
         .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
+        .list()
+        .getItems()
 
-      if (!running.list().getItems().isEmpty()) {
-        logInfo(s"Forcefully deleting ${running.list().getItems().size()} pods " +
+      if (!running.isEmpty()) {
+        logInfo(s"Forcefully deleting ${running.size()} pods " +
           s"(out of ${executorIds.size}) that are still running after graceful shutdown period.")
-        running.delete()
+        executorPodController.removePods(running)
       }
     }
   }
