The following example shows how to deploy the S3 Sink Connector, which writes data from a Kafka topic to an Amazon S3 bucket.
Prerequisites
- An Apache Kafka cluster (including Kafka Connect) deployed with Supertubes
- AWS credentials with privileges to write to an S3 bucket. The S3 Sink Connector needs these credentials to write messages from a topic to the bucket; they are passed to the connector through a file mounted into the hosting Kafka Connect cluster.
Steps
- Create a Kubernetes secret from the AWS credentials:
echo "aws_access_key_id=<my-aws-access-key-id>" >> aws_creds.txt echo "aws_secret_access_key=<my-aws-access-key" >> aws_creds.txt cat aws_creds.txt | base64
apiVersion: v1
kind: Secret
metadata:
  name: aws-s3-secret
  namespace: kafka
data:
  aws-s3-creds.properties: # base64-encoded AWS credentials (the output of `cat aws_creds.txt | base64`)
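Alternatively, a minimal sketch of creating the same secret directly from the credentials file with kubectl, which handles the base64 encoding for you (the secret name, namespace, and key match the manifest above):

kubectl create secret generic aws-s3-secret \
  --namespace kafka \
  --from-file=aws-s3-creds.properties=aws_creds.txt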
- Mount the secret into Kafka Connect. Modify the KafkaConnect custom resource as follows to mount the AWS secret into the Kafka Connect pods:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaConnect
metadata:
  name: kafka
  namespace: kafka
  ...
spec:
  ...
  volumeMounts:
    - name: aws-s3-creds
      readOnly: true
      mountPath: /etc/secrets
  volumes:
    - name: aws-s3-creds
      secret:
        secretName: aws-s3-secret
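After the modified KafkaConnect resource is applied (for example with kubectl apply or kubectl edit), you can confirm that the credentials file is visible inside a Kafka Connect pod. This is a minimal check; the pod name below is a placeholder that you need to look up first:

kubectl get pods -n kafka
kubectl exec -n kafka <kafka-connect-pod-name> -- ls /etc/secrets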
- Create the S3 Sink Connector:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaConnector
metadata:
  name: my-s3-sink
spec:
  class: "io.confluent.connect.s3.S3SinkConnector"
  tasksMax: 6
  clusterRef:
    name: <name-of-kafka-connect-cluster>
  config:
    aws.access.key.id: ${file:/etc/secrets/aws-s3-creds.properties:aws_access_key_id}
    aws.secret.access.key: ${file:/etc/secrets/aws-s3-creds.properties:aws_secret_access_key}
    flush.size: "10000"
    format.class: io.confluent.connect.s3.format.bytearray.ByteArrayFormat
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    locale: en-US
    partition.duration.ms: "30000"
    partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
    rotate.schedule.interval.ms: "180000"
    s3.bucket.name: my-test-s3-bucket
    s3.region: eu-central-1
    schema.compatibility: NONE
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    storage.class: io.confluent.connect.s3.storage.S3Storage
    timezone: UTC
    topics: my-example-topic
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
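A sketch of applying the connector manifest and checking that the custom resource exists. The file name is arbitrary, the connector is assumed to be created in the kafka namespace, and kafkaconnectors is the conventional plural form of the KafkaConnector kind:

kubectl apply -n kafka -f s3-sink-connector.yaml
kubectl get kafkaconnectors -n kafka
kubectl describe kafkaconnector my-s3-sink -n kafka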
- If Kafka ACLs are enabled, in addition to the ACLs created by Supertubes for Kafka Connect, the following ACLs must be created for the S3 Sink Connector:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaACL
metadata:
  name: my-s3-sink-kafkaacl
  namespace: kafka
spec:
  kind: User
  name: CN=kafka-kafka-kafka-connect
  clusterRef:
    name: kafka
    namespace: kafka
  roles:
    - name: consumer
      resourceSelectors:
        - name: my-example-topic
          namespace: kafka
        - name: connector-groups
          namespace: kafka
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaResourceSelector
metadata:
  name: my-example-topic
  namespace: kafka
spec:
  type: topic
  name: my-example-topic
  pattern: literal
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaResourceSelector
metadata:
  name: connector-groups
  namespace: kafka
spec:
  type: group
  name: connect-
  pattern: prefixed
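A sketch of applying the ACL and resource selector manifests, assuming they are saved to a single file (the file name is arbitrary, and kafkaacls is the conventional plural form of the KafkaACL kind):

kubectl apply -f s3-sink-acls.yaml
kubectl get kafkaacls -n kafka

The group selector uses the connect- prefix because Kafka Connect names the consumer group of a sink connector connect-<connector-name> by default, so a prefixed pattern covers the group created for my-s3-sink.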