Class: LogStash::Inputs::Kafka

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/kafka.rb

Overview

This input will read events from a Kafka topic. It uses the high level consumer API provided by Kafka to read messages from the broker. It also maintains the state of what has been consumed using Zookeeper. The default input codec is json

You must configure `topic_id`, `white_list` or `black_list`. By default it will connect to a Zookeeper running on localhost. All the broker information is read from Zookeeper state

Ideally you should have as many threads as the number of partitions for a perfect balance – more threads than partitions means that some threads will be idle

For more information see kafka.apache.org/documentation.html#theconsumer

Kafka consumer configuration: kafka.apache.org/documentation.html#consumerconfigs

Instance Method Summary collapse

Instance Method Details

#registerObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/logstash/inputs/kafka.rb', line 97

# Validates the topic-selection settings, assembles the consumer
# configuration and initialises the Kafka consumer group together with
# the bounded internal queue used to hand messages to the pipeline.
#
# Exactly one of `topic_id`, `white_list` or `black_list` must be set;
# anything else raises a LogStash::ConfigurationError.
def register
  LogStash::Logger.setup_log4j(@logger)

  # Guard: enforce that precisely one topic selector was configured.
  selectors = [@topic_id, @white_list, @black_list].compact
  if selectors.empty?
    raise LogStash::ConfigurationError, 'topic_id, white_list or black_list required.'
  elsif selectors.size > 1
    raise LogStash::ConfigurationError, 'Invalid combination of topic_id, white_list or black_list. Use only one.'
  end

  # Consumer-group options passed through to the underlying Kafka client.
  options = {
      :zk_connect => @zk_connect,
      :group_id => @group_id,
      :topic_id => @topic_id,
      :auto_offset_reset => @auto_offset_reset,
      :rebalance_max_retries => @rebalance_max_retries,
      :rebalance_backoff_ms => @rebalance_backoff_ms,
      :consumer_timeout_ms => @consumer_timeout_ms,
      :consumer_restart_on_error => @consumer_restart_on_error,
      :consumer_restart_sleep_ms => @consumer_restart_sleep_ms,
      :consumer_id => @consumer_id,
      :fetch_message_max_bytes => @fetch_message_max_bytes,
      :allow_topics => @white_list,
      :filter_topics => @black_list,
      :value_decoder_class => @decoder_class,
      :key_decoder_class => @key_decoder_class
  }
  # Replay the topic from its earliest offset when requested.
  options[:reset_beginning] = 'from-beginning' if @reset_beginning

  # Bounded queue: consumer threads block once @queue_size messages are
  # buffered, providing back-pressure toward Kafka.
  @kafka_client_queue = SizedQueue.new(@queue_size)
  @consumer_group = create_consumer_group(options)
  @logger.info('Registering kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
end

#run(logstash_queue) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/logstash/inputs/kafka.rb', line 131

# Starts the Kafka consumer threads and pumps messages from the internal
# queue into the Logstash pipeline until `stop` is requested. On any
# StandardError from the client the loop sleeps (interruptibly) and
# restarts the consumer group.
#
# @param logstash_queue [Queue] the Logstash pipeline queue events are pushed into
def run(logstash_queue)
  # noinspection JRubyStringImportInspection
  java_import 'kafka.common.ConsumerRebalanceFailedException'
  @logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
  begin
    @consumer_group.run(@consumer_threads,@kafka_client_queue)

    while !stop?
      if @kafka_client_queue.empty?
        # Fix: the original spun in a tight loop (100% CPU) whenever the
        # queue was empty; back off briefly while staying responsive to stop?.
        sleep(0.01)
      else
        event = @kafka_client_queue.pop
        queue_event(event, logstash_queue)
      end
    end

    # Drain anything the consumer threads buffered before shutdown so
    # already-fetched messages are not dropped.
    until @kafka_client_queue.empty?
      queue_event(@kafka_client_queue.pop,logstash_queue)
    end

    @logger.info('Done running kafka input')
  rescue => e
    @logger.warn('kafka client threw exception, restarting',
                 :exception => e)
    # @consumer_restart_sleep_ms is milliseconds; convert to seconds and
    # sleep interruptibly so stop? can cut the wait short.
    Stud.stoppable_sleep(Float(@consumer_restart_sleep_ms) / 1000) { stop? }
    retry unless stop?
  end
end

#stopObject



159
160
161
# File 'lib/logstash/inputs/kafka.rb', line 159

# Invoked by Logstash on plugin shutdown: tears down the consumer group
# if it is still running (no-op otherwise).
def stop
  return unless @consumer_group.running?
  @consumer_group.shutdown
end