The following example shows how to deploy the Amazon S3 Sink Connector, which writes messages from Kafka topics to an Amazon S3 bucket.

Prerequisites 🔗︎

  • An Apache Kafka cluster (including Kafka Connect) deployed with Supertubes
  • AWS credentials with privileges to write to the target S3 bucket. The S3 Sink Connector needs these credentials to write messages from a topic to the bucket; they are passed to the connector through a file that is mounted into the hosting Kafka Connect cluster.

Steps 🔗︎

  1. Create a Kubernetes secret from the AWS credentials:

    echo "aws_access_key_id=<my-aws-access-key-id>" >> aws_creds.txt
    echo "aws_secret_access_key=<my-aws-access-key" >> aws_creds.txt
    
    cat aws_creds.txt | base64
    
    apiVersion: v1
    kind: Secret
    metadata:
    name: aws-s3-secret
    namespace: kafka
    data:
    aws-s3-creds.properties: # base64 encoded AWS credentials (creds.txt)
    
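    Alternatively, you can let kubectl build the same Secret instead of base64-encoding the file by hand. The following is a minimal sketch that assumes the kafka namespace already exists; the secret name and key match the manifest above:

    # Create the secret directly from the credentials file
    kubectl create secret generic aws-s3-secret \
      --namespace kafka \
      --from-file=aws-s3-creds.properties=aws_creds.txt
    
    # Verify that the secret contains the expected key
    kubectl get secret aws-s3-secret --namespace kafka -o jsonpath='{.data}'
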
  2. Mount the secret into Kafka Connect. Modify the KafkaConnect custom resource as follows to mount the AWS secret into the Kafka Connect pods:

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: kafka
      namespace: kafka
    ...
    spec:
      ...
      volumeMounts:
        - name: aws-s3-creds
          readOnly: true
          mountPath: /etc/secrets
      volumes:
        - name: aws-s3-creds
          secret:
            secretName: aws-s3-secret
    
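    To activate the change, apply the modified KafkaConnect resource and confirm that the credentials file shows up inside the Kafka Connect pods. The commands below are a sketch; the manifest file name and the pod name are placeholders:

    # Apply the updated KafkaConnect resource (file name is an example)
    kubectl apply --namespace kafka -f kafkaconnect.yaml
    
    # Find the Kafka Connect pods, then check that the mounted file is present
    kubectl get pods --namespace kafka
    kubectl exec --namespace kafka <kafka-connect-pod> -- ls /etc/secrets
    # Expected output: aws-s3-creds.properties
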
  3. Create the S3 Sink Connector:

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: my-s3-sink
      namespace: kafka
    spec:
      class: "io.confluent.connect.s3.S3SinkConnector"
      tasksMax: 6
      clusterRef:
        name: kafka # name of the KafkaConnect cluster (see the KafkaConnect resource above)
      config:
        aws.access.key.id: ${file:/etc/secrets/aws-s3-creds.properties:aws_access_key_id}
        aws.secret.access.key: ${file:/etc/secrets/aws-s3-creds.properties:aws_secret_access_key}
        flush.size: "10000"
        format.class: io.confluent.connect.s3.format.bytearray.ByteArrayFormat
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        locale: en-US
        partition.duration.ms: "30000"
        partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
        path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
        rotate.schedule.interval.ms: "180000"
        s3.bucket.name: my-test-s3-bucket
        s3.region: eu-central-1
        schema.compatibility: NONE
        schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
        storage.class: io.confluent.connect.s3.storage.S3Storage
        timezone: UTC
        topics: my-example-topic
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    
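    After applying the KafkaConnector manifest, you can verify that the connector is writing data by listing the target bucket. This is a sketch: the topics/ prefix assumes the connector's default topics.dir setting, and objects only appear once flush.size records have accumulated or rotate.schedule.interval.ms has elapsed:

    # Create the connector (file name is an example)
    kubectl apply --namespace kafka -f s3-sink-connector.yaml
    
    # List the objects written by the connector
    aws s3 ls s3://my-test-s3-bucket/topics/my-example-topic/ --recursive
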
  4. If Kafka ACLs are enabled, create the following ACLs for the S3 Sink Connector, in addition to the ACLs that Supertubes creates for Kafka Connect:

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaACL
    metadata:
      name: my-s3-sink-kafkaacl
      namespace: kafka
    spec:
      kind: User
      name: CN=kafka-kafka-kafka-connect
      clusterRef:
        name: kafka
        namespace: kafka
      roles:
        - name: consumer
          resourceSelectors:
            - name: my-example-topic
              namespace: kafka
            - name: connector-groups
              namespace: kafka
    
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaResourceSelector
    metadata:
      name: my-example-topic
      namespace: kafka
    spec:
      type: topic
      name: my-example-topic
      pattern: literal
    
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaResourceSelector
    metadata:
      name: connector-groups
      namespace: kafka
    spec:
      type: group
      name: connect-
      pattern: prefixed
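
    Save the KafkaACL and KafkaResourceSelector manifests above to a file (separated by --- lines) and apply them, then verify that the resources were created. This is a minimal sketch; the file name is an example, and the plural resource names assume the defaults defined by the CRDs:

    # Apply the ACL and resource selector manifests
    kubectl apply --namespace kafka -f s3-sink-acls.yaml
    
    # Verify that the resources exist
    kubectl get kafkaacls,kafkaresourceselectors --namespace kafka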