Class: SimpleKafkaConsumer::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/simple_kafka_consumer/consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(kafka_servers, zookeeper_servers, options = {}) ⇒ Consumer

Returns a new instance of Consumer.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/simple_kafka_consumer/consumer.rb', line 6

# Builds the underlying Poseidon consumer group and installs shutdown
# signal handlers.
#
# @param kafka_servers passed through unchanged to Poseidon::ConsumerGroup.new
# @param zookeeper_servers passed through unchanged to Poseidon::ConsumerGroup.new
# @param options [Hash] :logger and :timeout are consumed here; every other
#   key is forwarded to Poseidon::ConsumerGroup.new. The caller's hash is
#   left untouched.
# @option options :logger stored in @logger (nil when absent)
# @option options :timeout value assigned to @timeout once INT/TERM is
#   received (defaults to 5)
def initialize(kafka_servers, zookeeper_servers, options = {})
  # Work on a copy: the deletes below must not strip keys from (or raise on
  # a frozen) hash owned by the caller.
  options = options.dup
  @logger = options.delete(:logger)
  timeout_length = options.delete(:timeout) || 5

  # Explicit initial state; both are only flipped from the signal handlers.
  @terminated = false
  @timeout = nil

  @consumer = Poseidon::ConsumerGroup.new(
    group_name,
    kafka_servers,
    zookeeper_servers,
    topic_name,
    options
  )

  %w(INT TERM).each do |signal|
    Signal.trap(signal) do
      @terminated = true
      @timeout = timeout_length
    end
  end
end

Instance Attribute Details

#consumer ⇒ Object (readonly)

Returns the value of attribute consumer.



5
6
7
# File 'lib/simple_kafka_consumer/consumer.rb', line 5

# Read-only accessor for @consumer, the Poseidon::ConsumerGroup instance
# created in the constructor.
def consumer
  @consumer
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



5
6
7
# File 'lib/simple_kafka_consumer/consumer.rb', line 5

# Read-only accessor for @logger, the value taken from options[:logger] in
# the constructor (nil when no logger was supplied).
def logger
  @logger
end

Instance Method Details

#run ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/simple_kafka_consumer/consumer.rb', line 24

# Consumes messages until termination is requested.
#
# Reports the group's partitions and claimed partitions through `debug`,
# then hands every fetched batch to `parse` and `process`, one message at a
# time. Each batch is wrapped in Timeout.timeout(@timeout); a nil @timeout
# means no time limit is applied. The fetch loop is left after the first
# batch that completes while @terminated is set.
#
# A ZK::Exceptions::OperationTimeOut is logged and the whole method body is
# restarted from the top; any other exception propagates to the caller.
def run
  debug("partitions: #{consumer.partitions}")
  debug("claimed: #{consumer.claimed}")
  consumer.fetch_loop do |_partition, messages|
    Timeout.timeout(@timeout) { messages.each { |raw| process(parse(raw)) } }
    break if @terminated
  end
rescue ZK::Exceptions::OperationTimeOut => timeout_error
  log(timeout_error.message)
  retry
end