Kafka output plugin for Fluentd
Overview
More info at https://github.com/fluent/fluent-plugin-kafka
Example Deployment: Transport Nginx Access Logs into Kafka with Logging Operator
Example output configurations
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    format:
      type: json
    buffer:
      tags: topic
      timekey: 1m
      timekey_wait: 30s
      timekey_use_utc: true
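For context, the spec fragment above would normally sit inside an Output (or ClusterOutput) resource. The following is a minimal sketch, assuming the logging.banzaicloud.io/v1beta1 API version; the resource name and namespace are hypothetical placeholders, not values from this page.

```yaml
# Sketch only: metadata.name and metadata.namespace are hypothetical.
apiVersion: logging.banzaicloud.io/v1beta1
kind: Output
metadata:
  name: kafka-output
  namespace: default
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    format:
      type: json
    buffer:
      tags: topic
      timekey: 1m
      timekey_wait: 30s
      timekey_use_utc: true
```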
Configuration
Kafka
Send your logs to Kafka
brokers (string, required)
The list of all seed brokers, with their host and port information.
Default: -
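topic_key (string, optional)
The record field that holds the Kafka topic.
Default: “topic”
partition_key (string, optional)
The record field that holds the Kafka partition.
Default: “partition”
partition_key_key (string, optional)
The record field that holds the Kafka partition key.
Default: “partition_key”
message_key_key (string, optional)
The record field that holds the Kafka message key.
Default: “message_key”
client_id (string, optional)
The client ID used when connecting to Kafka.
Default: “kafka”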
default_topic (string, optional)
The name of the default topic.
Default: nil
default_partition_key (string, optional)
The name of the default partition key.
Default: nil
default_message_key (string, optional)
The name of the default message key.
Default: nil
exclude_topic_key (bool, optional)
Set to true to remove the topic key from the record.
Default: false
exclude_partion_key (bool, optional)
Set to true to remove the partition key from the record.
Default: false
get_kafka_client_log (bool, optional)
Collect the log output of the Kafka client.
Default: false
headers (map[string]string, optional)
Static Kafka message headers.
Default: {}
headers_from_record (map[string]string, optional)
Kafka message headers whose values are read from the record.
Default: {}
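As an illustration of the two header options, here is a hedged sketch. The header names and values are hypothetical, and the record path assumes the upstream plugin's record accessor syntax and a record that carries Kubernetes metadata; adjust both to your own records.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    # Static header attached to every message (hypothetical value).
    headers:
      source: logging-operator
    # Header whose value is looked up in each record.
    # Assumption: the record has a kubernetes.namespace_name field.
    headers_from_record:
      namespace: "$.kubernetes.namespace_name"
    format:
      type: json
```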
use_default_for_unknown_topic (bool, optional)
If true, default_topic is used when the target topic is not found.
Default: false
idempotent (bool, optional)
Enable the idempotent producer.
Default: false
sasl_over_ssl (bool, required)
Use SASL over SSL.
Default: true
principal (string, optional)
Default: -
keytab (*secret.Secret, optional)
Default: -
username (*secret.Secret, optional)
Username when using PLAIN/SCRAM SASL authentication.
Default: -
password (*secret.Secret, optional)
Password when using PLAIN/SCRAM SASL authentication.
Default: -
scram_mechanism (string, optional)
If set, use SCRAM authentication with the specified mechanism. When unset, PLAIN authentication is used.
Default: -
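The SASL options above could be combined as in the following sketch. It assumes the operator's usual secret reference form (valueFrom.secretKeyRef) and the sha512 mechanism name accepted by the upstream plugin; the Secret name kafka-credentials and its keys are hypothetical.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: true
    # Assumption: the upstream plugin accepts sha256 or sha512 here.
    scram_mechanism: sha512
    username:
      valueFrom:
        secretKeyRef:
          name: kafka-credentials   # hypothetical Secret
          key: username
    password:
      valueFrom:
        secretKeyRef:
          name: kafka-credentials   # hypothetical Secret
          key: password
    format:
      type: json
```

Leaving scram_mechanism out of this sketch would fall back to PLAIN authentication with the same credentials.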
max_send_retries (int, optional)
Number of times to retry sending messages to a leader.
Default: 1
required_acks (int, optional)
The number of acks required per request.
Default: -1
ack_timeout (int, optional)
How long the producer waits for acks. The unit is seconds.
Default: nil => Uses default of ruby-kafka library
compression_codec (string, optional)
The codec the producer uses to compress messages. The available options are gzip and snappy.
Default: nil
kafka_agg_max_bytes (int, optional)
Maximum total message size to include in one batch transmission.
Default: 4096
kafka_agg_max_messages (int, optional)
Maximum number of messages to include in one batch transmission.
Default: nil
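A sketch of how the delivery and batching options might be set together. The numeric values are illustrative assumptions rather than recommendations; only the parameter names and the gzip codec come from this page.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    max_send_retries: 3           # illustrative; the default is 1
    required_acks: -1             # the documented default
    ack_timeout: 10               # seconds; illustrative
    compression_codec: gzip
    kafka_agg_max_bytes: 1048576  # illustrative; the default is 4096
    format:
      type: json
```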
discard_kafka_delivery_failed (bool, optional)
Discard the record where Kafka DeliveryFailed occurred.
Default: false
ssl_ca_certs_from_system (*bool, optional)
Use the system’s CA certificate store.
Default: false
ssl_ca_cert (*secret.Secret, optional)
CA certificate
Default: -
ssl_client_cert (*secret.Secret, optional)
Client certificate
Default: -
ssl_client_cert_chain (*secret.Secret, optional)
Client certificate chain
Default: -
ssl_client_cert_key (*secret.Secret, optional)
Client certificate key
Default: -
ssl_verify_hostname (*bool, optional)
Verify certificate hostname
Default: -
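The TLS options could be wired up roughly as follows. This sketch assumes the certificates live in a Secret and are mounted as files through the operator's mountFrom secret reference; the Secret name kafka-tls and its keys are hypothetical, and the broker address is simply reused from the example above.

```yaml
spec:
  kafka:
    brokers: kafka-headless.kafka.svc.cluster.local:29092
    default_topic: topic
    sasl_over_ssl: false
    ssl_ca_cert:
      mountFrom:
        secretKeyRef:
          name: kafka-tls   # hypothetical Secret
          key: ca.crt
    ssl_client_cert:
      mountFrom:
        secretKeyRef:
          name: kafka-tls   # hypothetical Secret
          key: tls.crt
    ssl_client_cert_key:
      mountFrom:
        secretKeyRef:
          name: kafka-tls   # hypothetical Secret
          key: tls.key
    ssl_verify_hostname: true
    format:
      type: json
```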
format (*Format, required)
Default: -
buffer (*Buffer, optional)
Default: -
slow_flush_log_threshold (string, optional)
The threshold for the chunk flush performance check. The parameter type is float, not time; the default is 20.0 (seconds). If a chunk flush takes longer than this threshold, Fluentd logs a warning message and increments the fluentd_output_status_slow_flush_count metric.
Default: -