Class: Heller::ZookeeperConsumer
- Inherits:
-
Object
- Object
- Heller::ZookeeperConsumer
- Defined in:
- lib/heller/zookeeper_consumer.rb
Instance Method Summary collapse
- #close ⇒ Object (also: #shutdown)
- #commit ⇒ Object
- #create_streams(topic_count_map, options = {}) ⇒ Object
- #create_streams_by_filter(filter, num_streams, options = {}) ⇒ Object
-
#initialize(zk_hosts, options, consumer_impl = Kafka::Consumer::Consumer) ⇒ ZookeeperConsumer
constructor
A new instance of ZookeeperConsumer.
Constructor Details
#initialize(zk_hosts, options, consumer_impl = Kafka::Consumer::Consumer) ⇒ ZookeeperConsumer
Returns a new instance of ZookeeperConsumer.
5 6 7 |
# File 'lib/heller/zookeeper_consumer.rb', line 5 def initialize(zk_hosts, , consumer_impl=Kafka::Consumer::Consumer) @consumer = create_consumer(consumer_impl, .merge(zk_connect: zk_hosts)) end |
Instance Method Details
#close ⇒ Object Also known as: shutdown
30 31 32 |
# File 'lib/heller/zookeeper_consumer.rb', line 30 def close @consumer.shutdown end |
#commit ⇒ Object
26 27 28 |
# File 'lib/heller/zookeeper_consumer.rb', line 26 def commit @consumer.commit_offsets end |
#create_streams(topic_count_map, options = {}) ⇒ Object
9 10 11 12 13 14 15 |
# File 'lib/heller/zookeeper_consumer.rb', line 9 def create_streams(topic_count_map, ={}) if [:key_decoder] && [:value_decoder] @consumer.(convert_longs(topic_count_map), *.values_at(:key_decoder, :value_decoder)) else @consumer.(convert_longs(topic_count_map)) end end |
#create_streams_by_filter(filter, num_streams, options = {}) ⇒ Object
17 18 19 20 21 22 23 24 |
# File 'lib/heller/zookeeper_consumer.rb', line 17 def create_streams_by_filter(filter, num_streams, ={}) whitelist = Kafka::Consumer::Whitelist.new(filter) if [:key_decoder] && [:value_decoder] @consumer.(whitelist, num_streams, *.values_at(:key_decoder, :value_decoder)).to_a else @consumer.(whitelist, num_streams).to_a end end |