Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes
Collecting Spark History Server event logs in the cloud
Apache Zeppelin on Kubernetes series:
Running Zeppelin Spark notebooks on Kubernetes
Running Zeppelin Spark notebooks on Kubernetes - deep dive
Apache Kafka on Kubernetes series:
Kafka on Kubernetes - using etcd
Over the last three months at Banzai Cloud, we’ve been running and supporting Spark deployments on our open source PaaS, Pipeline, which is built on Kubernetes. We’ve been pushing the limits of Spark on Kubernetes and we’ve seen some interesting situations, such as checkpointing throughput failures, all kinds of pod or infrastructure failures, and, most recently, node failures and node terminations when running on spot instances. Luckily, most of these were monitored, brought to our attention, and automatically addressed by our open source advanced monitoring solutions and our resilient cloud management alert-react system, Hollowtrees. However, some fixes had to be made in Apache Spark itself.
In this Apache Spark on Kubernetes series blogpost, we’ll discuss how Spark applications can be made resilient to node failures on Kubernetes. As presented in previous blogposts, the following Spark components are involved when the user submits a Spark application:
- Spark driver
- Spark executor
- External shuffle service
- Spark resource staging server
To recap, here is a Spark flow that contains the above components. For a comprehensive guide to Spark on Kubernetes, please make sure to read our Apache Spark on Kubernetes series.
We look at each of these components separately.
Spark executor
Spark executors, in combination with an External shuffle service, are already resilient to failure. The shuffle data generated by Spark executors is stored in an external service, the External shuffle service, so it is not lost if an executor crashes. The Spark driver also re-schedules tasks that were lost in flight or left unfinished because the executor failed.
External shuffle service
This component is deployed as a Kubernetes daemonset. Since it is a daemonset, Kubernetes ensures that the External shuffle service runs on each node of the cluster. If a node is lost, Kubernetes takes care of starting an instance of the External shuffle service on it when it comes back (or on a replacement node that is added to the cluster). Of course, when a node dies and cannot be recovered, the shuffle data stored by the External shuffle service on that node is lost. In such cases the Spark driver re-schedules the necessary tasks to be re-executed to obtain any missing interim results.
Spark driver
We have opened the following Apache Spark Jira ticket and related GitHub pull request to make the Spark Driver resilient to failures.
The current implementation of the Spark driver on Kubernetes is not resilient to node failures, since it’s implemented as a Pod. However, Kubernetes would offer us more resiliency if the Spark driver were implemented as a Job.
Let me share the details we collected while working on enabling Spark checkpointing for Spark clusters deployed with our PaaS Pipeline. These led us to the conclusion that the Spark driver should be a Kubernetes Job instead of a Pod.
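To make the difference concrete, here is a minimal sketch of what a Job wrapping a driver container could look like, built as a plain Python dictionary. This is not the manifest our Spark changes generate: the image name and container arguments are hypothetical placeholders, and a real driver needs considerably more configuration. Only the Kubernetes field names (apiVersion, kind, completions, parallelism, restartPolicy) and the spark-role=driver label are taken as given.

```python
import json


def driver_job_manifest(app_name: str, image: str, main_class: str) -> dict:
    """Build a minimal batch/v1 Job that wraps a Spark driver container.

    The container arguments are placeholders for illustration only.
    """
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"{app_name}-driver"},
        "spec": {
            # The Job is done after one successful driver run.
            "completions": 1,
            "parallelism": 1,
            "template": {
                "metadata": {"labels": {"spark-role": "driver"}},
                "spec": {
                    # Job pod templates only accept Never or OnFailure.
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "spark-driver",
                            "image": image,  # hypothetical image name
                            "args": ["driver", "--class", main_class],  # placeholder
                        }
                    ],
                },
            },
        },
    }


manifest = driver_job_manifest(
    "networkwordcount-1519301591265",
    "example.com/spark-driver:latest",
    "org.apache.spark.examples.streaming.NetworkWordCount",
)
print(json.dumps(manifest, indent=2))
```

The point of the wrapper is that the Job controller, not the user, owns the driver pod, so a pod that disappears before the Job has completed gets replaced.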
Create a cluster that consists of three nodes:
kubectl get nodes
NAME STATUS ROLES AGE VERSION
aks-agentpool1-27255451-0 Ready agent 2h v1.8.2
aks-agentpool1-27255451-2 Ready agent 53m v1.8.2
aks-agentpool1-27255451-3 Ready agent 19m v1.8.2
Submit a Spark application using spark-submit. The driver pod and the executor pods are running, and there is no Job:
kubectl get jobs
No resources found.
kubectl get pods
NAME READY STATUS RESTARTS AGE
networkwordcount-1519305658702-driver 1/1 Running 0 2m
networkwordcount-1519305658702-exec-1 1/1 Running 0 2m
networkwordcount-1519305658702-exec-2 1/1 Running 0 2m
pipeline-traefik-86c8dc89fb-wbtq8 1/1 Running 0 2h
shuffle-6ljkv 1/1 Running 0 1h
shuffle-knkbt 1/1 Running 0 1h
shuffle-tnmk7 1/1 Running 0 34m
zooming-lizzard-spark-rss-54858cdbb9-sksb6 1/1 Running 0 1h
Get the node the driver is running on:
kubectl describe pod networkwordcount-1519305658702-driver
Name: networkwordcount-1519305658702-driver
Namespace: default
Node: aks-agentpool1-27255451-3/10.240.0.5
Start Time: Thu, 22 Feb 2018 14:21:01 +0100
Labels: spark-app-selector=spark-cd7a98b964a04272b48fc619a501d1cd
spark-role=driver
Annotations: spark-app-name=NetworkWordCount
Status: Running
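If you want to script this step, the node can be extracted from the describe output. The helper below is a small sketch that assumes the "Node: <name>/<ip>" line format shown above; the function name node_of is ours, not part of any tool.

```python
from typing import Tuple


def node_of(describe_output: str) -> Tuple[str, str]:
    """Return (node_name, node_ip) from `kubectl describe pod` text."""
    for line in describe_output.splitlines():
        key, sep, value = line.partition(":")
        if sep and key.strip() == "Node":
            name, _, ip = value.strip().partition("/")
            return name, ip
    raise ValueError("no 'Node:' line found")


sample = """\
Name: networkwordcount-1519305658702-driver
Namespace: default
Node: aks-agentpool1-27255451-3/10.240.0.5
Status: Running
"""
print(node_of(sample))  # ('aks-agentpool1-27255451-3', '10.240.0.5')
```

Alternatively, kubectl can print the field directly with a JSONPath expression, for example kubectl get pod networkwordcount-1519305658702-driver -o jsonpath='{.spec.nodeName}'.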
Shut down node aks-agentpool1-27255451-3 (VM) and observe what happens to the pods that were scheduled to this node:
kubectl get nodes
NAME STATUS ROLES AGE VERSION
aks-agentpool1-27255451-0 Ready agent 2h v1.8.2
aks-agentpool1-27255451-2 Ready agent 1h v1.8.2
All pods that were scheduled to this node are terminated:
kubectl get pods -w
NAME READY STATUS RESTARTS AGE
networkwordcount-1519305658702-driver 1/1 Running 0 5m
networkwordcount-1519305658702-exec-1 1/1 Running 0 5m
networkwordcount-1519305658702-exec-2 1/1 Running 0 5m
pipeline-traefik-86c8dc89fb-wbtq8 1/1 Running 0 2h
shuffle-6ljkv 1/1 Running 0 1h
shuffle-knkbt 1/1 Running 0 1h
shuffle-tnmk7 1/1 Running 0 37m
zooming-lizzard-spark-rss-54858cdbb9-sksb6 1/1 Running 0 1h
networkwordcount-1519305658702-driver 1/1 Terminating 0 5m
networkwordcount-1519305658702-driver 1/1 Terminating 0 5m
networkwordcount-1519305658702-exec-1 1/1 Terminating 0 5m
networkwordcount-1519305658702-exec-2 1/1 Terminating 0 5m
networkwordcount-1519305658702-exec-1 1/1 Terminating 0 5m
shuffle-tnmk7 1/1 Terminating 0 38m
shuffle-tnmk7 1/1 Terminating 0 38m
kubectl get pods
NAME READY STATUS RESTARTS AGE
pipeline-traefik-86c8dc89fb-wbtq8 1/1 Running 0 2h
shuffle-6ljkv 1/1 Running 0 1h
shuffle-knkbt 1/1 Running 0 1h
zooming-lizzard-spark-rss-54858cdbb9-sksb6 1/1 Running 0 1h
We can see that the driver pod networkwordcount-1519305658702-driver was terminated because the node hosting it was shut down, and that Kubernetes did not reschedule it to any of the other nodes in the cluster.
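The behavior comes down to ownership: a bare pod has no controller watching over it, while a pod created by a Job is replaced as long as the Job has not completed. The toy model below illustrates that difference. It is a deliberately simplified sketch, not the Kubernetes scheduler or Job controller; the class names, the numeric pod suffixes, and the "first remaining node" placement are all assumptions made for illustration.

```python
from typing import Dict, List, Optional


class Pod:
    def __init__(self, name: str, node: str, owner_job: Optional[str] = None):
        self.name = name
        self.node = node
        self.owner_job = owner_job  # None means a bare pod


class ToyCluster:
    def __init__(self, nodes: List[str]):
        self.nodes = list(nodes)
        self.pods: Dict[str, Pod] = {}
        self.jobs: List[str] = []  # Jobs that have not completed yet
        self._counter = 0

    def run_bare_pod(self, name: str, node: str) -> None:
        self.pods[name] = Pod(name, node)

    def run_job(self, job_name: str, node: str) -> None:
        self.jobs.append(job_name)
        self._start_job_pod(job_name, node)

    def _start_job_pod(self, job_name: str, node: str) -> None:
        self._counter += 1
        pod_name = f"{job_name}-{self._counter}"
        self.pods[pod_name] = Pod(pod_name, node, owner_job=job_name)

    def lose_node(self, node: str) -> None:
        self.nodes.remove(node)
        # Pods bound to the lost node are deleted.
        self.pods = {n: p for n, p in self.pods.items() if p.node != node}
        self._reconcile_jobs()

    def _reconcile_jobs(self) -> None:
        # Every unfinished Job should own one pod; bare pods get no such care.
        for job in self.jobs:
            if not any(p.owner_job == job for p in self.pods.values()):
                self._start_job_pod(job, self.nodes[0])


cluster = ToyCluster(["node-0", "node-1"])
cluster.run_bare_pod("app-a-driver", "node-1")  # driver as a Pod
cluster.run_job("app-b-driver", "node-1")       # driver as a Job
cluster.lose_node("node-1")
print(sorted(cluster.pods))  # ['app-b-driver-2']
```

Only the Job-managed driver survives the node loss, and it comes back under a new pod name on a remaining node.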
So, we modified Spark, creating a version in which the Spark driver is a Kubernetes Job instead of a Pod. Let’s repeat the experiment.
Create a cluster that consists of three nodes:
kubectl get nodes
NAME STATUS ROLES AGE VERSION
aks-agentpool1-27255451-0 Ready agent 1h v1.8.2
aks-agentpool1-27255451-1 Ready agent 1h v1.8.2
aks-agentpool1-27255451-2 Ready agent 1m v1.8.2
Submit a Spark application:
kubectl get jobs
NAME DESIRED SUCCESSFUL AGE
networkwordcount-1519301591265-driver 1 0 3m
kubectl get pods
NAME READY STATUS RESTARTS AGE
networkwordcount-1519301591265-driver-mszl2 1/1 Running 0 3m
networkwordcount-1519301591265-usmj-exec-1 1/1 Running 0 1m
networkwordcount-1519301591265-usmj-exec-2 1/1 Running 0 1m
pipeline-traefik-86c8dc89fb-wbtq8 1/1 Running 0 1h
shuffle-6ljkv 1/1 Running 0 9m
shuffle-crg9q 1/1 Running 0 9m
shuffle-knkbt 1/1 Running 0 2m
zooming-lizzard-spark-rss-54858cdbb9-fx6ll 1/1 Running 0 9m
Notice that the Spark driver networkwordcount-1519301591265-driver is now a Kubernetes Job, which manages the networkwordcount-1519301591265-driver-mszl2 pod.
Get the node the driver is running on:
kubectl describe pod networkwordcount-1519301591265-driver-mszl2
Name: networkwordcount-1519301591265-driver-mszl2
Namespace: default
Node: aks-agentpool1-27255451-1/10.240.0.5
Start Time: Thu, 22 Feb 2018 13:14:18 +0100
Labels: controller-uid=c2c89fa1-17c9-11e8-868d-0a58ac1f0240
job-name=networkwordcount-1519301591265-driver
spark-app-selector=spark-dac6c2ba946a41d4b1fafe0e077b36ef
spark-role=driver
Annotations: kubernetes.io/created-by={"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"Job","namespace":"default","name":"networkwordcount-1519301591265-driver","uid":"c2c89fa1-17c9-11e8-868d-0...
spark-app-name=NetworkWordCount
Status: Running
Shut down node aks-agentpool1-27255451-1 (VM) and observe what happens to the pods that were scheduled to this node:
kubectl get nodes
NAME STATUS ROLES AGE VERSION
aks-agentpool1-27255451-0 Ready agent 1h v1.8.2
aks-agentpool1-27255451-2 Ready agent 7m v1.8.2
kubectl get pods
NAME READY STATUS RESTARTS AGE
networkwordcount-1519301591265-driver-dwvkf 1/1 Running 0 3m
networkwordcount-1519301591265-rmes-exec-1 1/1 Running 0 1m
networkwordcount-1519301591265-rmes-exec-2 1/1 Running 0 1m
pipeline-traefik-86c8dc89fb-wbtq8 1/1 Running 0 1h
shuffle-6ljkv 1/1 Running 0 17m
shuffle-knkbt 1/1 Running 0 9m
zooming-lizzard-spark-rss-54858cdbb9-sksb6 1/1 Running 0 1m
kubectl describe pod networkwordcount-1519301591265-driver-dwvkf
Name: networkwordcount-1519301591265-driver-dwvkf
Namespace: default
Node: aks-agentpool1-27255451-2/10.240.0.6
Start Time: Thu, 22 Feb 2018 13:22:56 +0100
Labels: controller-uid=c2c89fa1-17c9-11e8-868d-0a58ac1f0240
job-name=networkwordcount-1519301591265-driver
spark-app-selector=spark-dac6c2ba946a41d4b1fafe0e077b36ef
spark-role=driver
Annotations: kubernetes.io/created-by={"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"Job","namespace":"default","name":"networkwordcount-1519301591265-driver","uid":"c2c89fa1-17c9-11e8-868d-0...
spark-app-name=NetworkWordCount
Status: Running
Voila! The driver networkwordcount-1519301591265-driver-mszl2 that was originally running on node aks-agentpool1-27255451-1 has been rescheduled to node aks-agentpool1-27255451-2 as networkwordcount-1519301591265-driver-dwvkf.
Not so fast, though: there’s a subtle problem that has to be solved here. When the driver is terminated, its executors (which may run on other nodes) are terminated by Kubernetes with a delay. If you are interested in how this works on Kubernetes, please read: Kubernetes: Garbage Collection.
This leads to concurrency issues, in which the re-spawned driver tries to create new executors with the same names as the executors that are still being cleaned up by Kubernetes garbage collection.
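The race can be reproduced with a toy model of the API server. The sketch below is only an illustration under simplifying assumptions: ToyApiServer and its methods are hypothetical, and owners are tracked by pod name for brevity, whereas Kubernetes links dependents to owners through UIDs in ownerReferences. What it does share with the real system is that creating an object whose name is still taken is rejected.

```python
from typing import Dict, Optional


class AlreadyExists(Exception):
    """Raised when a pod name is still in use."""


class ToyApiServer:
    def __init__(self) -> None:
        self.pods: Dict[str, Optional[str]] = {}  # pod name -> owner name

    def create_pod(self, name: str, owner: Optional[str] = None) -> None:
        if name in self.pods:
            raise AlreadyExists(name)
        self.pods[name] = owner

    def delete_owner(self, owner: str) -> None:
        # Only the owner goes away now; dependents linger until GC runs.
        self.pods.pop(owner, None)

    def collect_garbage(self) -> None:
        live = set(self.pods)
        for name, owner in list(self.pods.items()):
            if owner is not None and owner not in live:
                del self.pods[name]


api = ToyApiServer()
api.create_pod("app-driver-mszl2")
api.create_pod("app-exec-1", owner="app-driver-mszl2")

api.delete_owner("app-driver-mszl2")  # driver lost together with its node
api.create_pod("app-driver-dwvkf")    # re-spawned driver starts right away

try:
    api.create_pod("app-exec-1", owner="app-driver-dwvkf")
    collided = False
except AlreadyExists:
    collided = True  # the old executor has not been collected yet

api.collect_garbage()
api.create_pod("app-exec-1", owner="app-driver-dwvkf")  # works after GC
print(collided)  # True
```

Waiting for garbage collection to finish would avoid the collision, but the re-spawned driver has no cheap way of knowing when that is.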
We overcame this issue by making the executor names unique for each driver instance. Previously, the driver and executor pod names looked like this:
networkwordcount-1519305658702-driver <- driver pod
networkwordcount-1519305658702-exec-1 <- executor pod
networkwordcount-1519305658702-exec-2 <- executor pod
By making the driver a Kubernetes Job, and the executor names unique for each driver, these now look like:
networkwordcount-1519301591265-driver <- driver job
networkwordcount-1519301591265-driver-mszl2 <- driver pod
networkwordcount-1519301591265-usmj-exec-1 <- executor pod
networkwordcount-1519301591265-usmj-exec-2 <- executor pod
And after the node shutdown:
networkwordcount-1519301591265-driver <- driver job
networkwordcount-1519301591265-driver-dwvkf <- re-spawned driver pod
networkwordcount-1519301591265-rmes-exec-1 <- executor pod
networkwordcount-1519301591265-rmes-exec-2 <- executor pod
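The naming scheme visible in these listings can be sketched as follows. This is our reading of the observed names (a four-letter, per-driver-instance token placed before the exec-N suffix), not the actual code from the patch; the function names and the use of a random token are assumptions.

```python
import random
import string
from typing import List


def driver_instance_id(rng: random.Random, length: int = 4) -> str:
    """Random lowercase token that identifies one driver instance."""
    return "".join(rng.choice(string.ascii_lowercase) for _ in range(length))


def executor_pod_names(app_prefix: str, instance_id: str, count: int) -> List[str]:
    """Executor pod names scoped to a single driver instance."""
    return [f"{app_prefix}-{instance_id}-exec-{i}" for i in range(1, count + 1)]


app = "networkwordcount-1519301591265"
before = executor_pod_names(app, "usmj", 2)  # original driver instance
after = executor_pod_names(app, "rmes", 2)   # re-spawned driver instance
print(before)
print(after)

# A fresh driver would draw its own token at startup.
print(executor_pod_names(app, driver_instance_id(random.Random()), 2))
```

Because the token changes with every driver instance, the re-spawned driver's executors no longer share names with the ones still awaiting garbage collection. A short random token can in principle repeat, so a production scheme may prefer a longer or otherwise guaranteed-unique identifier.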
Spark Resource Staging Server
The Spark resource staging server is only used when the artifacts (jars, files) needed by the Spark application reside on the local machine where the spark-submit command is issued.
At the moment, this component stores uploaded files in directories that are local to the pod. Thus, if the resource staging server is lost, then restarted, the files stored by the server are lost as well. This means that a Spark driver that was rescheduled to a different node during initialization won’t be able to download the necessary files from the resource staging server.
In other words, the resource staging server is not resilient to failures. We’re working on making that server resilient. See our next blogpost for details on how we achieved that.
As usual, we are contributing all the changes that make Spark resilient to failures on Kubernetes back to the community.