Banzai Cloud is now part of Cisco

Banzai Cloud Logo Close
Home Products Benefits Blog Company Contact

Spark scheduling on Kubernetes demystified

Author Toader Sebastian

The content of this page hasn't been updated for years and might refer to discontinued products and projects.

Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes

Apache Zeppelin on Kubernetes series:
Running Zeppelin Spark notebooks on Kubernetes
Running Zeppelin Spark notebooks on Kubernetes - deep dive

Apache Kafka on Kubernetes series:
Kafka on Kubernetes - using etcd

One of the most popular spotguides for our open source, next generation CloudFoundry/Heroku-like Pipeline PaaS is for Apache Spark. In the last few weeks, we’ve been evangelizing for Spark on Kubernetes, and the feedback we’ve received has been outstanding. Thank you for pointing out issues, bringing us ideas and making feature requests. Keep them coming! In this post we’ll be picking up where we left off, and digging into the details of how the Spark scheduler on Kubernetes works.

This blog complements our anatomy of Spark applications on Kubernetes blogpost, and focuses on how executor scheduling is linked to Kubernetes.

The scheduler that is shipped with Spark on Kubernetes is org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend, which is a coarse-grained scheduler (derived from org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend).

Just to recap our anatomy of Spark applications on Kubernetes blogpost and how to initiate the Spark scheduler:

Once SparkContext has the cluster manager it creates a task scheduler and a backend scheduler by invoking org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler).

  • The scheduler keeps track of the expected total of executors.
  • At the same time, the scheduler keeps track of the number of executor pods already requested from Kubernetes.
  • It also tracks how many executors are actually registered with the driver. Note, this differs from the number of requested executor pods due to the fact that pods can be in a pending state and not actually operational. Only executors that registered with the driver will receive tasks to execute.
  • The scheduler runs with a periodicity of spark.kubernetes.allocation.batch.delay (if unspecified this defaults to 1 second).

Initially both the number of requested executor pods and the number of registered executors is (0) less than that of the total expected executors. Thus, the scheduler will ask Kubernetes to allocate(schedule) executor pods. It doesn’t immediately request the total expected number of executor pods from Kubernetes, but requests the pods in batches of spark.kubernetes.allocation.batch.size (if unspecified this defaults to 5).

In the next scheduling cycle the total number of registered executors is compared to the total number of requested executors. If there are less registered executors than requested executors it means that not all requested executors have been created yet by Kubernetes, thus the scheduler won’t request new executors from Kubernetes.

    if total_registered_executos < total_requested_executors {
        // don't request new executors as there are still pending executor pods
    }

If all executor pods requested so far were created by Kubernetes, and the executors registered with the driver, the scheduler checks whether the total number of expected executors has been reached.

    if total_registered_executos < total_requested_executors {
        // there are still pending executor pods to be created by Kubernetes,
        // don't request new executor pods
    } if total_expected_executors <= total_registered_executos {
        // maximum allowed executor limit reached. Not scaling up further.
    }

If there are no pending executors and the total expected executor count hasn’t been reached, the scheduler requests the next batch of executor pods.

    if total_registered_executors < total_requested_executors {
        // there are still pending executor pods to be created by Kubernetes
        // don't request new executor pods
    } else if total_expected_executors <= total_registered_executors {
        // maximum allowed executor limit reached. Not scaling up further.
    } else {
        int to_be_requested_num = min(total_expected_executors - total_registered_executors, allocation_batch_size

        for (i = 0; i < to_be_requested_num; i++) {
             request_new_executor_from_kubernetes()
             total_requested_executors++
        }
    }

To recap, this is how a Spark application submisson works behind the scenes:

Spark on K8S

Fixed number of executors

To run a Spark job on a fixed number of spark executors, you will have to --conf spark.dynamicAllocation.enabled=false (if this config is not passed to spark-submit then it defaults to false) and --conf spark.executor.instances=<number of executors> (which if unspecified defaults to 1) to spark-submit.

In this case the total number of expected executors is the value passed into spark.executor.instances. This drives the executor pod allocation logic presented above.

Dynamic executor allocation

Dynamic executor allocation can be enabled by passing --conf spark.dynamicAllocation.enabled=true to spark-submit. If done, the scheduler dynamically scales the number of executor pods to meet its needs.

The initial number of executors is derived from:

  • spark.dynamicAllocation.minExecutors (defaults to 0 if not specified)
  • spark.dynamicAllocation.initialExecutors (defaults to spark.dynamicAllocation.minExecutors if not specified)
  • spark.executor.instances (defaults to 0 if not specified)

by taking the maximum of the above three values. The initial number of executors must be less than or equal to spark.dynamicAllocation.maxExecutors.

An executor allocation manager is only instantiated (org.apache.spark.ExecutorAllocationManager) when dynamic allocation is enabled, which is responsible for dynamically computing the total number of expected executors. The executor allocation manager runs every 100 ms in order to actualize the number of expected executors. It initially sets the total number to initial number of executors (see above). After that, it continuously updates the number of expected executors to whatever is necessary for the current load to satisfy all running and pending tasks.

Simultaneously and continously, the KubernetesClusterSchedulerBackend handles pod creation according to the logic described above.

After that, it won’t do anything until there are backlogged tasks for at least spark.dynamicAllocation.schedulerBacklogTimeout seconds (whose default is 1), before it requests more executors, since the current total expected number of executors is not enough to handle the load. Initially, it will request 1 more executor by increasing the number of total expected executors.

Going forward, it will check if the backlog persists for the duration of spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(by default it’s the same as spark.dynamicAllocation.schedulerBacklogTimeout) before requesting additional executors. The executor allocation manager will request double what it requested in its last iteration (e.g. 2, 4, 8 … executors), until the number of expected executors reaches the number of executors needed under the current load.

As tasks are completed, executors won’t have anything to do and will go idle. The allocation manager downscales executors that have been idle for at least spark.dynamicAllocation.executorIdleTimeout seconds (whose default is 60 if unspecified). It passes the ids of executors that are to be downscaled to KubernetesClusterSchedulerBackend, which will engage Kubernetes to delete the corresponding executor pods.