Class: LogStash::Outputs::Kafka
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::Kafka
- Defined in:
- lib/logstash/outputs/kafka.rb
Overview
Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.
Here’s a compatibility matrix that shows the Kafka broker and client versions that are compatible with each combination of Logstash and the Kafka output plugin:
- options=“header”
-
|========================================================== |Kafka Broker Version |Kafka Client Version |Logstash Version |Plugin Version |Why? |0.8 |0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular |0.9 |0.9 |2.0.0 - 2.3.x | 3.x.x |Works with the old Ruby Event API (‘event[’price’] = 10`)
|0.9 |0.9 |2.4.0 - 5.0.x | 4.x.x |Works with the new getter/setter APIs (‘event.set(‘[price]’, 10)‘) |0.10 |0.10 |2.4.0 - 5.0.x | 5.x.x |Not compatible with the 0.9 broker |==========================================================
NOTE: It’s a good idea to upgrade brokers before consumers/producers because brokers target backwards compatibility. For example, the 0.9 broker will work with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
The only required configuration is the topic_id. The default codec is json, so events will be persisted on the broker in json format. If you select a codec of plain, Logstash will encode your messages with not only the message but also with a timestamp and hostname. If you do not want anything but your message passing through, you should make the output configuration something like:
- source,ruby
-
output
kafka { codec => plain { format => "%{message" } topic_id => "mytopic" }}
For more information see kafka.apache.org/documentation.html#theproducer
Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
Instance Method Summary collapse
- #close ⇒ Object
-
#receive(event) ⇒ Object
def register.
- #register ⇒ Object
Instance Method Details
#close ⇒ Object
162 163 164 |
# File 'lib/logstash/outputs/kafka.rb', line 162 def close @producer.close end |
#receive(event) ⇒ Object
def register
155 156 157 158 159 160 |
# File 'lib/logstash/outputs/kafka.rb', line 155 def receive(event) if event == LogStash::SHUTDOWN return end @codec.encode(event) end |
#register ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/logstash/outputs/kafka.rb', line 135 def register @producer = create_producer @codec.on_event do |event, data| begin if .nil? record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data) else record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(), data) end @producer.send(record) rescue LogStash::ShutdownSignal @logger.info('Kafka producer got shutdown signal') rescue => e @logger.warn('kafka producer threw exception, restarting', :exception => e) end end end |