Class: Hermann::Consumer
- Inherits: Object
  - Object
    - Hermann::Consumer
- Defined in: lib/hermann/consumer.rb
Overview
Hermann::Consumer provides a simple consumer API. It is only safe to use from a single thread.
Instance Attribute Summary
- #internal ⇒ Object (readonly)
  Returns the value of attribute internal.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Instance Method Summary
- #consume(topic = nil, &block) ⇒ Object
  Delegates the consume method to internal consumer classes.
- #initialize(topic, opts = {}) ⇒ Consumer (constructor)
  Instantiates a Consumer.
- #require_values_at(opts, *args) ⇒ Object
- #shutdown ⇒ Object
  Delegates the shutdown of Kafka message threads to internal consumer classes.
Constructor Details
#initialize(topic, opts = {}) ⇒ Consumer
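Instantiates a Consumer. On JRuby, opts must include :zookeepers and :group_id; on other Ruby implementations, it must include :brokers and :partition.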
    # File 'lib/hermann/consumer.rb', line 25
    def initialize(topic, opts = {})
      @topic = topic
      if Hermann.jruby?
        zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id)
        @internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, group_id, topic, opts)
      else
        brokers, partition = require_values_at(opts, :brokers, :partition)
        @internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
      end
    end
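The constructor requires a different pair of options depending on the platform. The sketch below only illustrates that branch: `required_consumer_options` is a hypothetical helper that is not part of Hermann, and the hosts, ports, and group name are placeholder values.

```ruby
# Hypothetical helper mirroring the branch in Hermann::Consumer#initialize.
# It is not part of Hermann; it only names the options each platform requires.
def required_consumer_options(jruby)
  jruby ? [:zookeepers, :group_id] : [:brokers, :partition]
end

# Example option hashes; all values are placeholders for illustration.
mri_opts   = { :brokers => 'localhost:9092', :partition => 0 }
jruby_opts = { :zookeepers => 'localhost:2181', :group_id => 'example-group' }

# An empty array means every required option is present.
missing_on_mri   = required_consumer_options(false) - mri_opts.keys
missing_on_jruby = required_consumer_options(true) - jruby_opts.keys
```

With the real class, a hash like `mri_opts` would be passed as the second argument to `Hermann::Consumer.new` on non-JRuby platforms, and one like `jruby_opts` on JRuby.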
Instance Attribute Details
#internal ⇒ Object (readonly)
Returns the value of attribute internal.
    # File 'lib/hermann/consumer.rb', line 13
    def internal
      @internal
    end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
    # File 'lib/hermann/consumer.rb', line 13
    def topic
      @topic
    end
Instance Method Details
#consume(topic = nil, &block) ⇒ Object
Delegates the consume method to internal consumer classes
    # File 'lib/hermann/consumer.rb', line 39
    def consume(topic=nil, &block)
      @internal.consume(topic, &block)
    end
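The delegation can be tried without a Kafka broker by substituting a stand-in for the internal consumer. This is a minimal sketch: `FakeInternalConsumer` and `DelegatingConsumer` are hypothetical classes written for this example, and the sketch assumes the internal consumer yields each message to the block.

```ruby
# Stand-in for the internal consumer; hypothetical, used only for this sketch.
class FakeInternalConsumer
  def initialize(messages)
    @messages = messages
  end

  # Yields each canned message to the block. This assumes a real internal
  # consumer yields messages read from Kafka in the same way.
  def consume(topic = nil, &block)
    @messages.each(&block)
  end
end

# Minimal wrapper with the same delegation shape as Hermann::Consumer#consume.
class DelegatingConsumer
  attr_reader :topic, :internal

  def initialize(topic, internal)
    @topic = topic
    @internal = internal
  end

  # Forwards the optional topic and the block to the internal consumer.
  def consume(topic = nil, &block)
    @internal.consume(topic, &block)
  end
end

received = []
consumer = DelegatingConsumer.new('example-topic', FakeInternalConsumer.new(%w[a b]))
consumer.consume { |message| received << message }
```

The wrapper adds no behavior of its own, so whatever the internal consumer does with the topic argument and the block is what the caller observes.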
#require_values_at(opts, *args) ⇒ Object
    # File 'lib/hermann/consumer.rb', line 52
    def require_values_at(opts, *args)
      args.map do |a|
        raise "Please provide :#{a} option!" unless opts[a]
        opts.delete(a)
      end
    end
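The helper returns the requested values in order and removes their keys from the hash it is given, so the caller's `opts` is mutated. The sketch below is a standalone copy of the method so it can be run outside the class; the option values are placeholders.

```ruby
# Standalone copy of the helper shown above, so it can be run outside the class.
def require_values_at(opts, *args)
  args.map do |a|
    # A missing key, or a nil or false value, is treated as not provided.
    raise "Please provide :#{a} option!" unless opts[a]
    opts.delete(a)
  end
end

opts = { :brokers => 'localhost:9092', :partition => 0, :extra => true }
brokers, partition = require_values_at(opts, :brokers, :partition)
# opts now holds only the keys that were not requested.
remaining = opts

# A missing option raises a RuntimeError naming the key.
begin
  require_values_at({}, :brokers)
rescue RuntimeError => e
  error_message = e.message
end
```

Because the requested keys are deleted, callers that need the original hash afterwards may want to pass a copy, for example `opts.dup`.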