
The anatomy of Spark applications on Kubernetes

Author Toader Sebastian

Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes

Apache Zeppelin on Kubernetes series:
Running Zeppelin Spark notebooks on Kubernetes
Running Zeppelin Spark notebooks on Kubernetes - deep dive

Apache Kafka on Kubernetes series:
Kafka on Kubernetes - using etcd

This is the third post in the Spark on Kubernetes series. In this post we take a closer look at how Spark uses k8s as a cluster manager, natively. Just a reminder, we at Banzai Cloud provision Spark, Zeppelin, Kafka and a few other applications to Kubernetes the cloud-native way.

The entry point that triggers and runs Spark applications on k8s is spark-submit, for which only cluster mode is available on k8s. Client mode is not yet supported: see PR-456 In-cluster client mode.

Let’s take a closer look at some of the Spark components on k8s:

  • Spark Driver - this is where the execution of the Spark application starts. It’s responsible for creating actionable tasks from the Spark application it executes, as well as for managing and coordinating executors.
  • Executor - the component responsible for executing tasks assigned to it by the driver.
  • External Shuffle Service - is used only when dynamic executor scaling is enabled. The external shuffle service is responsible for persisting shuffle files beyond the lifetime of executors, allowing the number of executors to scale up and down without any loss of computation.
  • Resource Staging Server (RSS) - this is used when the compiled code of a Spark application is hosted on the machine from which spark-submit is issued. It’s also used when the dependencies of a Spark application are located on that local machine. These locally hosted files are made available to the Spark driver and executors inside k8s via this component.

spark-submit

To launch a Spark application in Kubernetes, use:

./bin/spark-submit \
  --class <main-class> \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --conf <key>=<value> \
  ... # other options
  <application-jar>

For the other options supported by spark-submit on k8s, check out the Spark Properties section of the Spark-on-k8s documentation.

The value passed into --master is the master URL for the cluster. This master URL is the basis for the creation of the appropriate cluster manager client. If it is prefixed with k8s, then org.apache.spark.deploy.k8s.submit.Client is instantiated. This class is responsible for assembling the Spark driver’s pod specifications, based on the arguments passed to spark-submit.
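To make the role of the k8s prefix concrete, here is a minimal Python sketch of how a master URL of this form can be split into its prefix and the API server address. It only illustrates the idea; it is not Spark's own code, and the function name api_server_from_master and the sample address are made up for this example.

```python
K8S_PREFIX = "k8s://"


def api_server_from_master(master: str) -> str:
    """Return the API server URL embedded in a k8s master URL.

    Hypothetical helper: raises ValueError when the master URL does not
    carry the k8s prefix, i.e. when another cluster manager is meant.
    """
    if not master.startswith(K8S_PREFIX):
        raise ValueError(f"not a Kubernetes master URL: {master!r}")
    # Everything after the prefix is the address of the k8s API server.
    return master[len(K8S_PREFIX):]


if __name__ == "__main__":
    # The address below is an illustrative placeholder.
    print(api_server_from_master("k8s://https://10.0.0.1:6443"))
```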

It checks to see whether <application-jar> points to a file that is local to the host from which spark-submit was issued. If the file path starts with the protocol file://, or no protocol is specified, then the file is local. This is not to be confused with local://, which means the file is located inside the pod. From here on in, when we say ‘local file’ we mean a file that’s local to the host from which spark-submit was issued. If it’s a local file, then it has to be uploaded to the RSS, so that the driver and executors can download it. The same applies to the files listed under --jars and --files.
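The rule described above can be sketched in a few lines of Python. This is an illustration of the decision only, not the code Spark runs; the function name classify_dependency, its return labels, and the sample paths are our own.

```python
from urllib.parse import urlparse


def classify_dependency(uri: str) -> str:
    """Classify a jar or file URI by where the file actually lives."""
    scheme = urlparse(uri).scheme
    if scheme in ("", "file"):
        # Local to the host running spark-submit: must be uploaded to the RSS.
        return "submitter-local"
    if scheme == "local":
        # Already present inside the pod; nothing to upload.
        return "container-local"
    # Anything else (for example hdfs:// or https://) is fetched remotely.
    return "remote"


if __name__ == "__main__":
    for uri in ("/home/me/app.jar",
                "file:///home/me/app.jar",
                "local:///opt/spark/examples/jars/app.jar",
                "https://example.com/app.jar"):
        print(uri, "->", classify_dependency(uri))
```

Only the files classified as submitter-local need the RSS at all, which is why a submission that uses local:// or remote URIs exclusively does not require one.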

The RSS must be deployed to a k8s cluster in advance (refer to Spark-on-k8s for more details on how to deploy Spark RSS), and it must be reachable both from outside the k8s cluster (e.g. a spark-submit executed from a user’s local machine) and from inside the k8s cluster (e.g. the driver and executor pods that download files from it). The external URI of the RSS must be specified through --conf spark.kubernetes.resourceStagingServer.uri=<value>, and the internal one through --conf spark.kubernetes.resourceStagingServer.internal.uri=<value>. If the internal URI is not set, it will fall back to the external one.
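The fallback between the two URIs is simple enough to show as a short Python sketch. The two property names are the ones quoted above; the helper staging_server_uris and the host names are hypothetical and only serve the example.

```python
EXTERNAL_KEY = "spark.kubernetes.resourceStagingServer.uri"
INTERNAL_KEY = "spark.kubernetes.resourceStagingServer.internal.uri"


def staging_server_uris(conf: dict) -> tuple:
    """Return (external_uri, internal_uri) for the RSS.

    The external URI is mandatory; the internal one falls back to it.
    """
    external = conf[EXTERNAL_KEY]
    internal = conf.get(INTERNAL_KEY, external)
    return external, internal


if __name__ == "__main__":
    # Host names below are illustrative placeholders.
    print(staging_server_uris({EXTERNAL_KEY: "http://rss.example.com:31000"}))
    print(staging_server_uris({
        EXTERNAL_KEY: "http://rss.example.com:31000",
        INTERNAL_KEY: "http://spark-rss.default.svc:10000",
    }))
```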

The driver and executors download files from the RSS using init containers. Init containers for downloading files from the RSS will only be added to the driver and executor pod specifications if there is at least one file to be downloaded.
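As a rough illustration of the "only if there is something to download" rule, the Python sketch below builds a simplified pod specification as a plain dictionary. The initContainers key mirrors the field of the same name in a k8s pod spec; the container name spark-init, the args layout, and the helper build_pod_spec are assumptions made for this example, not what Spark generates.

```python
def build_pod_spec(main_container: dict, staged_files: list) -> dict:
    """Build a simplified pod spec, adding an init container only when needed."""
    spec = {"containers": [main_container]}
    if staged_files:
        # Hypothetical init container that would fetch the files from the RSS
        # before the main container starts.
        spec["initContainers"] = [{
            "name": "spark-init",
            "args": list(staged_files),
        }]
    return spec


if __name__ == "__main__":
    driver = {"name": "spark-driver"}
    print(build_pod_spec(driver, []))
    print(build_pod_spec(driver, ["app.jar", "lookup.csv"]))
```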

org.apache.spark.deploy.k8s.submit.Client uploads the necessary files from the local machine to the RSS through the URL specified by spark.kubernetes.resourceStagingServer.uri, and posts driver pod specifications to the k8s API. It also starts a watch to monitor the status of the driver pod, which it reports to stdout.

By default, spark-submit waits until the Spark application finishes, unless that behaviour is overridden by --conf spark.kubernetes.submission.waitAppCompletion=false.
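The effect of this flag can be pictured with a small simulation in Python. It is a simplified model under our own assumptions: the pod phases are the standard k8s ones, the stream of phases stands in for the watch on the driver pod, and the function report_driver_status is hypothetical.

```python
TERMINAL_PHASES = {"Succeeded", "Failed"}


def report_driver_status(phases, wait_app_completion=True):
    """Return the driver pod phases that would be reported to stdout.

    `phases` simulates the events delivered by the watch on the driver pod.
    """
    if not wait_app_completion:
        # Fire and forget: return right after the submission.
        return []
    reported = []
    for phase in phases:
        reported.append(phase)
        if phase in TERMINAL_PHASES:
            break  # the application has finished, stop watching
    return reported


if __name__ == "__main__":
    events = ["Pending", "Running", "Succeeded"]
    print(report_driver_status(events))
    print(report_driver_status(events, wait_app_completion=False))
```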

Spark driver

K8s creates the Spark Driver pod according to the pod specifications it receives. As mentioned above, if there are files to be downloaded from the RSS, the init container included in the driver pod’s specifications handles the download. Jars and files from the RSS are downloaded by default into /var/spark-data/spark-jars and /var/spark-data/spark-files, respectively. Download locations can be changed as desired via spark.kubernetes.mountdependencies.jarsDownloadDir and spark.kubernetes.mountdependencies.filesDownloadDir.

The Spark application is started within the driver pod. SparkContext creates a task scheduler and cluster manager for each Spark application. It identifies the type of cluster manager to be created by looking at the master URL it receives. For master URLs that start with k8s://, it uses org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager, an external cluster manager derived from org.apache.spark.scheduler.ExternalClusterManager.
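The selection step can be modelled in Python as follows. The sketch borrows the idea of a canCreate-style check per cluster manager; the Python class names, the OtherClusterManager stand-in with its other:// prefix, and the helper select_cluster_manager are all hypothetical and exist only to show the dispatch.

```python
class KubernetesClusterManager:
    """Stand-in for the k8s cluster manager: accepts k8s:// master URLs."""

    def can_create(self, master_url: str) -> bool:
        return master_url.startswith("k8s://")


class OtherClusterManager:
    """Hypothetical second manager, present only to show the selection."""

    def can_create(self, master_url: str) -> bool:
        return master_url.startswith("other://")


def select_cluster_manager(master_url: str, managers: list):
    """Pick the single manager that claims the given master URL."""
    candidates = [m for m in managers if m.can_create(master_url)]
    if len(candidates) != 1:
        raise ValueError(
            f"expected exactly one cluster manager for {master_url!r}, "
            f"found {len(candidates)}")
    return candidates[0]


if __name__ == "__main__":
    managers = [OtherClusterManager(), KubernetesClusterManager()]
    chosen = select_cluster_manager("k8s://https://10.0.0.1:6443", managers)
    print(type(chosen).__name__)
```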

Once SparkContext has its cluster manager, it creates a task scheduler and a backend scheduler through the manager. The backend scheduler is obtained by invoking org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler). This returns an org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend, which is a coarse-grained scheduler.

The body of the Spark Application is compiled into stages and tasks, after which tasks are deployed to the executors that execute them.

If dynamic executor scaling is enabled through --conf spark.dynamicAllocation.enabled=true, then external shuffle services are also necessary. In that case, external shuffle services must be deployed to a k8s cluster in advance. Refer to Spark-on-k8s for details on how to deploy external shuffle services to k8s.

This will also require that the following arguments be passed to spark-submit:

--conf spark.shuffle.service.enabled=true \
--conf spark.kubernetes.shuffle.namespace=default \
--conf spark.kubernetes.shuffle.labels="<shuffle selector labels>"

If this is the case, then the backend scheduler org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend will also track the list of external shuffle services available in the k8s cluster, using org.apache.spark.scheduler.cluster.k8s.KubernetesExternalShuffleManagerImpl. For tracking external shuffle services, it uses a mechanism provided by k8s, called label selectors. This mechanism employs the labels passed into --conf spark.kubernetes.shuffle.labels.
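For readers unfamiliar with label selectors, the Python sketch below shows equality-based matching: a pod is selected when it carries every key/value pair in the selector. We assume here that the labels are given as a comma-separated list of key=value pairs; the label values, pod names, and helper functions are illustrative, and the real lookup is done by the k8s API rather than by client-side filtering like this.

```python
def parse_labels(spec: str) -> dict:
    """Turn 'k1=v1,k2=v2' into {'k1': 'v1', 'k2': 'v2'}."""
    return dict(pair.split("=", 1) for pair in spec.split(",") if pair)


def matches(selector: dict, pod_labels: dict) -> bool:
    """Equality-based selection: every selector pair must be on the pod."""
    return all(pod_labels.get(key) == value for key, value in selector.items())


def select_pods(pods: dict, selector: dict) -> list:
    """Return the names of the pods whose labels satisfy the selector."""
    return sorted(name for name, labels in pods.items()
                  if matches(selector, labels))


if __name__ == "__main__":
    # Pod names and label values are made up for the example.
    selector = parse_labels("app=spark-shuffle-service,spark-version=2.2.0")
    pods = {
        "shuffle-a": {"app": "spark-shuffle-service", "spark-version": "2.2.0"},
        "shuffle-b": {"app": "spark-shuffle-service", "spark-version": "2.1.0"},
        "web-1": {"app": "frontend"},
    }
    print(select_pods(pods, selector))
```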

As the tasks to be executed start to pile up in the driver, it will ask the backend scheduler org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend for executors. The scheduling algorithm is beyond the scope of this blog, and will be covered at a later time.

Similar to the Spark driver, Spark executors are k8s pods. The backend scheduler uses org.apache.spark.scheduler.cluster.k8s.ExecutorPodFactory to build executor pod specifications, which it then posts to the k8s API. If there are files needed by an executor that are slated to be downloaded from the RSS, then the pod specifications will include an init container (see how init containers work in the Spark driver section above).

If dynamic executor scaling is not enabled, then there will be a fixed number of executors allocated in accordance with what has been passed into --conf spark.executor.instances=<desired executor number>.

This is how the Spark driver engages the cluster manager (k8s) to provide the executors it needs.

Spark executors

Spark executor pods receive the address of the driver through --driver-url $SPARK_DRIVER_URL, the SPARK_DRIVER_URL environment variable having been set in the executor pod specifications. Executors need the address of the driver because they connect to it directly. Since pods are impermanent and can be relocated, k8s discourages connecting directly to pods. The preferred way of connecting to pods is via k8s services. Thus, org.apache.spark.deploy.k8s.submit.Client (see above) not only creates the driver’s pod, but also creates a service through which the driver pod can be accessed. The address of this service is passed into the SPARK_DRIVER_URL environment variable.
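To give a feel for what such an address can look like, here is a small Python sketch that combines the usual in-cluster DNS name of a k8s service with the spark://CoarseGrainedScheduler@host:port form that Spark generally uses for the driver endpoint. We assume that format applies unchanged here; the service name, namespace, and port are placeholders, and both helper functions are hypothetical.

```python
def driver_service_host(service: str, namespace: str,
                        cluster_domain: str = "cluster.local") -> str:
    """In-cluster DNS name of a k8s service (default cluster domain assumed)."""
    return f"{service}.{namespace}.svc.{cluster_domain}"


def driver_url(host: str, port: int) -> str:
    """Driver endpoint address in the form handed to executors."""
    return f"spark://CoarseGrainedScheduler@{host}:{port}"


if __name__ == "__main__":
    # Service name, namespace and port are illustrative placeholders.
    host = driver_service_host("spark-pi-driver-svc", "default")
    executor_env = {"SPARK_DRIVER_URL": driver_url(host, 7078)}
    print(executor_env)
```

Because the address names the service rather than the pod, it stays valid even if the driver pod's IP changes.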

As part of their initialization, executors connect to the driver and pull the current config and the address of the external shuffle service that runs on the same node as the executor.

As tasks drain, the backend scheduler org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend will scale down unused executors by instructing the k8s API to delete them.
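One simple way to picture "unused" is an idle timeout: an executor that has not run a task for some time becomes a candidate for deletion. The Python sketch below models just that. It is our own simplification, not the backend scheduler's logic; the 60-second value is an assumption borrowed from the usual default of spark.dynamicAllocation.executorIdleTimeout, and the executor names are made up.

```python
def executors_to_remove(last_busy: dict, now: float,
                        idle_timeout: float = 60.0) -> list:
    """Return the executors that have been idle for at least `idle_timeout`.

    `last_busy` maps an executor name to the time (in seconds) at which it
    last finished a task.
    """
    return sorted(name for name, finished_at in last_busy.items()
                  if now - finished_at >= idle_timeout)


if __name__ == "__main__":
    last_busy = {"exec-1": 100.0, "exec-2": 155.0, "exec-3": 90.0}
    # At t=160s, exec-1 and exec-3 have been idle for 60s or more.
    for name in executors_to_remove(last_busy, now=160.0):
        print(f"would ask the k8s API to delete pod {name}")
```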