At Banzai Cloud we are building a cloud-agnostic, open-source, next-generation CloudFoundry/Heroku-like PaaS, Pipeline, while running several big data workloads natively on Kubernetes. Apache Kafka is one of the cloud-native workloads we support out-of-the-box, alongside Apache Spark and Apache Zeppelin.
If you’re interested in running big data workloads on Kubernetes, please read the following blog series as well.
Apache Kafka on Kubernetes series:
Kafka on Kubernetes - using etcd
Monitoring Apache Kafka with Prometheus
Kafka on Kubernetes with Local Persistent Volumes
Kafka on Kubernetes the easy way
Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes
Collecting Spark History Server event logs in the cloud
Apache Zeppelin on Kubernetes series:
Running Zeppelin Spark notebooks on Kubernetes
Running Zeppelin Spark notebooks on Kubernetes - deep dive
Introduction
Update: we have opened the KAFKA-6598 ticket to help get the community involved.
Apache Kafka, which was originally developed at LinkedIn, started as a distributed commit log. Since then, it has evolved into a distributed streaming platform.
It was open sourced back in 2011 and became popular extremely fast. It’s a simple and easy to use tool that keeps evolving, and has a vibrant community. One of the biggest headaches we hear Kafka users complaining about is the Zookeeper dependency, and the need to maintain a Zookeeper cluster. So what is Zookeeper?
According to its homepage, Zookeeper is “a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.” It’s useful when creating distributed applications, and that’s why Kafka relies on it. Zookeeper is based on the Zookeeper Atomic Broadcast protocol (ZAB), a consensus protocol which shares some key aspects with Paxos. For more details, take a look at this page.
So what’s all the fuss? Well, we’re not saying that ZK is bad, but…
- Unlike Kafka, it does not have a large and vibrant community (merge those PRs please, anyone?)
- It uses a protocol which is hard to understand, and which is hard to maintain in a large Zookeeper cluster
- It’s a bit outdated, compared with, say, Raft
- It’s written in Java (yes, this is a subjective problem, but it matters to us, because ZK is an infrastructure component)
- We run everything in Kubernetes, and k8s ships with a Raft implementation by default, etcd
- Linearizability (if you’ll forgive us minting a word) - check this comparison chart
- Performance and inherent scalability issues
- Client-side complexity and thick clients
- Lack of service discovery
The list above might grow, but I think we’ve made our point. So we have an easy-to-use and easy-to-maintain Kafka cluster, and a Zookeeper cluster dependency that is difficult to understand and hard to maintain. You may ask why the community hasn’t already gotten rid of this Zookeeper dependency, or at least made it pluggable, so the end user can choose for themselves. Kafka was designed and built around Zookeeper, so it’s really hard to just throw it away; recently, however, the community has put together a huge refactor of the parts of the Kafka code base that handle Zookeeper interactions, so factoring out Zookeeper has become easier. Thanks a lot!
We’re happy to announce that we at Banzai Cloud have removed the Zookeeper dependency, and use etcd instead. Today, we’re open sourcing the code so we can all try, adjust, make or propose fixes, listen to the Apache Kafka community and eventually submit it as a PR upstream. The code is available on the Banzai Cloud GitHub repository.
Kafka on etcd
etcd is a “distributed reliable key-value store for the most critical data of a distributed system”. It uses the Raft consensus algorithm, which was designed to be easy to understand, to scale, and to operate. The protocol and the etcd implementation were very quickly adopted by large distributed systems like Kubernetes, and by large distributed databases and messaging frameworks, where consensus and strong consistency are a must. It has a vibrant community and is easy to use: on GitHub alone, over 500 projects use it. It’s written in Go, so it’s not directly embeddable in Kafka; however, CoreOS released a project called jetcd, which allows us to interact with etcd from Scala/Java.
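To give a feel for what that interaction looks like, here is a minimal sketch of talking to etcd from Java through jetcd. It assumes a recent io.etcd.jetcd release and the etcd-cluster-client:2379 in-cluster service name used later in this post; it is illustrative only, not code from the actual port.

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import java.nio.charset.StandardCharsets;

public class EtcdHello {
    public static void main(String[] args) throws Exception {
        // Connect to the etcd cluster (the endpoint name is an assumption for this example).
        Client client = Client.builder().endpoints("http://etcd-cluster-client:2379").build();
        KV kv = client.getKVClient();

        // Every registry in etcd is just a key with a value.
        ByteSequence key = ByteSequence.from("/hello", StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from("kafka-on-etcd", StandardCharsets.UTF_8);
        kv.put(key, value).get();

        // Read it back.
        GetResponse response = kv.get(key).get();
        System.out.println(response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8));

        client.close();
    }
}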
During the refactoring of Apache Kafka on etcd we faced several challenges. Although etcd claims that it can replace Zookeeper completely, several of its approaches differ significantly from Zookeeper’s. Below are the biggest incongruities we faced during the integration.
etcd and Zookeeper differences
- Zookeeper uses a so-called Znode, which can store information but can also have children nodes. Moreover, users can register watchers on a parent node, and Zookeeper will, for example, report on children nodes whenever they’re created. In etcd, every registry is just a key with a value: users cannot register new keys as children keys, so, by default, it’s not possible to register a watcher on a parent key that will inform us when new children keys are created. Kafka relies heavily on Zookeeper, so we had to figure out ways of imitating its behavior with etcd. For child nodes, we simply concatenate their paths, so if Kafka wants to inject, for example, “id” under the “broker” node, we create a key in etcd as “/broker/id”. To solve the watcher problem mentioned above, we first check whether the created node has a parent registry; if it has one and there is a registered watcher on it, then we register a new watcher on the newly created key/value.
- Zookeeper can have nodes that contain no information, but etcd cannot create a key without a value, so, until we figure out how to handle this in a nicer way, we’re putting in “[null]” strings.
- Zookeeper has so-called ephemeral nodes, which Kafka uses for liveness checks on brokers. This node type requires frequent heartbeats from Kafka, otherwise Zookeeper deletes the node. etcd does not have ephemeral nodes, but it has leases: if a lease goes unrenewed beyond a configurable time, then etcd deletes the key/value (see the sketch after this list).
- Recently, Kafka has started to use persistent sequential Zookeeper nodes, which increase a counter in their names on each interaction. In etcd we use the counter bound to the key/value, which also changes whenever an interaction occurs.
- We use etcd transactions for put, get and exists operations but, unfortunately, jetcd contains a bug that affects all transaction operations. We blogged about this problem and its resolution quite some time ago.
- We also put significant effort into a metastore. We’re introducing a new KafkaMetaStore trait, which allows users to implement their own metastore for Kafka. But keep in mind that, for now, Kafka depends so much on Zookeeper that it requires tremendous effort to remove all things Zookeeper-related. Our current solution is to map etcd code back to Zookeeper’s. We hope that the community will become engaged and help us refactor this part of Kafka as well.
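To make the ephemeral-node and path-concatenation points above more concrete, here is a minimal sketch of the lease-based approach using jetcd from Java. The broker id, value and TTL are made up for the example, a real implementation would renew the lease on a schedule rather than once, and parent watchers would be emulated with prefix watches as described above.

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.options.PutOption;
import java.nio.charset.StandardCharsets;

public class EphemeralBrokerRegistration {
    public static void main(String[] args) throws Exception {
        Client client = Client.builder().endpoints("http://etcd-cluster-client:2379").build();
        KV kv = client.getKVClient();
        Lease lease = client.getLeaseClient();

        // "Child" znodes become concatenated key paths: the broker id lives at
        // /brokers/ids/0 instead of being a child node "0" under the /brokers/ids znode.
        ByteSequence key = ByteSequence.from("/brokers/ids/0", StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from("{\"host\":\"kafka-0\",\"port\":9092}", StandardCharsets.UTF_8);

        // An ephemeral znode becomes a key bound to a lease: if the lease is not renewed
        // within its TTL, etcd deletes the key, just as Zookeeper deletes an ephemeral
        // node once heartbeats stop.
        long leaseId = lease.grant(10).get().getID();
        kv.put(key, value, PutOption.newBuilder().withLeaseId(leaseId).build()).get();

        // The broker's "heartbeat": renew the lease while the broker is alive.
        lease.keepAliveOnce(leaseId).get();

        client.close();
    }
}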
Try it out
Create a Kubernetes cluster
To try out Kafka, we created a Kubernetes cluster on Microsoft Azure’s managed Kubernetes service, AKS, with Pipeline. Just to recap, Pipeline can provision Kubernetes clusters across all major cloud providers and automate Helm deployments through a RESTful API. It also has a CI/CD component, in which cluster creation, artifact builds and deployments can be wired into a workflow.
Note: The Pipeline CI/CD module mentioned in this post is outdated and not available anymore. You can integrate Pipeline to your CI/CD solution using the Pipeline API. Contact us for details.
If you’d like to use Pipeline to create Kubernetes clusters, please follow this how-to. All the RESTful API calls are available through the following Postman collection (e.g. create a Kubernetes cluster and get the Kubernetes config).
Deploy the Kafka Helm charts to a Kubernetes cluster
The example below should work on any Kubernetes cluster, and it’s not tied to Pipeline. You can take the Helm chart from the Banzai Cloud charts repo.
Using the above-mentioned Postman collection, you can deploy the Kafka Helm chart by using Deployment Create with a modified body, which should look like this:
{"name": "banzaicloud-stable/kafka"}
You can check Kafka cluster creation by using kubectl get pods (remember to properly set your kubecontext):
>kubectl get pods
NAME READY STATUS RESTARTS AGE
etcd-cluster-0000 1/1 Running 0 3m
etcd-cluster-0001 1/1 Running 0 3m
etcd-cluster-0002 1/1 Running 0 3m
kafka-0 1/1 Running 0 4m
kafka-1 1/1 Running 0 2m
kafka-2 1/1 Running 0 1m
nosy-alpaca-etcd-operator-57f46478fd-dt5q8 1/1 Running 0 4m
Produce and Consume messages
At this point your Kafka cluster is only accessible inside the Kubernetes cluster, so you have to create a kafka-test pod with the following yaml:
Pipeline spotguides automate this process.
apiVersion: v1
kind: Pod
metadata:
name: kafka-test
spec:
containers:
- name: kafka-test
image: banzaicloud/kafka:2.12-1.2.0-etcd-0.0.1
# Just spin & wait forever
command: [ "/bin/bash", "-c", "--" ]
args: [ "while true; do sleep 3000; done;" ]
This creates a simple pod which will be available when trying out Kafka (kubectl create -f kafka-test.yaml). The next Pipeline release will contain the Kafka spotguide as well, so Kafka will become accessible from outside the cluster.
Now, exec into this pod by using kubectl exec -it kafka-test bash. Once you are inside the container, create a topic:
./bin/kafka-topics.sh --zookeeper etcd://etcd-cluster-client:2379 --create --topic kafka-test --partitions 1 --replication-factor 3
Created topic "kafka-test".
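Topics can also be created programmatically through Kafka’s Admin API, which talks to the brokers rather than to etcd or Zookeeper directly. Here is a minimal sketch; the bootstrap:9092 service name matches the producer and consumer commands below, and a reasonably recent kafka-clients dependency is assumed.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "bootstrap:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Same settings as the shell command above: 1 partition, replication factor 3.
            NewTopic topic = new NewTopic("kafka-test", 1, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}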
Once we’re done, we’ll produce some messages:
root@kafka-test:/opt/kafka# ./bin/kafka-console-producer.sh --broker-list bootstrap:9092 --topic kafka-test
>welcome
>kafka
>on
>etcd
>good
>you
>are
>here
Let’s consume these messages:
./bin/kafka-console-consumer.sh --bootstrap-server bootstrap:9092 --topic kafka-test --from-beginning
welcome
kafka
on
etcd
good
you
are
here
As you can see, all the messages sent from the producer side have arrived.
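The console tools are the quickest way to verify the cluster, but the standard Java clients should work unchanged as well, since they only talk to the brokers through the bootstrap:9092 service. Below is a minimal sketch using the plain kafka-clients producer and consumer APIs (a kafka-clients release recent enough to offer poll(Duration) is assumed); run it from a pod inside the cluster, like the kafka-test pod above.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaTestClient {
    public static void main(String[] args) {
        // Produce a couple of messages to the kafka-test topic.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "bootstrap:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("kafka-test", "welcome"));
            producer.send(new ProducerRecord<>("kafka-test", "kafka on etcd"));
        }

        // Consume from the beginning of the topic.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "bootstrap:9092");
        consumerProps.put("group.id", "kafka-test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("kafka-test"));
            // A single poll is enough for this sketch; a real consumer would poll in a loop.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}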
Handling broker failures
Now we’re going to simulate a broker failure in the cluster. From etcd we can see that the broker with id 0 is the partition’s leader, and all other brokers are in sync:
/brokers/topics/kafka-test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}
Kill the broker with id 0, and check whether we can still consume all the messages.
kubectl delete pod kafka-0
kubectl get pods
NAME READY STATUS RESTARTS AGE
etcd-cluster-0000 1/1 Running 0 24m
etcd-cluster-0001 1/1 Running 0 24m
etcd-cluster-0002 1/1 Running 0 24m
kafka-0 1/1 Terminating 0 24m
kafka-1 1/1 Running 0 23m
kafka-2 1/1 Running 0 22m
kafka-test 1/1 Running 0 11m
nosy-alpaca-etcd-operator-57f46478fd-dt5q8 1/1 Running 0 24m
/brokers/topics/kafka-test/partitions/0/state
{"controller_epoch":2,"leader":1,"version":1,"leader_epoch":1,"isr":[1,2]}
./bin/kafka-console-consumer.sh --bootstrap-server bootstrap:9092 --topic kafka-test --from-beginning
welcome
kafka
on
etcd
good
you
are
here
/brokers/topics/kafka-test/partitions/0/state
{"controller_epoch":2,"leader":1,"version":1,"leader_epoch":1,"isr":[1,2,0]}
As you can see, leader election was successful, and all messages are consumable.
What’s next
After a code refactor, we’d like to contribute this code back to the Apache Kafka community and begin a conversation about improvements, future plans and changes. We’d like to give Kafka users the chance to choose whether they’d like to use Zookeeper or etcd. This choice is especially important for users who, like us, deploy Kafka to Kubernetes.
There’s a list of unsupported features, about which we’d love to receive feedback. At Banzai Cloud all our workloads are cloud based (mostly managed by Kubernetes), and we rely heavily on cloud providers’ security features. Pipeline, k8s clusters internally, and interactions with third parties all use OAuth tokens that are stored/leased by Vault (for our internal security architecture, read this post). This model is a little different from how Kafka currently deals with security, so the unsupported features are:
- ACL support
- Kerberos (etcd does not support Kerberos)
We are running proof of concepts internally, however, we believe in the power of community and we know that these are considerably more difficult than the changes we’ve already made. We invite anybody interested to join the project; let’s make Kafka a first class citizen on Kubernetes.
So far, all our pilot users have considered Kerberos overkill and already use OAuth2 or OpenID, though they are all Web 2.0 (or 3.0?) companies with their deployments primarily in the cloud.