Class: LogStash::Outputs::Kafka

Inherits:
  Base < Object
Defined in:
lib/logstash/outputs/kafka.rb

Overview

Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.

Here’s a compatibility matrix that shows the Kafka broker and client versions that are compatible with each combination of Logstash and the Kafka output plugin:

[options="header"]
|==========================================================
|Kafka Broker Version |Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8 |0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9 |0.9 |2.0.0 - 2.3.x |3.x.x |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9 |0.9 |2.4.0 - 5.0.x |4.x.x |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10 |0.10 |2.4.0 - 5.0.x |5.x.x |Not compatible with the 0.9 broker
|==========================================================

NOTE: It’s a good idea to upgrade brokers before consumers/producers because brokers target backwards compatibility. For example, the 0.9 broker will work with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.

The only required configuration is the topic_id. The default codec is json, so events will be persisted on the broker in JSON format. If you select the plain codec, Logstash will encode your messages with not only the message itself but also a timestamp and hostname. If you want nothing but the message to pass through, configure the output like this:

[source,ruby]
----
output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}
----

For more information see kafka.apache.org/documentation.html#theproducer

Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
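As a sketch of a slightly fuller configuration (the broker address, topic name, and field names below are hypothetical, not from this document), note that both the topic and the message key support `%{field}` interpolation, as the #register implementation shows:

[source,ruby]
----
# Hypothetical example: topic_id and message_key are resolved per event
# via sprintf-style %{field} interpolation.
output {
  kafka {
    bootstrap_servers => "localhost:9092"   # assumed broker address
    topic_id => "logs-%{type}"              # topic chosen per event
    message_key => "%{host}"                # key events by hostname for partitioning
  }
}
----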

Instance Method Summary

Instance Method Details

#close ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 162

def close
  @producer.close
end

#receive(event) ⇒ Object




# File 'lib/logstash/outputs/kafka.rb', line 155

def receive(event)
  # Ignore the shutdown sentinel event
  if event == LogStash::SHUTDOWN
    return
  end
  # Encoding triggers the codec's on_event callback (registered in #register),
  # which builds the ProducerRecord and sends it to Kafka
  @codec.encode(event)
end

#register ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 135

def register
  @producer = create_producer
  # The codec calls this block with the encoded payload for each event
  @codec.on_event do |event, data|
    begin
      # Topic (and optional message key) support %{field} interpolation
      if @message_key.nil?
        record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data)
      else
        record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data)
      end
      @producer.send(record)
    rescue LogStash::ShutdownSignal
      @logger.info('Kafka producer got shutdown signal')
    rescue => e
      @logger.warn('kafka producer threw exception, restarting',
                   :exception => e)
    end
  end
end
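The wiring between #receive and this callback can be seen with a minimal stand-in codec in plain Ruby (no Kafka involved; `StubCodec` is an invented class for illustration, not part of Logstash): `encode` simply invokes whatever block was registered via `on_event`, which is where the real plugin builds and sends the ProducerRecord.

```ruby
# Minimal sketch of the codec callback pattern used above.
# StubCodec is hypothetical; Logstash codecs expose the same
# on_event/encode contract.
class StubCodec
  def on_event(&block)
    @on_event = block            # callback registration (done in #register)
  end

  def encode(event)
    data = event.to_s            # stand-in for real serialization
    @on_event.call(event, data)  # invoked when #receive calls encode
  end
end

sent = []
codec = StubCodec.new
codec.on_event { |event, data| sent << data } # stands in for @producer.send
codec.encode({"message" => "hello"})
puts sent.inspect
```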