-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24434][K8S] pod template files #22146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
e2e7223
ea4dde6
4f088db
f2f9a44
368d0a4
0005ea5
dda5cc9
d0f41aa
c4c1231
74de0e5
205ddd3
4ae6fc6
c0bcfea
b9e4263
c5e1ea0
56a6b32
7d0d928
1da79a8
8ef756e
7f3cb04
cc8d3f8
4119899
1d0a8fa
81e5a66
7f4ff5a
3097aef
9b1418a
ebacc96
98acd29
95f8b8b
da5dff5
7fb76c7
d86bc75
f2720a5
4b3950d
3813fcb
ec04323
f3b6082
fd503db
eeb2492
4801e8e
36a70ad
ece7a7c
8b8aa48
1ed95ab
a4fde0c
140e89c
838c2bd
5faea62
9e6a4b2
c8077dc
3d6ff3b
83087eb
a46b885
80b56c1
8f7f571
3707e6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,8 +74,15 @@ private[spark] object Constants { | |
| val ENV_R_PRIMARY = "R_PRIMARY" | ||
| val ENV_R_ARGS = "R_APP_ARGS" | ||
|
|
||
| // Pod spec templates | ||
| val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "podSpecTemplate.yml" | ||
| val EXECUTOR_POD_SPEC_TEMPLATE_FILE = | ||
| s"$SPARK_CONF_DIR_INTERNAL/$EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME" | ||
| val POD_TEMPLATE_VOLUME = "podspec-volume" | ||
|
||
|
|
||
| // Miscellaneous | ||
| val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" | ||
| val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" | ||
| val EXECUTOR_CONTAINER_NAME = "executor" | ||
| val MEMORY_OVERHEAD_MIN_MIB = 384L | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,8 +24,9 @@ private[spark] case class KubernetesDriverSpec( | |
| systemProperties: Map[String, String]) | ||
|
|
||
| private[spark] object KubernetesDriverSpec { | ||
| def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec( | ||
| SparkPod.initialPod(), | ||
| Seq.empty, | ||
| initialProps) | ||
| def initialSpec(initialConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = | ||
| KubernetesDriverSpec( | ||
| SparkPod.initialPod(), | ||
| Seq.empty, | ||
|
||
| initialConf.sparkConf.getAll.toMap) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,10 +16,16 @@ | |
| */ | ||
| package org.apache.spark.deploy.k8s | ||
|
|
||
| import org.apache.spark.SparkConf | ||
| import java.io.File | ||
|
|
||
| import io.fabric8.kubernetes.client.KubernetesClient | ||
| import scala.collection.JavaConverters._ | ||
|
|
||
| import org.apache.spark.{SparkConf, SparkException} | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| private[spark] object KubernetesUtils { | ||
| private[spark] object KubernetesUtils extends Logging { | ||
|
|
||
| /** | ||
| * Extract and parse Spark configuration properties with a given name prefix and | ||
|
|
@@ -59,5 +65,23 @@ private[spark] object KubernetesUtils { | |
| } | ||
| } | ||
|
|
||
/**
 * Loads a pod from a template file and pairs it with the container in that pod
 * whose name equals `containerName`.
 *
 * This helper is role-agnostic: the same code path is used for any container name
 * the caller passes in, so the error text deliberately does not mention a role.
 *
 * @param kubernetesClient client whose `pods().load(...)` reads the template file
 * @param templateFile pod template file to load
 * @param containerName name of the container to select from the loaded pod
 * @return the loaded pod together with the selected container
 * @throws SparkException if loading fails or no container has the requested name;
 *                        the underlying failure is attached as the cause
 */
def loadPodFromTemplate(
    kubernetesClient: KubernetesClient,
    templateFile: File,
    containerName: String): SparkPod = {
  try {
    val pod = kubernetesClient.pods().load(templateFile).get()
    pod.getSpec.getContainers.asScala.find(_.getName == containerName) match {
      case Some(container) =>
        SparkPod(pod, container)
      case None =>
        // Same exception type the former bare `require` raised, now with an
        // explanation of which container was missing from which file.
        throw new IllegalArgumentException(
          s"requirement failed: no container named '$containerName' found in pod " +
            s"template file $templateFile")
    }
  } catch {
    case e: Exception =>
      logError(
        s"Encountered exception while attempting to load initial pod spec from file " +
          s"$templateFile", e)
      throw new SparkException("Could not load pod from template file.", e)
  }
}
|
|
||
| def parseMasterUrl(url: String): String = url.substring("k8s://".length) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.deploy.k8s.features | ||
|
|
||
| import io.fabric8.kubernetes.api.model.{Config => _, _} | ||
|
|
||
| import org.apache.spark.deploy.k8s._ | ||
|
|
||
/**
 * Feature step that adds a host-path volume for the configured executor pod
 * template file to the pod, mounts that volume into the container at
 * `Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE`, and publishes that mount location
 * as the value of the executor pod template system property.
 *
 * @param conf Kubernetes configuration from which the executor pod template
 *             file setting is read
 */
private[spark] class TemplateVolumeStep(
    conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
  extends KubernetesFeatureConfigStep {

  /**
   * Returns a copy of `pod` with the template volume and its volume mount added.
   *
   * @throws IllegalArgumentException if the executor pod template file is not configured
   */
  def configurePod(pod: SparkPod): SparkPod = {
    // Read the setting once; a missing value fails with the same exception type
    // that the former `require` produced, plus the name of the missing key.
    val podTemplateFile = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).getOrElse(
      throw new IllegalArgumentException(
        s"requirement failed: ${Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key} must be set"))

    val podWithVolume = new PodBuilder(pod.pod)
      .editSpec()
        .addNewVolume()
          .withName(Constants.POD_TEMPLATE_VOLUME)
          .withHostPath(new HostPathVolumeSource(podTemplateFile))
          .endVolume()
        .endSpec()
      .build()

    // Append the mount instead of calling withVolumeMounts(...), which sets the
    // whole mount list and would discard mounts already present on the container.
    val containerWithVolume = new ContainerBuilder(pod.container)
      .addNewVolumeMount()
        .withName(Constants.POD_TEMPLATE_VOLUME)
        .withMountPath(Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE)
        .endVolumeMount()
      .build()

    SparkPod(podWithVolume, containerWithVolume)
  }

  /** Points the executor pod template property at the in-container template location. */
  def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
    Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE)

  /** This step creates no additional Kubernetes resources. */
  def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,11 +16,17 @@ | |
| */ | ||
| package org.apache.spark.deploy.k8s.submit | ||
|
|
||
| import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf} | ||
| import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep} | ||
| import java.io.File | ||
|
|
||
| import io.fabric8.kubernetes.client.KubernetesClient | ||
|
|
||
| import org.apache.spark.{SparkConf, SparkException} | ||
| import org.apache.spark.deploy.k8s._ | ||
| import org.apache.spark.deploy.k8s.features._ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Undo these import changes. Keep the ordering correct, but import each class individually.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ping here |
||
| import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} | ||
| import org.apache.spark.internal.Logging | ||
|
|
||
| private[spark] class KubernetesDriverBuilder( | ||
| private[spark] class KubernetesDriverBuilder ( | ||
|
||
| provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep = | ||
| new BasicDriverFeatureStep(_), | ||
| provideCredentialsStep: (KubernetesConf[KubernetesDriverSpecificConf]) | ||
|
|
@@ -51,7 +57,13 @@ private[spark] class KubernetesDriverBuilder( | |
| provideJavaStep: ( | ||
| KubernetesConf[KubernetesDriverSpecificConf] | ||
| => JavaDriverFeatureStep) = | ||
| new JavaDriverFeatureStep(_)) { | ||
| new JavaDriverFeatureStep(_), | ||
| provideTemplateVolumeStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] | ||
| => TemplateVolumeStep) = | ||
| new TemplateVolumeStep(_), | ||
| provideInitialSpec: KubernetesConf[KubernetesDriverSpecificConf] | ||
|
||
| => KubernetesDriverSpec = | ||
| KubernetesDriverSpec.initialSpec) { | ||
|
|
||
| def buildFromFeatures( | ||
| kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = { | ||
|
|
@@ -70,6 +82,10 @@ private[spark] class KubernetesDriverBuilder( | |
| val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { | ||
| Seq(provideVolumesStep(kubernetesConf)) | ||
| } else Nil | ||
| val templateVolumeFeature = if ( | ||
| kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) { | ||
| Seq(provideTemplateVolumeStep(kubernetesConf)) | ||
| } else Nil | ||
|
|
||
| val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map { | ||
| case JavaMainAppResource(_) => | ||
|
|
@@ -81,9 +97,9 @@ private[spark] class KubernetesDriverBuilder( | |
| .getOrElse(provideJavaStep(kubernetesConf)) | ||
|
|
||
| val allFeatures = (baseFeatures :+ bindingsStep) ++ | ||
| secretFeature ++ envSecretFeature ++ volumesFeature | ||
| secretFeature ++ envSecretFeature ++ volumesFeature ++ templateVolumeFeature | ||
|
||
|
|
||
| var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) | ||
| var spec = provideInitialSpec(kubernetesConf) | ||
| for (feature <- allFeatures) { | ||
| val configuredPod = feature.configurePod(spec.pod) | ||
| val addedSystemProperties = feature.getAdditionalPodSystemProperties() | ||
|
|
@@ -96,3 +112,25 @@ private[spark] class KubernetesDriverBuilder( | |
| spec | ||
| } | ||
| } | ||
|
|
||
private[spark] object KubernetesDriverBuilder extends Logging {

  /**
   * Creates a driver builder. When `Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE` is set,
   * the builder's initial spec uses the pod loaded from that template file (selecting
   * the container named `Constants.DRIVER_CONTAINER_NAME`); otherwise a builder with
   * all default providers is returned.
   *
   * Load failures are not caught here: `KubernetesUtils.loadPodFromTemplate` already
   * logs the error and rethrows it wrapped in a `SparkException`, so catching again
   * would only log twice and nest one `SparkException` inside another.
   *
   * @param kubernetesClient client used to load the pod template
   * @param conf Spark configuration holding the optional driver pod template setting
   * @return a driver builder, template-backed when a template file is configured
   */
  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = {
    conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
      .map(new File(_))
      .map { templateFile =>
        // `driverConf` is named distinctly so it does not shadow the outer `conf`.
        new KubernetesDriverBuilder(provideInitialSpec = driverConf => {
          val templatePod = KubernetesUtils.loadPodFromTemplate(
            kubernetesClient,
            templateFile,
            Constants.DRIVER_CONTAINER_NAME)
          KubernetesDriverSpec.initialSpec(driverConf).copy(pod = templatePod)
        })
      }
      .getOrElse(new KubernetesDriverBuilder())
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,7 @@ import com.google.common.cache.CacheBuilder | |
| import io.fabric8.kubernetes.client.Config | ||
|
|
||
| import org.apache.spark.SparkContext | ||
| import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory} | ||
| import org.apache.spark.deploy.k8s.{Constants, KubernetesUtils, SparkKubernetesClientFactory, SparkPod} | ||
| import org.apache.spark.deploy.k8s.Config._ | ||
| import org.apache.spark.deploy.k8s.Constants._ | ||
| import org.apache.spark.internal.Logging | ||
|
|
@@ -69,6 +69,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit | |
| defaultServiceAccountToken, | ||
| defaultServiceAccountCaCrt) | ||
|
|
||
| if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) { | ||
| KubernetesUtils.loadPodFromTemplate( | ||
| kubernetesClient, | ||
| new File(sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get), | ||
| Constants.EXECUTOR_CONTAINER_NAME) | ||
| } | ||
|
|
||
| val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( | ||
| "kubernetes-executor-requests") | ||
|
|
||
|
|
@@ -81,13 +88,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit | |
| .build[java.lang.Long, java.lang.Long]() | ||
| val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager( | ||
| sc.conf, | ||
| new KubernetesExecutorBuilder(), | ||
| KubernetesExecutorBuilder(kubernetesClient, sc.conf), | ||
|
||
| kubernetesClient, | ||
| snapshotsStore, | ||
| removedExecutorsCache) | ||
|
|
||
| val executorPodsAllocator = new ExecutorPodsAllocator( | ||
| sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock()) | ||
| sc.conf, | ||
| KubernetesExecutorBuilder(kubernetesClient, sc.conf), | ||
| kubernetesClient, | ||
| snapshotsStore, | ||
| new SystemClock()) | ||
|
|
||
| val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( | ||
| snapshotsStore, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not use camel case for the file name? This looks inconsistent with other conf files.