Class: LogStash::Inputs::Kafka
- Inherits: LogStash::Inputs::Base
  - Object
  - LogStash::Inputs::Base
  - LogStash::Inputs::Kafka
- Includes:
- PluginMixins::DeprecationLoggerSupport, PluginMixins::Kafka::AvroSchemaRegistry, PluginMixins::Kafka::Common
- Defined in:
- lib/logstash/inputs/kafka.rb
Overview
This input will read events from a Kafka topic. It uses the 0.10 version of the consumer API provided by Kafka to read messages from the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:
| Kafka Client Version | Logstash Version | Plugin Version | Why? |
|----------------------|------------------|----------------|------|
| 0.8 | 2.0.0 - 2.x.x | <3.0.0 | Legacy, 0.8 is still popular |
| 0.9 | 2.0.0 - 2.3.x | 3.x.x | Works with the old Ruby Event API (`event['price'] = 10`) |
| 0.9 | 2.4.x - 5.x.x | 4.x.x | Works with the new getter/setter APIs (`event.set('[price]', 10)`) |
| 0.10.0.x | 2.4.x - 5.x.x | 5.x.x | Not compatible with the <= 0.9 broker |
| 0.10.1.x | 2.4.x - 5.x.x | 6.x.x | |
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This input supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default security is disabled but can be turned on as needed.
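For example, a minimal SSL sketch might look like the following; the broker address, truststore path, and password are placeholders:

input {
  kafka {
    bootstrap_servers => "kafka1:9093"
    topics => ["logs"]
    security_protocol => "SSL"
    # placeholder credentials; point these at your own truststore
    ssl_truststore_location => "/etc/logstash/kafka.client.truststore.jks"
    ssl_truststore_password => "changeit"
  }
}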
The Logstash Kafka consumer handles group management and uses the default offset management strategy using Kafka topics.
Logstash instances by default form a single logical group to subscribe to Kafka topics. Each Logstash Kafka consumer can run multiple threads to increase read throughput. Alternatively, you could run multiple Logstash instances with the same `group_id` to spread the load across physical machines. Messages in a topic will be distributed to all Logstash instances with the same `group_id`.
Ideally you should have as many threads as the number of partitions for a perfect balance; more threads than partitions means that some threads will be idle.
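For example, this sketch (broker address and topic are placeholders) runs four consumer threads under a shared group id; running a second identical Logstash instance would split the topic's partitions between the two:

input {
  kafka {
    bootstrap_servers => "kafka1:9092"
    topics => ["logs"]
    group_id => "logstash"    # same group_id on every instance sharing the load
    consumer_threads => 4     # ideally, threads across all instances == partition count
  }
}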
For more information see kafka.apache.org/documentation.html#theconsumer
Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs
Constant Summary collapse
- DEFAULT_DESERIALIZER_CLASS =
"org.apache.kafka.common.serialization.StringDeserializer"
- METADATA_NONE =
Set[].freeze
- METADATA_BASIC =
Set[:record_props].freeze
- METADATA_EXTENDED =
Set[:record_props, :headers].freeze
- METADATA_DEPRECATION_MAP =
{ 'true' => 'basic', 'false' => 'none' }
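These sets back the `decorate_events` option: `none`, `basic`, and `extended` select METADATA_NONE, METADATA_BASIC, and METADATA_EXTENDED respectively, while the deprecation map translates the legacy `true`/`false` values. A sketch (topic and field names are placeholders):

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logs"]
    decorate_events => "basic"   # "none" | "basic" | "extended"; "true"/"false" are deprecated aliases
  }
}
filter {
  # @metadata is not serialized by outputs, so copy what you need onto the event
  mutate { add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" } }
}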
Instance Attribute Summary collapse
- #metadata_mode ⇒ Object (readonly)
  Returns the value of attribute metadata_mode.
Instance Method Summary collapse
- #do_poll(consumer) ⇒ Object
- #handle_record(record, codec_instance, queue) ⇒ Object
- #initialize(params = {}) ⇒ Kafka
  constructor
  A new instance of Kafka.
- #kafka_consumers ⇒ Object
- #maybe_commit_offset(consumer) ⇒ Object
- #maybe_set_metadata(event, record) ⇒ Object
- #register ⇒ Object
- #run(logstash_queue) ⇒ Object
- #stop ⇒ Object
- #subscribe(consumer) ⇒ Object
- #thread_runner(logstash_queue, consumer, name) ⇒ Object
Methods included from PluginMixins::Kafka::AvroSchemaRegistry
#check_schema_registry_parameters, included, #schema_registry_validation?, #setup_schema_registry_config, #using_kerberos?
Methods included from PluginMixins::Kafka::Common
included, #reassign_dns_lookup, #set_sasl_config, #set_trustore_keystore_config
Constructor Details
#initialize(params = {}) ⇒ Kafka
Returns a new instance of Kafka.
# File 'lib/logstash/inputs/kafka.rb', line 278

def initialize(params = {})
  unless params.key?('codec')
    params['codec'] = params.key?('schema_registry_url') ? 'json' : 'plain'
  end
  super(params)
end
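In practice this means a pipeline that sets `schema_registry_url` gets the `json` codec by default, while any other pipeline defaults to `plain`; a sketch with a placeholder registry URL:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["avro-logs"]
    schema_registry_url => "http://localhost:8081"
    # no codec configured, so this constructor defaults it to "json"
  }
}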
Instance Attribute Details
#metadata_mode ⇒ Object (readonly)
Returns the value of attribute metadata_mode.
# File 'lib/logstash/inputs/kafka.rb', line 275

def metadata_mode
  @metadata_mode
end
Instance Method Details
#do_poll(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 363

def do_poll(consumer)
  records = []
  begin
    records = consumer.poll(java.time.Duration.ofMillis(poll_timeout_ms))
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from poll", :kafka_error_message => e)
    raise e unless stop?
  rescue org.apache.kafka.common.errors.FencedInstanceIdException => e
    logger.error("Another consumer with same group.instance.id has connected", :original_error_message => e.message)
    raise e unless stop?
  rescue => e
    logger.error("Unable to poll Kafka consumer",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause : nil)
    Stud.stoppable_sleep(1) { stop? }
  end
  records
end
#handle_record(record, codec_instance, queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 382

def handle_record(record, codec_instance, queue)
  # use + since .to_s on nil/boolean returns a frozen string since ruby 2.7
  codec_instance.decode(+record.value.to_s) do |event|
    decorate(event)
    maybe_set_metadata(event, record)
    queue << event
  end
end
#kafka_consumers ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 336

def kafka_consumers
  @runner_consumers
end
#maybe_commit_offset(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 413

def maybe_commit_offset(consumer)
  begin
    consumer.commitSync if @enable_auto_commit.eql?(false)
  rescue org.apache.kafka.common.errors.WakeupException => e
    logger.debug("Wake up from commitSync", :kafka_error_message => e)
    raise e unless stop?
  rescue StandardError => e
    # For transient errors, the commit should be successful after the next set of
    # polled records has been processed.
    # But, it might also be worth thinking about adding a configurable retry mechanism
    logger.error("Unable to commit records",
                 :kafka_error_message => e,
                 :cause => e.respond_to?(:getCause) ? e.getCause() : nil)
  end
end
#maybe_set_metadata(event, record) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 391

def maybe_set_metadata(event, record)
  if @metadata_mode.include?(:record_props)
    event.set("[@metadata][kafka][topic]", record.topic)
    event.set("[@metadata][kafka][consumer_group]", @group_id)
    event.set("[@metadata][kafka][partition]", record.partition)
    event.set("[@metadata][kafka][offset]", record.offset)
    event.set("[@metadata][kafka][key]", record.key)
    event.set("[@metadata][kafka][timestamp]", record.timestamp)
  end
  if @metadata_mode.include?(:headers)
    record.headers
          .select { |h| header_with_value(h) }
          .each do |header|
      s = String.from_java_bytes(header.value)
      s.force_encoding(Encoding::UTF_8)
      if s.valid_encoding?
        event.set("[@metadata][kafka][headers][" + header.key + "]", s)
      end
    end
  end
end
#register ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 287

def register
  @runner_threads = []
  @metadata_mode = extract_metadata_level(@decorate_events)
  reassign_dns_lookup
  @pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
  check_schema_registry_parameters
end
#run(logstash_queue) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 318

def run(logstash_queue)
  @runner_consumers = consumer_threads.times.map do |i|
    thread_group_instance_id = consumer_threads > 1 && group_instance_id ? "#{group_instance_id}-#{i}" : group_instance_id
    subscribe(create_consumer("#{client_id}-#{i}", thread_group_instance_id))
  end
  @runner_threads = @runner_consumers.map.with_index { |consumer, i|
    thread_runner(logstash_queue, consumer, "kafka-input-worker-#{client_id}-#{i}")
  }
  @runner_threads.each(&:start)
  @runner_threads.each(&:join)
end
#stop ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 330

def stop
  # if we have consumers, wake them up to unblock our runner threads
  @runner_consumers && @runner_consumers.each(&:wakeup)
end
#subscribe(consumer) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 340

def subscribe(consumer)
  @pattern.nil? ? consumer.subscribe(topics) : consumer.subscribe(@pattern)
  consumer
end
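Which subscription path is taken depends on whether `topics_pattern` was set when #register compiled `@pattern`; for example (the pattern is illustrative):

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics_pattern => "logs-.*"   # regex subscription via @pattern instead of a fixed topics list
  }
}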
#thread_runner(logstash_queue, consumer, name) ⇒ Object
# File 'lib/logstash/inputs/kafka.rb', line 345

def thread_runner(logstash_queue, consumer, name)
  java.lang.Thread.new do
    LogStash::Util::set_thread_name(name)
    begin
      codec_instance = @codec.clone
      until stop?
        records = do_poll(consumer)
        unless records.empty?
          records.each { |record| handle_record(record, codec_instance, logstash_queue) }
          maybe_commit_offset(consumer)
        end
      end
    ensure
      consumer.close
    end
  end
end