
Apache Spark (Driver) resilience on Kubernetes - network partitioning

Author: Balint Molnar

The content of this page hasn't been updated for years and might refer to discontinued products and projects.

At Banzai Cloud we are building a feature-rich enterprise-grade application platform, built for containers on top of Kubernetes, called Pipeline. Applications deployed to Pipeline automatically inherit the platform’s features: enterprise-grade security, observability (centralized log collection, monitoring and tracing), discovery, high availability and resiliency, just to name a few - encapsulated in spotguides.

One of the most popular spotguides we deploy is Spark. In the past few months we’ve been working on and pushing many pull requests to make Spark a first-class citizen on Kubernetes and to make it resilient. We have already published a good collection of posts (and contributions), so if you are interested in any of these features, read:

Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes
Collecting Spark History Server event logs in the cloud

Spark resilience on Kubernetes 🔗︎

Update - The solution described below, which uses ReadWriteOnce storage for the checkpoint directory, works. However, there may be use cases that require ReadWriteMany or object storage for the checkpoint directory, and in those cases the solution described here is imperfect. One possible solution is to use the Spark Operator; another is StatefulJob, a new Kubernetes feature.
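
As a rough illustration of the Spark Operator route, the sketch below shows a SparkApplication resource whose restartPolicy lets the operator recreate the driver after a failure. This is only a sketch: it assumes the spark-on-k8s-operator CRDs and a spark service account are installed, and the image, jar path and resource values are placeholders.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
spec:
  type: Scala
  mode: cluster
  image: <your spark image>                 # placeholder
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar   # placeholder path
  sparkVersion: "2.4.0"
  restartPolicy:
    type: Always            # the operator restarts the driver after failures
  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark   # assumed to exist
  executor:
    instances: 2
    cores: 1
    memory: 512m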

Today’s post will focus on Spark driver resiliency on Kubernetes, illustrated through the simulation of a node failure. Note that, at this point, not all Spark components are resilient upstream; we highlighted these gaps in Apache Spark application resilience on Kubernetes. We have since contributed all of our changes as pull requests; in the meantime, you can use our Spark fork as you see fit.

Earlier we proposed a PR to Apache Spark which enables driver resiliency by using a Kubernetes Job instead of a Pod. In this blog post we will not go into implementation details, as we have already published two blog posts on that subject. Instead, we will focus on the question: is it a good idea to use Kubernetes Jobs to make Spark resilient to failures?

Let’s start by keeping in mind that Spark batch processing applications are stateless: if the driver pod dies, all previous computation is lost, and the newly created driver starts the computation over from the beginning.

This is not a Spark on Kubernetes limitation; Spark on YARN behaves the same way.

Spark stream processing applications are different from batch processing, as they are stateful when checkpointing is enabled. With checkpointing enabled, Spark persists its state so that it can recover it after a failure.

Correctness is vital here, since we cannot afford checkpoint file corruption. Corruption may occur if two Spark driver pods run simultaneously while working on the same Spark job.

Correctness is not an issue when spark-submit creates a Pod: in case of a node failure, the pod simply dies and no Kubernetes controller reschedules a new one. If the driver runs as a Job, the Job controller reschedules a new pod as long as the previous one died without completing the job. The question is: is it possible that two driver pods executing the same Spark job are scheduled simultaneously? Yes, because the Job controller is greedy and, in the event of a network failure, will schedule a new driver. However, based on our observations, this does not affect correctness at all.
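
To make these controller semantics concrete, here is a minimal sketch, using nothing beyond stock Kubernetes, of what wrapping the driver in a Job amounts to. It is illustrative only and not the exact manifest our spark-submit patch generates: with restartPolicy: Never and a backoffLimit, the Job controller creates a brand new pod when the previous one is gone or failed.

apiVersion: batch/v1
kind: Job
metadata:
  name: spark-pi-driver        # illustrative name
spec:
  backoffLimit: 1              # how many times a failed driver pod gets replaced
  template:
    spec:
      restartPolicy: Never     # do not restart in place; let the Job controller create a new pod
      containers:
        - name: spark-kubernetes-driver
          image: banzaicloud/spark:master_job-dev_0.5_blog
          # command, args, env and volumes are filled in by spark-submit in the real manifest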

Simulate the scenario 🔗︎

We will simulate the following scenario:

Spark Resiliency

We are going to use a GKE cluster, which can be created easily, either by using Pipeline or by using the Google Cloud Console UI.

We are going to use iptables, so please select Ubuntu as the node image type. Make sure you create a cluster with at least two nodes.

Next, add your SSH key, as we will need to get inside the node to simulate a network failure.
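
If you prefer the command line over the Console UI, one simple option is to let gcloud handle the key: it generates and uploads an SSH key the first time you connect. The node name below comes from kubectl get nodes.

# gcloud generates and propagates an SSH key on first use
gcloud compute ssh <node name> --zone <your zone>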

Generate the Kubernetes config:

gcloud container clusters get-credentials <your cluster name> --zone <your zone>

To simulate a network partition, we are going to SSH into the node and add some iptables rules, so that the kubelet cannot access the Kubernetes API server. Before that, we need to submit a Spark job that runs long enough to encounter the network partition. For the sake of simplicity we are going to use Spark Pi with a large precision and only two executors. To check what happens with the checkpoint directory, we are going to attach a PVC to the pod. (We know it is not a Spark Streaming app but, as we will see, that does not really matter.)

kubectl create -f - <<EOF
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: spark-pvc-claim
spec:
  accessModes:
    - "ReadWriteOnce"
  resources:
    requests:
      storage: "1Gi"
EOF

Submit the Spark Pi application:

bin/spark-submit --verbose \
    --master k8s://https://35.197.251.160 \
    --deploy-mode cluster \
    --name spark-pi \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=2 \
    --conf spark.kubernetes.driver.job.backofflimit=1 \
    --conf spark.kubernetes.container.image=banzaicloud/spark:master_job-dev_0.5_blog \
    local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 200000

Check where the driver pod is scheduled and look up the external IP address of the node in the Google Cloud Console UI.

kubectl describe pod spark-pi-1531855837017-driver-sx77q

Name:           spark-pi-1531855837017-driver-sx77q
Namespace:      default
Node:           gke-blog-cluster-1-default-pool-7484150c-z6wz/10.154.0.3
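
Instead of the Google Cloud Console UI, the external IP can also be read from the command line, for example with the wide output of kubectl:

# the EXTERNAL-IP column shows the address we will SSH into
kubectl get node gke-blog-cluster-1-default-pool-7484150c-z6wz -o wide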

SSH into that Node and apply the following iptables rules:

# First, save the iptables rules so they can be reused later
sudo iptables-save > iptables.backup

# Check which port is used by kubelet
sudo netstat -ntp  | grep kubelet
tcp   0      0 10.154.0.3:55288        35.197.251.160:443      ESTABLISHED 2191/kubelet
tcp   0      0 10.154.0.3:55282        35.197.251.160:443      ESTABLISHED 2191/kubelet
tcp   0      0 10.154.0.3:35414        169.254.169.254:80      ESTABLISHED 2191/kubelet
tcp6  0      0 10.154.0.3:10255        10.52.0.6:34778         ESTABLISHED 2191/kubelet
tcp6  0      0 10.154.0.3:10255        10.52.1.4:57318         ESTABLISHED 2191/kubelet
tcp6  0      0 10.154.0.3:10250        10.154.0.2:53980        ESTABLISHED 2191/kubelet

# The Kubernetes API server's external IP is 35.197.251.160, and the
# kubelet also talks to the metadata server at 169.254.169.254.
# We need to block both to simulate a network partition
sudo iptables -A OUTPUT -p tcp -d 35.197.251.160 -j DROP
sudo iptables -A OUTPUT -p tcp -d 169.254.169.254 -j DROP

Check the Kubernetes nodes. After a while, because of the iptables rules, one node's state will change to NotReady.

kubectl get nodes -w
NAME                                            STATUS    ROLES     AGE       VERSION
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz   NotReady   <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready     <none>    4h        v1.10.4-gke.2

If we also check the Kubernetes pods, we will see the following:

kubectl get pods -w
NAME                                     READY     STATUS    RESTARTS   AGE
spark-pi-1531855837017-driver-sx77q      1/1       Running   0          1m
spark-application-1531855852300-exec-1   1/1       Running   0          1m
spark-application-1531855852300-exec-2   1/1       Running   0          1m
spark-pi-1531855837017-driver-sx77q      1/1       Unknown   0          7m

After a short time the driver pod goes from Running to Unknown. Remember, that pod is still running on a node which is separated from the cluster. Because the Spark job was submitted as a Kubernetes Job, the Job controller creates a new pod as soon as the old one goes into the Unknown state. This could cause correctness issues: the separated driver pod is still running, since it does not require a connection to the Kubernetes API, and two driver pods running side by side is exactly the problem described above. Let's check back on the pods, and then take a look at the Job object itself (shown after the listing below).

kubectl get pods -w
NAME                                  READY     STATUS              RESTARTS   AGE
spark-pi-1531855837017-driver-p6cbm   0/1       ContainerCreating   0          7s
spark-pi-1531855837017-driver-sx77q   1/1       Unknown             0          7m
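
You can also double-check that it really is the Job controller that created the replacement driver. In our run the Job name should be the driver pod name without the random suffix, but verify it with the first command:

# list the Jobs created by spark-submit
kubectl get jobs

# show the pods the Job controller has started for the driver
kubectl describe job spark-pi-1531855837017-driver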

If we investigate the newly created driver pod, we can see that it will never start successfully as long as the other pod has not been terminated, thanks to the ReadWriteOnce PVC, so there will be no correctness issues at all.
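
The FailedMount event can be seen, for example, by describing the stuck driver pod (using the new driver pod's name from our run):

kubectl describe pod spark-pi-1531855837017-driver-p6cbm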

Warning FailedMount 19s kubelet, gke-blog-cluster-1-default-pool-7484150c-qhwh Unable to mount volumes for pod "spark-pi-1531902241419-driver-p6cbm_default(c00c5519-8a64-11e8-b81d-42010a9a00fa)": timeout expired waiting for volumes to attach or mount for pod "default"/"spark-pi-1531902241419-driver-p6cbm". list of unmounted volumes=[checkpointpvc]. list of unattached volumes=[spark-local-dir-1 checkpointpvc spark-conf-volume default-token-nbpnb]

Now let’s restore the iptables rules to normal, so the node can rejoin the cluster.

sudo iptables-restore < iptables.backup

The lost node will rejoin the cluster:

kubectl get nodes -w
NAME                                            STATUS    ROLES     AGE       VERSION
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz   Ready   <none>    4h        v1.10.4-gke.2

And the pod status will show that, once the old driver has been terminated (it will be, since Kubernetes terminates every pod on the rejoined node), the new one comes up and is finally created properly.

kubectl get pods -w
spark-pi-1531855837017-driver-p6cbm   1/1       Running       0          2m
spark-pi-1531855837017-driver-sx77q   0/1       Terminating   0          9m
spark-application-1531856426866-exec-1   0/1       Pending   0         0s
spark-application-1531856426866-exec-1   0/1       Pending   0         0s
spark-application-1531856426866-exec-1   0/1       ContainerCreating   0         0s
spark-application-1531856426866-exec-2   0/1       Pending   0         0s
spark-application-1531856426866-exec-2   0/1       Pending   0         0s
spark-application-1531856426866-exec-2   0/1       ContainerCreating   0         0s
spark-application-1531856426866-exec-2   1/1       Running   0         1s
spark-application-1531856426866-exec-1   1/1       Running   0         1s

Conclusion 🔗︎

To summarize, let's take a look at the image below. It shows what happens during a network partition.

Spark Resiliency