Class: LogStash::Outputs::Kafka

Inherits: LogStash::Outputs::Base
Defined in: lib/logstash/outputs/kafka.rb

Overview

Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.

Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:

Kafka Client Version | Logstash Version | Plugin Version | Why?
0.8                  | 2.0.0 - 2.x.x    | <3.0.0         | Legacy, 0.8 is still popular
0.9                  | 2.0.0 - 2.3.x    | 3.x.x          | Works with the old Ruby Event API (event['price'] = 10)
0.9                  | 2.4.x - 5.x.x    | 4.x.x          | Works with the new getter/setter APIs (event.set('[price]', 10))
0.10.0.x             | 2.4.x - 5.x.x    | 5.x.x          | Not compatible with the <= 0.9 broker
0.10.1.x             | 2.4.x - 5.x.x    | 6.x.x          |

NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.

This output supports connecting to Kafka over:

  • SSL (requires plugin version 3.0.0 or later)

  • Kerberos SASL (requires plugin version 5.1.0 or later)

By default security is disabled but can be turned on as needed.
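For example, an SSL connection might be configured as in the sketch below. This is a minimal sketch, not copied from the plugin reference: the security_protocol and ssl_truststore_* option names follow the newer (5.x and later) option style and are assumptions here, as are the broker address and file paths; older 3.x/4.x releases expose SSL through different options, so check the option list for your installed plugin version.

output {
  kafka {
    topic_id                => "mytopic"
    bootstrap_servers       => "broker1:9093"                      # hypothetical broker address
    security_protocol       => "SSL"                               # assumed option name for newer plugin versions
    ssl_truststore_location => "/etc/kafka/client.truststore.jks"  # example path to the CA truststore
    ssl_truststore_password => "changeit"                          # example password
  }
}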

The only required configuration is the topic_id. The default codec is plain, so events are written to the broker in Logstash's plain format, which includes a timestamp and the hostname in addition to the message itself. If you want only the message to pass through, configure the output along these lines:

output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}

For more information see kafka.apache.org/documentation.html#theproducer

Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
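Most of those producer settings are surfaced as plugin options with the same names. The sketch below shows a typical tuning block; treat the acks, compression_type, and linger_ms option names as assumptions to be verified against the option reference for your plugin version, and replace the broker address with your own.

output {
  kafka {
    topic_id          => "mytopic"
    bootstrap_servers => "broker1:9092"   # hypothetical broker address
    acks              => "all"            # wait for the broker to fully acknowledge each batch
    compression_type  => "snappy"         # compress batches on the wire
    linger_ms         => 5                # allow a small delay so records can batch together
  }
}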

Instance Method Summary

  • #close ⇒ Object
  • #receive(event) ⇒ Object
  • #register ⇒ Object

Instance Method Details

#close ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 204

def close
  @producer.close
end

#receive(event) ⇒ Object




# File 'lib/logstash/outputs/kafka.rb', line 197

def receive(event)
  if event == LogStash::SHUTDOWN
    return
  end
  @codec.encode(event)
end

#register ⇒ Object



# File 'lib/logstash/outputs/kafka.rb', line 177

def register
  @producer = create_producer
  @codec.on_event do |event, data|
    begin
      if @message_key.nil?
        record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data)
      else
        record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data)
      end
      @producer.send(record)
    rescue LogStash::ShutdownSignal
      @logger.debug('Kafka producer got shutdown signal')
    rescue => e
      @logger.warn('kafka producer threw exception, restarting',
                   :exception => e)
    end
  end

end
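As the listing shows, when message_key is set each ProducerRecord is created with a key, which Kafka's default partitioner hashes to choose a partition; otherwise records are sent without a key. A minimal sketch of keyed output, where the [user_id] field is a hypothetical example:

output {
  kafka {
    topic_id    => "mytopic"
    message_key => "%{[user_id]}"   # hypothetical event field used as the record key
  }
}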