Our thinking that there was a hunger for an operator that makes it easy to provision and operate Apache Kafka clusters on Kubernetes without relying on Kubernetes StatefulSets proved to be correct: shortly after we released the first version of our open-source Koperator, a community started to build around it. We have received lots of valuable feedback that helps shape the future of Koperator, as well as feature contributions from the community. In the following guest post, Avi Zimmerman talks about the Kafka topic, ACL, and user management features he contributed to Koperator.
This is a guest post by Avi Zimmerman.
Kafka is rapidly becoming an integral part of event-driven architectures and data streaming pipelines. But just like with every other service that holds valuable data, it's necessary to keep what's important safe from prying eyes and unauthorized access. In Kafka, this means setting up a Public Key Infrastructure (PKI) or configuring SASL, which can be difficult to automate securely.
With Koperator, the solution to this problem can be managed for you using CRDs. The Kafka cluster, topics, users, and their ACLs are all handled by the operator based on the YAML definitions you provide. The operator also uses Cruise Control and Prometheus to monitor your cluster's health and perform graceful healing and scaling operations when needed.
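To give you a quick taste of what that looks like, here is a minimal sketch of a topic definition. It uses the same KafkaTopic kind and fields as the complete example later in this post; the topic name is just an illustration, and the clusterRef assumes a KafkaCluster named kafka in the kafka namespace, as deployed below.
cat << EOF | kubectl apply -f -
apiVersion: banzaicloud.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-example-topic   # hypothetical topic name, for illustration only
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  name: my-example-topic
  partitions: 3
  replicationFactor: 2
EOF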
The picture below shows the operator creating a Kafka topic.
Why should I be using SSL and ACLs if I’m only exposing Kafka internally? 🔗︎
A common throughline in the design of data pipelines that meet security requirements is that all data should be encrypted both at rest and in transit. When you configure Kafka to use SSL, data is encrypted between your clients and the Kafka cluster. Additionally, you can restrict access to topics to clients holding specific certificates. This gives you an extra layer of security in the event that, for example, an unauthorized user gains access to a pod inside your cluster. If such a user is able to run network sniffing tools, they'll be able to listen to unencrypted traffic inside your cluster. And if Kafka isn't using ACLs, they can start reading from and writing to your topics and, if they know the address of your cluster, even change broker configurations.
While there's no such thing as perfect security, it's easy to take a few reasonable steps to avoid catastrophes like this. And with Koperator doing all the heavy lifting, why shouldn't you?
The picture below shows how the operator manages a user created via a CRD.
Complete Example 🔗︎
This example will guide you through setting up a Kubernetes cluster on your machine, installing Koperator and its dependencies, deploying a KafkaCluster CR, and then deploying an application along with KafkaTopic and KafkaUser CRs.
For this example, we are going to use kind to run a cluster on our development machine. Kind lets you run a full Kubernetes cluster inside Docker, so you'll need to have Docker installed on your machine first if you don't have it already. We'll install kind and create a cluster, then set our KUBECONFIG to point our Kubernetes commands at the locally running cluster.
# Install kind on macOS/Linux (refer to their documentation for windows)
curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/v0.5.1/kind-$(uname)-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind
# Create a local cluster with one control-plane and three workers
cat << EOF | kind create cluster --config -
kind: Cluster
apiVersion: kind.sigs.k8s.io/v1alpha3
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
EOF
## The above may take a minute or two depending on your internet connection
# Export the kind KUBECONFIG for kubectl, helm, etc.
export KUBECONFIG="$(kind get kubeconfig-path)"
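Before moving on, you can optionally confirm that the cluster came up and that all four nodes are ready. This is plain kubectl, nothing Koperator-specific:
# The control-plane and three worker nodes should all report Ready
kubectl get nodes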
Now that we have a cluster up and running on our machine, we need to add a few more things to it before we can install Koperator.
First, we're going to use helm to install cert-manager and the zookeeper-operator. We believe in the principle of separation of concerns, so Koperator does not install or manage either Zookeeper or cert-manager. This allows different, non-dependent services to take care of the things they are best at, without any unnecessary overhead.
# Setup Helm in the Kind cluster
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: ServiceAccount
metadata:
  name: tiller
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: tiller
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
- kind: ServiceAccount
  name: tiller
  namespace: kube-system
EOF
# Install tiller into the cluster and add the jetstack and banzaicloud repos
helm init --service-account tiller
helm repo add jetstack https://charts.jetstack.io
helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com/
helm repo update
# Install the cert-manager and CRDs into the cluster
kubectl create ns cert-manager
kubectl label namespace cert-manager certmanager.k8s.io/disable-validation=true
kubectl apply -f https://raw.githubusercontent.com/jetstack/cert-manager/release-0.10/deploy/manifests/00-crds.yaml
helm install --name cert-manager --namespace cert-manager --version v0.10.1 --set webhook.enabled=false jetstack/cert-manager
# Install the zookeeper-operator and setup a zookeeper cluster
helm install --name zookeeper-operator --namespace zookeeper banzaicloud-stable/zookeeper-operator
cat << EOF | kubectl apply -f -
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
  name: example-zookeepercluster
  namespace: zookeeper
spec:
  replicas: 3
EOF
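It may take a little while for the ZooKeeper ensemble to start. You can optionally watch the pods until all three replicas report Running and Ready before continuing:
# Watch the zookeeper pods come up (Ctrl-C to stop watching)
kubectl get pods -n zookeeper -w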
We are now ready to set up a Kafka cluster running on Kubernetes. All the examples below can be found in the Koperator repository, so we'll first clone that with git.
# Clone the repository
git clone https://github.com/banzaicloud/koperator
cd koperator
# Install the operator - we will disable prometheus server for this example
helm install --name kafka-operator --namespace kafka banzaicloud-stable/kafka-operator
# Create the example Kafka cluster with internal SSL and ACLs enabled
kubectl apply -n kafka -f config/samples/kafkacluster_with_ssl_groups.yaml
The last command above will return immediately, but it may take 2-3 minutes for all the brokers and Cruise Control to start up. You can use kubectl to see when the brokers are all ready. The output should look something like this:
$> kubectl get pod -n kafka
NAME                                       READY   STATUS    RESTARTS   AGE
kafka-cruisecontrol-549976ffd8-6swvz       1/1     Running   0          1m48s
kafka-operator-operator-5cf97c9959-llpnn   2/2     Running   0          3m22s
kafkab6dx6                                 1/1     Running   0          2m6s
kafkafhdsv                                 1/1     Running   0          2m7s
kafkapnd5w                                 1/1     Running   0          2m21s
The sample CR configured a three-broker Kafka cluster with SSL, a managed PKI for user certificates, and a cluster-wide internally reachable address of kafka-headless.kafka.svc.cluster.local:29092.
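If you'd like to double-check that address, the headless service the brokers sit behind should be listed alongside the other services the operator created. Exact service names can vary between operator versions, so treat this as a quick sanity check:
# List the services created for the Kafka cluster
kubectl get svc -n kafka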
We can now deploy Kafka topics, users, and a pod reading from and writing to the topic with a simple manifest:
# kafka-operator/hack/kafka-test-pod/manifest.yaml
#
# A KafkaTopic called 'test-topic' with a single partition replicated across all three brokers
# -- you can also specify arbitrary topic configurations with `spec.config` --
apiVersion: banzaicloud.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: test-topic
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  name: test-topic
  partitions: 1
  replicationFactor: 3
---
# A KafkaUser with permission to read from 'test-topic'
apiVersion: banzaicloud.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: test-kafka-consumer
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  secretName: test-kafka-consumer
  topicGrants:
  - topicName: test-topic
    accessType: read
---
# A KafkaUser with permission to write to 'test-topic'
apiVersion: banzaicloud.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: test-kafka-producer
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  secretName: test-kafka-producer
  topicGrants:
  - topicName: test-topic
    accessType: write
---
# A pod containing a producer and consumer using the above credentials
apiVersion: v1
kind: Pod
metadata:
  name: kafka-test-pod
spec:
  volumes:
  # The consumer secret
  - name: consumer-credentials
    secret:
      defaultMode: 420
      secretName: test-kafka-consumer
  # The producer secret
  - name: producer-credentials
    secret:
      defaultMode: 420
      secretName: test-kafka-producer
  containers:
  # Container reading from topic with the consumer credentials
  # The environment variables used below are for our test-application,
  # which is described in greater detail later in this post.
  - name: consumer
    image: banzaicloud/kafka-test:latest
    env:
    - name: KAFKA_MODE
      value: consumer
    volumeMounts:
    - mountPath: /etc/secrets/certs
      name: consumer-credentials
      readOnly: true
  # Container writing to topic with the producer credentials
  - name: producer
    image: banzaicloud/kafka-test:latest
    env:
    - name: KAFKA_MODE
      value: producer
    volumeMounts:
    - mountPath: /etc/secrets/certs
      name: producer-credentials
      readOnly: true
You can apply the manifest above (found in the repository) to your test cluster and it should start reading and writing from Kafka. To follow the logs below we use kubetail, a script for tailing the logs of multiple pods/containers at a time.
$> kubectl apply -f hack/kafka-test-pod/manifest.yaml
kafkatopic.banzaicloud.banzaicloud.io/test-topic created
kafkauser.banzaicloud.banzaicloud.io/test-kafka-consumer created
kafkauser.banzaicloud.banzaicloud.io/test-kafka-producer created
pod/kafka-test-pod created
# The above pod may restart a couple times at first due to applying
# all the objects at once like this and not everything being ready.
# But after a few seconds you should be able to tail the logs and see something like this.
$> kubetail kafka-test-pod
Will tail 2 logs...
kafka-test-pod consumer
kafka-test-pod producer
[kafka-test-pod producer] 2019/09/19 02:07:18 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:18 Consumed message offset 0 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:23 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:23 Consumed message offset 1 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:28 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:28 Consumed message offset 2 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:33 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:33 Consumed message offset 3 - hello world
^C
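If you're curious about the credentials behind those KafkaUser objects, you can inspect the secrets the operator generated. They should live in the namespace where the manifest was applied, and their tls.crt, tls.key, and ca.crt keys are what gets mounted into the pod at /etc/secrets/certs:
# Show the keys stored in the generated credential secrets
kubectl describe secret test-kafka-consumer
kubectl describe secret test-kafka-producer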
Show me some code! 🔗︎
The code running in the pod above is a simple Go application whose behavior depends on the environment variable KAFKA_MODE, which determines whether it reads from or writes to our test topic. The code for this example can be found in the kafka-operator repository at hack/kafka-test-pod/main.go and at the end of this blog post.
If you look at the manifest above, you'll see that we specified a secret to store our user credentials and that we mount this secret inside the pod at /etc/secrets/certs. We first define a function that creates a TLS configuration from those credentials.
package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
)

// The paths to our credentials
const certFile = "/etc/secrets/certs/tls.crt"
const keyFile = "/etc/secrets/certs/tls.key"
const caFile = "/etc/secrets/certs/ca.crt"

// ...

func getTLSConfig() *tls.Config {
    // Create a TLS configuration from our secret for connecting to Kafka
    clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        log.Fatal(err)
    }
    caCert, err := ioutil.ReadFile(caFile)
    if err != nil {
        log.Fatal(err)
    }
    caPool := x509.NewCertPool()
    caPool.AppendCertsFromPEM(caCert)
    return &tls.Config{
        Certificates: []tls.Certificate{clientCert},
        RootCAs:      caPool,
    }
}
In this example, we’re using Shopify’s sarama library to interact with Kafka. We can include the TLS configuration created by the function above in our options to the sarama client. For example, to create a new consumer we can do the following:
package main

import (
    // ...
    "log"

    "github.com/Shopify/sarama"
)

var brokerAddrs = []string{"kafka-headless.kafka.svc.cluster.local:29092"}

func main() {
    config := sarama.NewConfig()
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = getTLSConfig()
    // Get a consumer
    consumer, err := sarama.NewConsumer(brokerAddrs, config)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()
}
The full application creates a sarama configuration using the methods above and then, depending on the value of the environment variable, produces to or consumes from the topic. Below is the full example with inline comments.
package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/Shopify/sarama"
)

// Constants for our topic and credential locations
const kafkaTopic = "test-topic"
const certFile = "/etc/secrets/certs/tls.crt"
const keyFile = "/etc/secrets/certs/tls.key"
const caFile = "/etc/secrets/certs/ca.crt"

// The address we are going to use for kafka broker discovery
var brokerAddrs = []string{"kafka-headless.kafka.svc.cluster.local:29092"}

func main() {
    // Create an SSL configuration for connecting to Kafka
    config := sarama.NewConfig()
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = getTLSConfig()

    // Trap SIGINT and SIGTERM to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

    // Consume or produce depending on the value of the environment variable
    if os.Getenv("KAFKA_MODE") == "consumer" {
        consume(config, signals)
    } else if os.Getenv("KAFKA_MODE") == "producer" {
        produce(config, signals)
    } else {
        log.Fatal("Invalid test mode:", os.Getenv("KAFKA_MODE"))
    }
}

func getTLSConfig() *tls.Config {
    // Create a TLS configuration from our secret for connecting to Kafka
    clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        log.Fatal(err)
    }
    caCert, err := ioutil.ReadFile(caFile)
    if err != nil {
        log.Fatal(err)
    }
    caPool := x509.NewCertPool()
    caPool.AppendCertsFromPEM(caCert)
    return &tls.Config{
        Certificates: []tls.Certificate{clientCert},
        RootCAs:      caPool,
    }
}

func consume(config *sarama.Config, signals chan os.Signal) {
    // Get a consumer
    consumer, err := sarama.NewConsumer(brokerAddrs, config)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Setup a channel for messages from our topic
    partitionConsumer, err := consumer.ConsumePartition(kafkaTopic, 0, sarama.OffsetNewest)
    if err != nil {
        log.Fatal(err)
    }
    defer partitionConsumer.Close()

    consumed := 0
ConsumerLoop:
    // Consume the topic
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message offset %d - %s\n", msg.Offset, string(msg.Value))
            consumed++
        case <-signals:
            log.Println("Shutting down")
            break ConsumerLoop
        }
    }
}

func produce(config *sarama.Config, signals chan os.Signal) {
    // Get a broker connection
    broker := sarama.NewBroker(brokerAddrs[0])
    if err := broker.Open(config); err != nil {
        log.Fatal(err)
    }
    defer broker.Close()

    // Create a time ticker to trigger a message every 5 seconds
    ticker := time.NewTicker(time.Duration(5) * time.Second)
ProducerLoop:
    // Send a message every time the ticker is triggered
    for {
        select {
        case <-ticker.C:
            log.Println("Sending message to topic")
            msg := &sarama.ProduceRequest{}
            msg.AddMessage(kafkaTopic, 0, &sarama.Message{
                Value: []byte("hello world"),
            })
            if _, err := broker.Produce(msg); err != nil {
                log.Fatal(err)
            }
        case <-signals:
            log.Println("Shutting down")
            break ProducerLoop
        }
    }
}
And, voila, you've set up a Kafka cluster with access control and user authentication. Hopefully, this exercise has been sufficiently enlightening, but if you have questions, or if you're interested in contributing, check us out on GitHub.
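When you're done experimenting, you can tear down the whole environment, including Kafka, ZooKeeper, and cert-manager, by deleting the local kind cluster:
# Remove the local kind cluster and the exported kubeconfig variable
kind delete cluster
unset KUBECONFIG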
About Banzai Cloud Pipeline 🔗︎
Banzai Cloud’s Pipeline provides a platform for enterprises to develop, deploy, and scale container-based applications. It leverages best-of-breed cloud components, such as Kubernetes, to create a highly productive, yet flexible environment for developers and operations teams alike. Strong security measures — multiple authentication backends, fine-grained authorization, dynamic secret management, automated secure communications between components using TLS, vulnerability scans, static code analysis, CI/CD, and so on — are default features of the Pipeline platform.