fluent-plugin-kafka, a plugin for Fluentd
A fluentd plugin to both consume and produce data for Apache Kafka.
TODO: Also, I need to write tests
Installation
Add this line to your application's Gemfile:
gem 'fluent-plugin-kafka'
And then execute:
$ bundle
Or install it yourself as:
$ gem install fluent-plugin-kafka
Requirements
- Ruby 2.1 or later
- Input plugins work with kafka v0.9 or later
- Output plugins work with kafka v0.8 or later
Usage
Input plugin (@type 'kafka')
Consume events by single consumer.
<source>
@type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
# Optionally, you can manage topic offset by using zookeeper
offset_zookeeper <zookeer node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'
# ruby-kafka consumer options
max_bytes (integer) :default => nil (Use default of ruby-kafka)
max_wait_time (integer) :default => nil (Use default of ruby-kafka)
min_bytes (integer) :default => nil (Use default of ruby-kafka)
</source>
Supports a start of processing from the assigned offset for specific topics.
<source>
@type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
format <input text type (text|json|ltsv|msgpack)>
<topic>
topic <listening topic>
partition <listening partition: default=0>
offset <listening start offset: default=-1>
</topic>
<topic>
topic <listening topic>
partition <listening partition: default=0>
offset <listening start offset: default=-1>
</topic>
</source>
See also ruby-kafka README for more detailed documentation about ruby-kafka.
Input plugin (@type 'kafka_group', supports kafka group)
Consume events by kafka consumer group features..
<source>
@type kafka_group
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
consumer_group <consumer group name, must set>
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
# ruby-kafka consumer options
max_bytes (integer) :default => nil (Use default of ruby-kafka)
max_wait_time (integer) :default => nil (Use default of ruby-kafka)
min_bytes (integer) :default => nil (Use default of ruby-kafka)
offset_commit_interval (integer) :default => nil (Use default of ruby-kafka)
offset_commit_threshold (integer) :default => nil (Use default of ruby-kafka)
start_from_beginning (bool) :default => true
</source>
See also ruby-kafka README for more detailed documentation about ruby-kafka options.
Output plugin (non-buffered)
This plugin uses ruby-kafka producer for writing data. For performance and reliability concerns, use kafka_bufferd
output instead.
<match *.**>
@type kafka
# Brokers: you can choose either brokers or zookeeper.
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
zookeeper <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka
default_topic (string) :default => nil
default_partition_key (string) :default => nil
default_message_key (string) :default => nil
output_data_type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
output_include_tag (bool) :default => false
output_include_time (bool) :default => false
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
# ruby-kafka producer options
max_send_retries (integer) :default => 1
required_acks (integer) :default => -1
ack_timeout (integer) :default => nil (Use default of ruby-kafka)
compression_codec (gzip|snappy) :default => nil
</match>
Supports following ruby-kafka::Producer options.
- max_send_retries - default: 1 - Number of times to retry sending of messages to a leader.
- required_acks - default: -1 - The number of acks required per request.
- ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
- compression_codec - default: nil - The codec the producer uses to compress messages.
See also Kafka::Client for more detailed documentation about ruby-kafka.
This plugin supports compression codec "snappy" also. Install snappy module before you use snappy compression.
$ gem install snappy
snappy gem uses native extension, so you need to install several packages before. On Ubuntu, need development packages and snappy library.
$ sudo apt-get install build-essential autoconf automake libtool libsnappy-dev
Load balancing
Messages will be assigned a partition at random as default by ruby-kafka, but messages with the same partition key will always be assigned to the same partition by setting default_partition_key
in config file.
If key name partition_key
exists in a message, this plugin set its value of partition_key as key.
default_partition_key | partition_key | behavior |
---|---|---|
Not set | Not exists | All messages are assigned a partition at random |
Set | Not exists | All messages are assigned to the specific partition |
Not set | Exists | Messages which have partition_key record are assigned to the specific partition, others are assigned a partition at random |
Set | Exists | Messages which have partition_key record are assigned to the specific partition with parition_key, others are assigned to the specific partition with default_parition_key |
If key name message_key
exists in a message, this plugin publishes the value of message_key to kafka and can be read by consumers. Same message key will be assigned to all messages by setting default_message_key
in config file. If message_key exists and if partition_key is not set explicitly, messsage_key will be used for partitioning.
Buffered output plugin
This plugin uses ruby-kafka producer for writing data. This plugin works with recent kafka versions.
<match *.**>
@type kafka_buffered
# Brokers: you can choose either brokers or zookeeper.
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
zookeeper <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka
default_topic (string) :default => nil
default_partition_key (string) :default => nil
default_message_key (string) :default => nil
output_data_type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
output_include_tag (bool) :default => false
output_include_time (bool) :default => false
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
get_kafka_client_log (bool) :default => false
# See fluentd document for buffer related parameters: http://docs.fluentd.org/articles/buffer-plugin-overview
# ruby-kafka producer options
max_send_retries (integer) :default => 1
required_acks (integer) :default => -1
ack_timeout (integer) :default => nil (Use default of ruby-kafka)
compression_codec (gzip|snappy) :default => nil (No compression)
</match>
<formatter name>
of output_data_type
uses fluentd's formatter plugins. See formatter article.
ruby-kafka sometimes returns Kafka::DeliveryFailed
error without good information.
In this case, get_kafka_client_log
is useful for identifying the error cause.
ruby-kafka's log is routed to fluentd log so you can see ruby-kafka's log in fluentd logs.
Supports following ruby-kafka's producer options.
- max_send_retries - default: 1 - Number of times to retry sending of messages to a leader.
- required_acks - default: -1 - The number of acks required per request.
- ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
- compression_codec - default: nil - The codec the producer uses to compress messages.
See also Kafka::Client for more detailed documentation about ruby-kafka.
This plugin supports compression codec "snappy" also. Install snappy module before you use snappy compression.
$ gem install snappy
Contributing
- Fork it
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Added some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request