Class: Heller::ZookeeperConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/heller/zookeeper_consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(zk_hosts, options, consumer_impl = Kafka::Consumer::Consumer) ⇒ ZookeeperConsumer

Returns a new instance of ZookeeperConsumer.



5
6
7
# File 'lib/heller/zookeeper_consumer.rb', line 5

def initialize(zk_hosts, options, consumer_impl=Kafka::Consumer::Consumer)
  @consumer = create_consumer(consumer_impl, options.merge(zk_connect: zk_hosts))
end

Instance Method Details

#closeObject Also known as: shutdown



30
31
32
# File 'lib/heller/zookeeper_consumer.rb', line 30

def close
  @consumer.shutdown
end

#commitObject



26
27
28
# File 'lib/heller/zookeeper_consumer.rb', line 26

def commit
  @consumer.commit_offsets
end

#create_streams(topic_count_map, options = {}) ⇒ Object



9
10
11
12
13
14
15
# File 'lib/heller/zookeeper_consumer.rb', line 9

def create_streams(topic_count_map, options={})
  if options[:key_decoder] && options[:value_decoder]
    @consumer.create_message_streams(convert_longs(topic_count_map), *options.values_at(:key_decoder, :value_decoder))
  else
    @consumer.create_message_streams(convert_longs(topic_count_map))
  end
end

#create_streams_by_filter(filter, num_streams, options = {}) ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/heller/zookeeper_consumer.rb', line 17

def create_streams_by_filter(filter, num_streams, options={})
  whitelist = Kafka::Consumer::Whitelist.new(filter)
  if options[:key_decoder] && options[:value_decoder]
    @consumer.create_message_streams_by_filter(whitelist, num_streams, *options.values_at(:key_decoder, :value_decoder)).to_a
  else
    @consumer.create_message_streams_by_filter(whitelist, num_streams).to_a
  end
end