The content of this page hasn't been updated for years and might refer to discontinued products and projects.

We suspected there was real demand for an operator that makes it easy to provision and operate Apache Kafka clusters on Kubernetes without relying on Kubernetes StatefulSets. That proved correct: shortly after we released the first version of our open-source Koperator, a community started to build around it. We have received lots of valuable feedback that is helping to shape the future of Koperator, as well as feature contributions from the community. In the following guest post, Avi Zimmerman describes the Kafka topic, ACL, and user management features he contributed to Koperator.

This is a guest post by Avi Zimmerman.

Kafka is rapidly becoming an integral part of event-driven architectures and data streaming pipelines. But, as with every other service that holds valuable data, it’s necessary to keep what’s important safe from prying eyes and unauthorized access. In Kafka, this means setting up a Public Key Infrastructure (PKI) or configuring SASL, both of which can be difficult to automate securely.

With Koperator, all of this can be managed for you using CRDs. The Kafka cluster, topics, users, and their ACLs are all handled by the operator based on the YAML definitions you provide. The operator also uses Cruise Control and Prometheus to monitor your cluster’s health and to perform graceful healing and scaling operations when needed.

The picture below shows the operator creating a Kafka topic.

Kafka Topic

Why should I be using SSL and ACLs if I’m only exposing Kafka internally?

A common throughline in the design of data pipelines that meet security requirements is that all data should be encrypted both at rest and in transit. When you configure Kafka to use SSL, data is encrypted between your clients and the Kafka cluster. Additionally, you can restrict access to topics to clients holding specific certificates. This gives you an extra layer of security in the event that, for example, an unauthorized user gains access to a pod inside your cluster. If such a user can run network sniffing tools, they’ll be able to listen to unencrypted traffic inside your cluster. And if Kafka isn’t using ACLs, they can start reading from and writing to your topics and, if they know the address of your cluster, changing broker configurations.
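When ACLs are enforced with SSL client authentication, Kafka identifies each client by a principal derived from its certificate. With the default principal builder (no custom builder and no `ssl.principal.mapping.rules`), that principal is `User:` followed by the certificate subject’s distinguished name. The sketch below mimics this for a throwaway self-signed certificate. `kafkaPrincipal` and `selfSigned` are hypothetical helpers, the assumption that a user certificate’s subject is just a CN equal to the user name is for illustration only, and Go’s `pkix.Name.String()` matches Kafka’s formatting only for simple subjects like a single CN.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// kafkaPrincipal approximates the principal Kafka's default SSL principal
// builder assigns to a client: "User:" plus the certificate subject's
// distinguished name. Go's Name.String() matches Kafka's RFC 2253 output for
// simple subjects such as a single CN; more complex subjects may differ.
func kafkaPrincipal(cert *x509.Certificate) string {
	return "User:" + cert.Subject.String()
}

// selfSigned creates a throwaway self-signed certificate with the given
// common name, standing in for a user certificate issued by the cluster PKI.
func selfSigned(cn string) (*x509.Certificate, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: cn},
		NotBefore:    time.Now().Add(-time.Minute),
		NotAfter:     time.Now().Add(time.Hour),
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, err
	}
	return x509.ParseCertificate(der)
}

func main() {
	cert, err := selfSigned("test-kafka-consumer")
	if err != nil {
		panic(err)
	}
	fmt.Println(kafkaPrincipal(cert)) // User:CN=test-kafka-consumer
}
```

The point of the example is that an ACL grants rights to that principal string, so an attacker without the matching private key cannot present it.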

While there’s no such thing as perfect security, it’s easy to take a few reasonable steps to avoid catastrophes like this. And with Koperator doing all the heavy lifting, why shouldn’t you?

The picture below shows how the operator manages a user defined in a KafkaUser custom resource.

Kafka User

Complete Example

This example will guide you through setting up a Kubernetes cluster on your machine, installing Koperator and its dependencies, deploying a KafkaCluster CR, and then an application with KafkaTopic and KafkaUser CRs.

For this example we are going to use kind to run a cluster on our development machine. kind runs a full Kubernetes cluster inside Docker containers, so you’ll need Docker installed on your machine first, if you don’t have it already.

First, we’re going to install kind and create a cluster, then set KUBECONFIG to point our Kubernetes commands at the locally running cluster.

# Install kind on macOS/Linux (refer to their documentation for windows)
curl -Lo ./kind$(uname)-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind

# Create a local cluster with one control-plane node and three workers
cat << EOF | kind create cluster --config -
kind: Cluster
apiVersion: kind.sigs.k8s.io/v1alpha3
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
EOF

## The above may take a minute or two depending on your internet connection

# Export the kind KUBECONFIG for kubectl, helm, etc.
export KUBECONFIG="$(kind get kubeconfig-path)"

Now that we have a cluster up and running on our machine, we need to add a few more things to it before we can install Koperator.

First, we’re going to use Helm to install cert-manager and the zookeeper-operator. We believe in the principle of separation of concerns, so Koperator neither installs nor manages Zookeeper or cert-manager. This lets independent services each take care of what they do best, without any unnecessary overhead.

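# Set up a service account and RBAC for Helm's Tiller in the kind cluster
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: ServiceAccount
metadata:
  name: tiller
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: tiller
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
  - kind: ServiceAccount
    name: tiller
    namespace: kube-system
EOF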

# Install tiller into the cluster and add the jetstack and banzaicloud repos
helm init --service-account tiller
helm repo add jetstack
helm repo add banzaicloud-stable
helm repo update

# Install the cert-manager and CRDs into the cluster
kubectl create ns cert-manager
kubectl label namespace cert-manager
kubectl apply -f
helm install --name cert-manager --namespace cert-manager --version v0.10.1 --set webhook.enabled=false jetstack/cert-manager

# Install the zookeeper-operator and set up a Zookeeper cluster
helm install --name zookeeper-operator --namespace zookeeper banzaicloud-stable/zookeeper-operator
cat << EOF | kubectl apply -f -
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
  name: example-zookeepercluster
  namespace: zookeeper
spec:
  replicas: 3
EOF

We are now ready to set up a Kafka cluster running on Kubernetes. All the examples below can be found in the Koperator repository, so we’ll first clone it with git.

# Clone the repository
git clone
cd koperator

# Install the operator - we will disable prometheus server for this example
helm install --name kafka-operator --namespace kafka banzaicloud-stable/kafka-operator

# Create the example Kafka cluster with internal SSL and ACLs enabled
kubectl apply -n kafka -f config/samples/kafkacluster_with_ssl_groups.yaml

The last command above returns immediately, but it may take 2-3 minutes for all the brokers and Cruise Control to start up. You can use kubectl to see when the brokers are all ready; the output should look something like this:

$> kubectl get pod -n kafka

NAME                                       READY   STATUS    RESTARTS   AGE
kafka-cruisecontrol-549976ffd8-6swvz       1/1     Running   0          1m48s
kafka-operator-operator-5cf97c9959-llpnn   2/2     Running   0          3m22s
kafkab6dx6                                 1/1     Running   0          2m6s
kafkafhdsv                                 1/1     Running   0          2m7s
kafkapnd5w                                 1/1     Running   0          2m21s
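If you would rather script this wait than eyeball it, the READY column is what to check: a pod is fully ready when both numbers match. Here is a minimal Go sketch that parses output in the layout shown above. `allReady` is a hypothetical helper that assumes the default `NAME READY STATUS RESTARTS AGE` columns; in practice `kubectl wait --for=condition=Ready` may be the simpler route.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// allReady reports whether every pod listed in `kubectl get pod` output has
// all of its containers ready (READY column "n/n"). It assumes the default
// column layout, skips the header and blank lines, and returns false when
// no pods are listed at all.
func allReady(output string) bool {
	sc := bufio.NewScanner(strings.NewReader(output))
	pods := 0
	for sc.Scan() {
		fields := strings.Fields(sc.Text())
		if len(fields) < 2 || fields[0] == "NAME" {
			continue
		}
		parts := strings.SplitN(fields[1], "/", 2)
		if len(parts) != 2 || parts[0] != parts[1] {
			return false // e.g. "0/1": a container is not ready yet
		}
		pods++
	}
	return pods > 0
}

func main() {
	out := `NAME                                       READY   STATUS    RESTARTS   AGE
kafka-cruisecontrol-549976ffd8-6swvz       1/1     Running   0          1m48s
kafka-operator-operator-5cf97c9959-llpnn   2/2     Running   0          3m22s`
	fmt.Println(allReady(out)) // true
}
```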

The sample CR configures a three-broker Kafka cluster with SSL, a managed PKI for user certificates, and a cluster-wide, internally reachable address of kafka-headless.kafka.svc.cluster.local:29092.
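That address follows the standard Kubernetes Service DNS pattern, `<service>.<namespace>.svc.<cluster-domain>`; because `kafka-headless` is a headless service, the name resolves to the individual broker pods rather than to a single virtual IP. A small, hypothetical Go helper that builds such an address, assuming the default `cluster.local` cluster domain:

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// serviceAddr builds the in-cluster address of a Kubernetes Service:
// <service>.<namespace>.svc.<clusterDomain>:<port>.
// "cluster.local" is the default cluster domain; clusters configured with a
// different domain need a different value.
func serviceAddr(service, namespace, clusterDomain string, port int) string {
	host := service + "." + namespace + ".svc." + clusterDomain
	return net.JoinHostPort(host, strconv.Itoa(port))
}

func main() {
	fmt.Println(serviceAddr("kafka-headless", "kafka", "cluster.local", 29092))
	// kafka-headless.kafka.svc.cluster.local:29092
}
```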

We can now deploy Kafka topics, users, and a pod that reads from and writes to the topic with a simple manifest:

# kafka-operator/hack/kafka-test-pod/manifest.yaml
# A KafkaTopic called 'test-topic' with a single partition replicated across all three brokers
#  -- you can also specify arbitrary topic configurations with `spec.config` --
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: test-topic
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  name: test-topic
  partitions: 1
  replicationFactor: 3
---
# A KafkaUser with permission to read from 'test-topic'
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: test-kafka-consumer
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  secretName: test-kafka-consumer
  topicGrants:
    - topicName: test-topic
      accessType: read
---
# A KafkaUser with permission to write to 'test-topic'
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: test-kafka-producer
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  secretName: test-kafka-producer
  topicGrants:
    - topicName: test-topic
      accessType: write
---
# A pod containing a producer and consumer using the above credentials
apiVersion: v1
kind: Pod
metadata:
  name: kafka-test-pod
spec:
  volumes:
    # The consumer secret
    - name: consumer-credentials
      secret:
        defaultMode: 420
        secretName: test-kafka-consumer

    # The producer secret
    - name: producer-credentials
      secret:
        defaultMode: 420
        secretName: test-kafka-producer

  containers:
    # Container reading from topic with the consumer credentials
    # The environment variables used below are for our test-application,
    # which is described in greater detail later in this post.
    - name: consumer
      image: banzaicloud/kafka-test:latest
      env:
        - name: KAFKA_MODE
          value: consumer
      volumeMounts:
        - mountPath: /etc/secrets/certs
          name: consumer-credentials
          readOnly: true

    # Container writing to topic with the producer credentials
    - name: producer
      image: banzaicloud/kafka-test:latest
      env:
        - name: KAFKA_MODE
          value: producer
      volumeMounts:
        - mountPath: /etc/secrets/certs
          name: producer-credentials
          readOnly: true

You can apply the manifest above (found in the repository) to your test cluster, and it should start reading from and writing to Kafka. Below we use kubetail, a script for tailing the logs of multiple pods/containers at once, to follow the output.

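$> kubectl apply -f hack/kafka-test-pod/manifest.yaml
kafkatopic.kafka.banzaicloud.io/test-topic created
kafkauser.kafka.banzaicloud.io/test-kafka-consumer created
kafkauser.kafka.banzaicloud.io/test-kafka-producer created
pod/kafka-test-pod created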

# The above pod may restart a couple times at first due to applying
# all the objects at once like this and not everything being ready.
# But after a few seconds you should be able to tail the logs and see something like this.
$> kubetail kafka-test-pod
Will tail 2 logs...
kafka-test-pod consumer
kafka-test-pod producer

[kafka-test-pod producer] 2019/09/19 02:07:18 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:18 Consumed message offset 0 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:23 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:23 Consumed message offset 1 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:28 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:28 Consumed message offset 2 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:33 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:33 Consumed message offset 3 - hello world

Show me some code!

The code running in the pod above is a simple Go application that uses the environment variable KAFKA_MODE to decide whether to read from or write to our test topic. The code for this example can be found in the kafka-operator repository at hack/kafka-test-pod/main.go, and in full at the end of this blog post.

If you look at the manifest above, you’ll see that each user’s credentials are stored in a secret, which we mount inside the pod at /etc/secrets/certs. First, we define a function that creates a TLS configuration from those credentials.

package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"
)

// The paths to our credentials
const certFile = "/etc/secrets/certs/tls.crt"
const keyFile = "/etc/secrets/certs/tls.key"
const caFile = "/etc/secrets/certs/ca.crt"

// ...

func getTLSConfig() *tls.Config {
	// Create a TLS configuration from our secret for connecting to Kafka
	clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		log.Fatal(err)
	}
	caCert, err := ioutil.ReadFile(caFile)
	if err != nil {
		log.Fatal(err)
	}
	// Trust the cluster CA so the brokers' certificates can be verified
	caPool := x509.NewCertPool()
	caPool.AppendCertsFromPEM(caCert)
	return &tls.Config{
		Certificates: []tls.Certificate{clientCert},
		RootCAs:      caPool,
	}
}

In this example, we’re using Shopify’s sarama library to interact with Kafka. We can include the TLS configuration created by the function above in our options to the sarama client. For example, to create a new consumer we can do the following:

package main

import (
	// ...
	"log"

	"github.com/Shopify/sarama"
)

var brokerAddrs = []string{"kafka-headless.kafka.svc.cluster.local:29092"}

func main() {
	config := sarama.NewConfig()
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = getTLSConfig()

	// Get a consumer
	consumer, err := sarama.NewConsumer(brokerAddrs, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// ...
}

The full application creates a sarama configuration using the methods above and then, depending on the value of the environment variable, either produces to or consumes from the topic. Below is the full example with inline comments.

package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
)

// Constants for our topic and credential locations
const kafkaTopic = "test-topic"
const certFile = "/etc/secrets/certs/tls.crt"
const keyFile = "/etc/secrets/certs/tls.key"
const caFile = "/etc/secrets/certs/ca.crt"

// The address we are going to use for kafka broker discovery
var brokerAddrs = []string{"kafka-headless.kafka.svc.cluster.local:29092"}

func main() {

	// Create an SSL configuration for connecting to Kafka
	config := sarama.NewConfig()
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = getTLSConfig()

	// Trap SIGINT and SIGTERM to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	// Consume or produce depending on the value of the environment variable
	if os.Getenv("KAFKA_MODE") == "consumer" {
		consume(config, signals)
	} else if os.Getenv("KAFKA_MODE") == "producer" {
		produce(config, signals)
	} else {
		log.Fatalf("Invalid test mode: %q", os.Getenv("KAFKA_MODE"))
	}
}

func getTLSConfig() *tls.Config {
	// Create a TLS configuration from our secret for connecting to Kafka
	clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		log.Fatal(err)
	}
	caCert, err := ioutil.ReadFile(caFile)
	if err != nil {
		log.Fatal(err)
	}
	// Trust the cluster CA so the brokers' certificates can be verified
	caPool := x509.NewCertPool()
	caPool.AppendCertsFromPEM(caCert)
	return &tls.Config{
		Certificates: []tls.Certificate{clientCert},
		RootCAs:      caPool,
	}
}

func consume(config *sarama.Config, signals chan os.Signal) {
	// Get a consumer
	consumer, err := sarama.NewConsumer(brokerAddrs, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Setup a channel for messages from our topic
	partitionConsumer, err := consumer.ConsumePartition(kafkaTopic, 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer partitionConsumer.Close()

	consumed := 0
	// Consume the topic until we receive a shutdown signal
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d - %s\n", msg.Offset, string(msg.Value))
			consumed++
		case <-signals:
			log.Println("Shutting down")
			break ConsumerLoop
		}
	}
	log.Printf("Consumed %d messages\n", consumed)
}

func produce(config *sarama.Config, signals chan os.Signal) {

	// Get a broker connection
	broker := sarama.NewBroker(brokerAddrs[0])
	if err := broker.Open(config); err != nil {
		log.Fatal(err)
	}
	defer broker.Close()

	// Create a time ticker to trigger a message every 5 seconds
	ticker := time.NewTicker(time.Duration(5) * time.Second)
	defer ticker.Stop()

	// Send a message every time the ticker is triggered
ProducerLoop:
	for {
		select {
		case <-ticker.C:
			log.Println("Sending message to topic")
			msg := &sarama.ProduceRequest{}
			msg.AddMessage(kafkaTopic, 0, &sarama.Message{
				Value: []byte("hello world"),
			})
			if _, err := broker.Produce(msg); err != nil {
				log.Fatal(err)
			}
		case <-signals:
			log.Println("Shutting down")
			break ProducerLoop
		}
	}
}

And, voilà, you’ve set up a Kafka cluster with access control and user authentication. Hopefully this exercise has been enlightening; if you have questions, or if you’re interested in contributing, check us out on GitHub.

