Create Topic 🔗︎
Topic creation by default is enabled in Apache Kafka, but if it is configured otherwise, you’ll need to create a topic first.
-
You can use the
KafkaTopic
CR to create a topic called my-topic like this:cat << EOF | kubectl apply -n kafka -f - apiVersion: kafka.banzaicloud.io/v1alpha1 kind: KafkaTopic metadata: name: my-topic spec: clusterRef: name: kafka name: my-topic partitions: 1 replicationFactor: 1 config: "retention.ms": "604800000" "cleanup.policy": "delete" EOF
Note: The previous command will fail if the cluster has not finished provisioning.
Expected output:
kafkatopic.kafka.banzaicloud.io/my-topic created
-
To create a sample topic from the CLI you can run the following:
For internal listeners exposed by a headless service (
KafkaCluster.spec.headlessServiceEnabled
set totrue
):kubectl -n kafka run kafka-topics -it --image=ghcr.io/banzaicloud/kafka:2.13-3.1.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-headless.kafka:29092 --topic my-topic --create --partitions 1 --replication-factor 1
For internal listeners exposed by a regular service (
KafkaCluster.spec.headlessServiceEnabled
set tofalse
):kubectl -n kafka run kafka-topics -it --image=ghcr.io/banzaicloud/kafka:2.13-3.1.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-all-broker.kafka:29092 --topic my-topic --create --partitions 1 --replication-factor 1
After you have created a topic, produce and consume some messages:
- Send and receive messages without SSL within a cluster
- Send and receive messages with SSL within a cluster
Send and receive messages without SSL within a cluster 🔗︎
You can use the following commands to send and receive messages within a Kubernetes cluster when SSL encryption is disabled for Kafka.
-
Produce messages:
-
Start the producer container
kubectl run \ -n kafka \ kafka-producer \ -it \ --image=ghcr.io/banzaicloud/kafka:2.13-3.1.0 \ --rm=true \ --restart=Never \ -- \ /opt/kafka/bin/kafka-console-producer.sh \ --bootstrap-server kafka-headless:29092 \ --topic my-topic
-
Wait for the producer container to run, this may take a couple seconds.
Expected output:
If you don't see a command prompt, try pressing enter.
-
Press enter to get a command prompt.
Expected output:
>
-
Type your messages and press enter, each line will be sent through Kafka.
Example:
> test > message >
-
Stop the container. (You can CTRL-D out of it.)
Expected output:
pod "kafka-producer" deleted
-
-
Consume messages:
-
Start the consumer container.
kubectl run \ -n kafka \ kafka-consumer \ -it \ --image=ghcr.io/banzaicloud/kafka:2.13-3.1.0 \ --rm=true \ --restart=Never \ -- \ /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server kafka-headless:29092 \ --topic my-topic \ --from-beginning
-
Wait for the consumer container to run, this may take a couple seconds.
Expected output:
If you don't see a command prompt, try pressing enter.
-
The messages sent by the producer should be displayed here.
Example:
test message
-
Stop the container. (You can CTRL-C out of it.)
Expected output:
Processed a total of 3 messages pod "kafka-consumer" deleted pod kafka/kafka-consumer terminated (Error)
-
Send and receive messages with SSL within a cluster 🔗︎
You can use the following procedure to send and receive messages within a Kubernetes cluster when SSL encryption is enabled for Kafka. To test a Kafka instance secured by SSL we recommend using kcat.
To use the java client instead of kcat, generate the proper truststore and keystore using the official docs.
-
Create a Kafka user. The client will use this user account to access Kafka. You can use the KafkaUser custom resource to customize the access rights as needed. For example:
kubectl create -n kafka -f - <<EOF apiVersion: kafka.banzaicloud.io/v1alpha1 kind: KafkaUser metadata: name: example-kafkauser namespace: kafka spec: clusterRef: name: kafka secretName: example-kafkauser-secret EOF
-
To use Kafka inside the cluster, create a Pod which contains
kcat
. Create akafka-test
pod in thekafka
namespace. Note that the value of the secretName parameter must be the same as you used when creating the KafkaUser resource, for example, example-kafkauser-secret.kubectl create -n kafka -f - <<EOF apiVersion: v1 kind: Pod metadata: name: kafka-test spec: containers: - name: kafka-test image: edenhill/kcat:1.7.0 # Just spin & wait forever command: [ "/bin/sh", "-c", "--" ] args: [ "while true; do sleep 3000; done;" ] volumeMounts: - name: sslcerts mountPath: "/ssl/certs" volumes: - name: sslcerts secret: secretName: example-kafkauser-secret EOF
-
Wait until the pod is created, then exec into the container:
kubectl exec -it -n kafka kafka-test -- sh
-
Run the following command to check that you can connect to the brokers.
kcat -L -b kafka-headless:29092 -X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt
The first line of the output should indicate that the communication is encrypted, for example:
Metadata for all topics (from broker -1: ssl://kafka-headless:29092/bootstrap):
-
Produce some test messages. Run:
kcat -P -b kafka-headless:29092 -t my-topic \ -X security.protocol=SSL \ -X ssl.key.location=/ssl/certs/tls.key \ -X ssl.certificate.location=/ssl/certs/tls.crt \ -X ssl.ca.location=/ssl/certs/ca.crt
And type some test messages.
-
Consume some messages. The following command will use the certificate provisioned with the cluster to connect to Kafka. If you’d like to create and use a different user, create a
KafkaUser
CR, for details, see the SSL documentation.kcat -C -b kafka-headless:29092 -t my-topic \ -X security.protocol=SSL \ -X ssl.key.location=/ssl/certs/tls.key \ -X ssl.certificate.location=/ssl/certs/tls.crt \ -X ssl.ca.location=/ssl/certs/ca.crt
You should see the messages you have created.
Send and receive messages outside a cluster 🔗︎
Prerequisites 🔗︎
-
Producers and consumers that are not in the same Kubernetes cluster can access the Kafka cluster only if an external listener is configured in your KafkaCluster CR. Check that the listenersConfig.externalListeners section exists in the KafkaCluster CR.
-
Obtain the external address and port number of the cluster by running the following commands.
-
If the external listener uses a LoadBalancer:
export SERVICE_IP=$(kubectl get svc --namespace kafka -o jsonpath="{.status.loadBalancer.ingress[0].hostname}" envoy-loadbalancer-external-kafka); echo $SERVICE_IP export SERVICE_PORTS=($(kubectl get svc --namespace kafka -o jsonpath="{.spec.ports[*].port}" envoy-loadbalancer-external-kafka)); echo ${SERVICE_PORTS[@]} # depending on the shell you are using, arrays may be indexed starting from 0 or 1 export SERVICE_PORT=${SERVICE_PORTS[@]:0:1}; echo $SERVICE_PORT
-
-
If the external listener of your Kafka cluster accepts encrypted connections, proceed to SSL enabled. Otherwise, proceed to SSL disabled.
SSL disabled 🔗︎
-
Produce some test messages on the the external client.
-
If you have kcat installed, run:
kcat -P -b $SERVICE_IP:$SERVICE_PORT -t my-topic
-
If you have the Java Kafka client installed, run:
kafka-console-producer.sh --bootstrap-server $SERVICE_IP:$SERVICE_PORT --topic my-topic
And type some test messages.
-
-
Consume some messages.
-
If you have kcat installed, run:
kcat -C -b $SERVICE_IP:$SERVICE_PORT -t my-topic
-
If you have the Java Kafka client installed, run:
kafka-console-consumer.sh --bootstrap-server $SERVICE_IP:$SERVICE_PORT --topic my-topic --from-beginning
You should see the messages you have created.
-
SSL enabled 🔗︎
You can use the following procedure to send and receive messages from an external host that is outside a Kubernetes cluster when SSL encryption is enabled for Kafka. To test a Kafka instance secured by SSL we recommend using kcat.
To use the java client instead of kcat, generate the proper truststore and keystore using the official docs.
-
Install kcat.
-
MacOS:
brew install kcat
-
Ubuntu:
apt-get update apt-get install kcat
-
-
Connect to the Kubernetes cluster that runs your Kafka deployment.
-
Create a Kafka user. The client will use this user account to access Kafka. You can use the KafkaUser custom resource to customize the access rights as needed. For example:
kubectl create -n kafka -f - <<EOF apiVersion: kafka.banzaicloud.io/v1alpha1 kind: KafkaUser metadata: name: example-kafkauser namespace: kafka spec: clusterRef: name: kafka secretName: example-kafkauser-secret EOF
-
Download the certificate and the key of the user, and the CA certificate used to verify the certificate of the Kafka server. These are available in the Kubernetes Secret created for the KafkaUser.
kubectl get secrets -n kafka <name-of-the-user-secret> -o jsonpath="{['data']['tls\.crt']}" | base64 -D > client.crt.pem kubectl get secrets -n kafka <name-of-the-user-secret> -o jsonpath="{['data']['tls\.key']}" | base64 -D > client.key.pem kubectl get secrets -n kafka <name-of-the-user-secret> -o jsonpath="{['data']['ca\.crt']}" | base64 -D > ca.crt.pem
-
Copy the downloaded certificates to a location that is accessible to the external host.
-
If you haven’t done so already, obtain the external address and port number of the cluster.
-
Produce some test messages on the host that is outside your cluster.
kcat -b $SERVICE_IP:$SERVICE_PORT -P -X security.protocol=SSL \ -X ssl.key.location=client.key.pem \ -X ssl.certificate.location=client.crt.pem \ -X ssl.ca.location=ca.crt.pem \ -t my-topic
And type some test messages.
-
Consume some messages.
kcat -b $SERVICE_IP:$SERVICE_PORT -C -X security.protocol=SSL \ -X ssl.key.location=client.key.pem \ -X ssl.certificate.location=client.crt.pem \ -X ssl.ca.location=ca.crt.pem \ -t my-topic
You should see the messages you have created.