Kafka output plugin for Fluentd 🔗︎

Overview 🔗︎

More info at https://github.com/fluent/fluent-plugin-kafka

Example Deployment: Transport Nginx Access Logs into Kafka with Logging Operator

Example output configurations 🔗︎

    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
      type: json
      tags: topic
      timekey: 1m
      timekey_wait: 30s
      timekey_use_utc: true

Configuration 🔗︎

Kafka 🔗︎

Send your logs to Kafka

brokers (string, required) 🔗︎

The list of all seed brokers, with their host and port information.

Default: -

topic_key (string, optional) 🔗︎

Topic Key

Default: “topic”

partition_key (string, optional) 🔗︎


Default: “partition”

partition_key_key (string, optional) 🔗︎

Partition Key

Default: “partition_key”

message_key_key (string, optional) 🔗︎

Message Key

Default: “message_key”

client_id (string, optional) 🔗︎

Client ID

Default: “kafka”

default_topic (string, optional) 🔗︎

The name of default topic .

Default: nil

default_partition_key (string, optional) 🔗︎

The name of default partition key .

Default: nil

default_message_key (string, optional) 🔗︎

The name of default message key .

Default: nil

exclude_topic_key (bool, optional) 🔗︎

Exclude Topic key

Default: false

exclude_partion_key (bool, optional) 🔗︎

Exclude Partition key

Default: false

get_kafka_client_log (bool, optional) 🔗︎

Get Kafka Client log

Default: false

headers (map[string]string, optional) 🔗︎


Default: {}

headers_from_record (map[string]string, optional) 🔗︎

Headers from Record

Default: {}

use_default_for_unknown_topic (bool, optional) 🔗︎

Use default for unknown topics

Default: false

idempotent (bool, optional) 🔗︎


Default: false

sasl_over_ssl (bool, required) 🔗︎


Default: true

principal (string, optional) 🔗︎

Default: -

keytab (*secret.Secret, optional) 🔗︎

Default: -

username (*secret.Secret, optional) 🔗︎

Username when using PLAIN/SCRAM SASL authentication

Default: -

password (*secret.Secret, optional) 🔗︎

Password when using PLAIN/SCRAM SASL authentication

Default: -

scram_mechanism (string, optional) 🔗︎

If set, use SCRAM authentication with specified mechanism. When unset, default to PLAIN authentication

Default: -

max_send_retries (int, optional) 🔗︎

Number of times to retry sending of messages to a leader

Default: 1

required_acks (int, optional) 🔗︎

The number of acks required per request .

Default: -1

ack_timeout (int, optional) 🔗︎

How long the producer waits for acks. The unit is seconds

Default: nil => Uses default of ruby-kafka library

compression_codec (string, optional) 🔗︎

The codec the producer uses to compress messages . The available options are gzip and snappy.

Default: nil

kafka_agg_max_bytes (int, optional) 🔗︎

Maximum value of total message size to be included in one batch transmission. .

Default: 4096

kafka_agg_max_messages (int, optional) 🔗︎

Maximum number of messages to include in one batch transmission. .

Default: nil

discard_kafka_delivery_failed (bool, optional) 🔗︎

Discard the record where Kafka DeliveryFailed occurred

Default: false

ssl_ca_certs_from_system (*bool, optional) 🔗︎

System’s CA cert store

Default: false

ssl_ca_cert (*secret.Secret, optional) 🔗︎

CA certificate

Default: -

ssl_client_cert (*secret.Secret, optional) 🔗︎

Client certificate

Default: -

ssl_client_cert_chain (*secret.Secret, optional) 🔗︎

Client certificate chain

Default: -

ssl_client_cert_key (*secret.Secret, optional) 🔗︎

Client certificate key

Default: -

ssl_verify_hostname (*bool, optional) 🔗︎

Verify certificate hostname

Default: -

format (*Format, required) 🔗︎


Default: -

buffer (*Buffer, optional) 🔗︎


Default: -

slow_flush_log_threshold (string, optional) 🔗︎

The threshold for chunk flush performance check. Parameter type is float, not time, default: 20.0 (seconds) If chunk flush takes longer time than this threshold, fluentd logs warning message and increases metric fluentd_output_status_slow_flush_count.

Default: -