Class: LogStash::Inputs::Kafka

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::DeprecationLoggerSupport, PluginMixins::KafkaAvroSchemaRegistry, PluginMixins::KafkaSupport
Defined in:
lib/logstash/inputs/kafka.rb

Overview

This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.

Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:

options=“header”

|========================================================== |Kafka Client Version |Logstash Version |Plugin Version |Why? |0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular |0.9 |2.0.0 - 2.3.x | 3.x.x |Works with the old Ruby Event API (‘event[’price’] = 10`)

|0.9 |2.4.x - 5.x.x | 4.x.x |Works with the new getter/setter APIs (‘event.set(’[price]‘, 10)`) |0.10.0.x |2.4.x - 5.x.x | 5.x.x |Not compatible with the <= 0.9 broker |0.10.1.x |2.4.x - 5.x.x | 6.x.x | |==========================================================

NOTE: We recommended that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.

This input supports connecting to Kafka over:

  • SSL (requires plugin version 3.0.0 or later)

  • Kerberos SASL (requires plugin version 5.1.0 or later)

By default security is disabled but can be turned on as needed.

The Logstash Kafka consumer handles group management and uses the default offset management strategy using Kafka topics.

Logstash instances by default form a single logical group to subscribe to Kafka topics Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same ‘group_id` to spread the load across physical machines. Messages in a topic will be distributed to all Logstash instances with the same `group_id`.

Ideally you should have as many threads as the number of partitions for a perfect balance – more threads than partitions means that some threads will be idle

For more information see kafka.apache.org/documentation.html#theconsumer

Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs

Constant Summary collapse

DEFAULT_DESERIALIZER_CLASS =
"org.apache.kafka.common.serialization.StringDeserializer"
METADATA_NONE =
Set[].freeze
METADATA_BASIC =
Set[:record_props].freeze
METADATA_EXTENDED =
Set[:record_props, :headers].freeze
METADATA_DEPRECATION_MAP =
{ 'true' => 'basic', 'false' => 'none' }

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from PluginMixins::KafkaAvroSchemaRegistry

#check_schema_registry_parameters, included, #setup_schema_registry_config

Methods included from PluginMixins::KafkaSupport

#set_sasl_config, #set_trustore_keystore_config

Instance Attribute Details

#metadata_modeObject (readonly)

Returns the value of attribute metadata_mode.



250
251
252
# File 'lib/logstash/inputs/kafka.rb', line 250

def 
  @metadata_mode
end

Instance Method Details

#kafka_consumersObject



295
296
297
# File 'lib/logstash/inputs/kafka.rb', line 295

def kafka_consumers
  @runner_consumers
end

#registerObject



253
254
255
256
257
# File 'lib/logstash/inputs/kafka.rb', line 253

def register
  @runner_threads = []
  @metadata_mode = (@decorate_events)
  check_schema_registry_parameters
end

#run(logstash_queue) ⇒ Object



282
283
284
285
286
# File 'lib/logstash/inputs/kafka.rb', line 282

def run(logstash_queue)
  @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") }
  @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
  @runner_threads.each { |t| t.join }
end

#stopObject



289
290
291
292
# File 'lib/logstash/inputs/kafka.rb', line 289

def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end