Class: Hermann::Provider::JavaSimpleConsumer
- Inherits: Object
- Defined in: lib/hermann/provider/java_simple_consumer.rb
Overview
Implements a Java-based consumer. The #consume method loops indefinitely; the hasNext() method blocks until a message is available.
Constant Summary
- NUM_THREADS =
1
- DEFAULTS =
Default ZooKeeper connection options.
{ 'zookeeper.session.timeout.ms' => '400', 'zookeeper.sync.time.ms' => '200', 'auto.commit.interval.ms' => '1000' }.freeze
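As a rough illustration of how defaults like these are commonly combined with connection settings, the sketch below merges a copy of the DEFAULTS hash with per-connection values and caller overrides. The `build_options` helper is hypothetical, and the use of the `zookeeper.connect` and `group.id` keys is an assumption made for the example; this is not a description of how Hermann builds its configuration.

```ruby
# Copy of the documented defaults, so the sketch runs on its own.
DEFAULTS = {
  'zookeeper.session.timeout.ms' => '400',
  'zookeeper.sync.time.ms'       => '200',
  'auto.commit.interval.ms'      => '1000'
}.freeze

# Hypothetical helper (not part of Hermann): later hashes win on key
# conflicts, so caller overrides take precedence over the defaults.
def build_options(zookeepers, group_id, overrides = {})
  connection = {
    'zookeeper.connect' => zookeepers, # assumed property name
    'group.id'          => group_id    # assumed property name
  }
  DEFAULTS.merge(connection).merge(overrides)
end

options = build_options('localhost:2181', 'example-group',
                        { 'auto.commit.interval.ms' => '5000' })
puts options['auto.commit.interval.ms']
puts options['zookeeper.sync.time.ms']
```

Because `Hash#merge` returns a new hash, the frozen DEFAULTS constant is never modified.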
Instance Attribute Summary
-
#consumer ⇒ Object
Returns the value of attribute consumer.
-
#topic ⇒ Object
Returns the value of attribute topic.
-
#zookeeper ⇒ Object
Returns the value of attribute zookeeper.
Instance Method Summary
-
#consume(topic = nil) ⇒ Object
Starts infinite loop to consume messages.
-
#initialize(zookeepers, groupId, topic, opts = {}) ⇒ JavaSimpleConsumer
constructor
Instantiate JavaSimpleConsumer.
-
#shutdown ⇒ Object
Shuts down the various threads created by createMessageStreams. This can be called after the thread executing consume has exited, to clean up.
Constructor Details
#initialize(zookeepers, groupId, topic, opts = {}) ⇒ JavaSimpleConsumer
Instantiate JavaSimpleConsumer
# File 'lib/hermann/provider/java_simple_consumer.rb', line 32
def initialize(zookeepers, groupId, topic, opts={})
  config = create_config(zookeepers, groupId)
  @consumer = ConsumerUtil::Consumer.createJavaConsumerConnector(config)
  @topic = topic
  @sleep_time = opts.delete(:sleep_time) || 1
  @do_retry = opts.delete(:do_retry) || true
end
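One detail of the constructor is worth a closer look: because `false || true` evaluates to `true`, the expression `opts.delete(:do_retry) || true` cannot produce `false`, so passing `do_retry: false` appears to have no effect. The sketch below reproduces this in plain Ruby and shows one possible alternative using the block form of `Hash#delete`. Both helper names are hypothetical and not part of Hermann.

```ruby
# Mirrors the expression used in the constructor.
def retry_flag_with_or(opts)
  opts.delete(:do_retry) || true
end

# Possible alternative: the block runs only when the key is absent,
# so an explicit false passed by the caller is preserved.
def retry_flag_with_default_block(opts)
  opts.delete(:do_retry) { true }
end

puts retry_flag_with_or({ do_retry: false })            # prints "true"
puts retry_flag_with_default_block({ do_retry: false }) # prints "false"
puts retry_flag_with_default_block({})                  # prints "true"
```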
Instance Attribute Details
#consumer ⇒ Object
Returns the value of attribute consumer.
# File 'lib/hermann/provider/java_simple_consumer.rb', line 10
def consumer
  @consumer
end
#topic ⇒ Object
Returns the value of attribute topic.
# File 'lib/hermann/provider/java_simple_consumer.rb', line 10
def topic
  @topic
end
#zookeeper ⇒ Object
Returns the value of attribute zookeeper.
# File 'lib/hermann/provider/java_simple_consumer.rb', line 10
def zookeeper
  @zookeeper
end
Instance Method Details
#consume(topic = nil) ⇒ Object
Starts an infinite loop that consumes messages. hasNext() blocks until a message is available, at which point the message is yielded to the block.
Examples
consumer.consume do |message|
  puts "Received: #{message}"
end
# File 'lib/hermann/provider/java_simple_consumer.rb', line 58
def consume(topic=nil)
  begin
    stream = get_stream(topic)
    it = stream.iterator
    while it.hasNext do
      yield it.next.message.to_s
    end
  rescue Exception => e
    puts "#{self.class.name}#consume exception: #{e.class.name}"
    puts "Msg: #{e.message}"
    puts e.backtrace.join("\n")
    if retry?
      sleep @sleep_time
      retry
    else
      raise e
    end
  end
end
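The rescue-and-retry shape of #consume can be tried out without Kafka. The sketch below uses a hypothetical `FlakySource` that fails twice before handing out its messages, and a hypothetical `consume_with_retry` helper. Unlike the method above, the helper caps the number of attempts and rescues StandardError rather than Exception, so that signals such as Interrupt still propagate; both choices are assumptions made for the example.

```ruby
# Hypothetical stand-in for a message stream: raises a fixed number of
# times, then yields its messages.
class FlakySource
  def initialize(messages, failures)
    @messages = messages
    @failures = failures
  end

  def each_message
    if @failures > 0
      @failures -= 1
      raise IOError, 'connection lost'
    end
    @messages.each { |m| yield m }
  end
end

# Hypothetical helper with the same begin/rescue/retry structure as
# #consume, plus an attempt cap. Returns the number of attempts used.
def consume_with_retry(source, max_attempts: 5, sleep_time: 0)
  attempts = 0
  begin
    attempts += 1
    source.each_message { |m| yield m }
  rescue StandardError => e
    raise e if attempts >= max_attempts
    sleep sleep_time
    retry # re-runs the begin block from the top
  end
  attempts
end

received = []
attempts = consume_with_retry(FlakySource.new(%w[a b c], 2)) { |m| received << m }
puts "received #{received.inspect} after #{attempts} attempts"
```

Note that `retry` restarts the whole `begin` block, which is why the stream is obtained again on each attempt in the original method.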
#shutdown ⇒ Object
Shuts down the various threads created by createMessageStreams. This can be called after the thread executing consume has exited, to clean up.
# File 'lib/hermann/provider/java_simple_consumer.rb', line 43
def shutdown
  @consumer.shutdown
end
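A minimal sketch of the cleanup order described above: run consume in its own thread, wait for that thread to exit, then call shutdown. `FakeConsumer` is a hypothetical stand-in so the example runs without Kafka; with the real class, consume would block until the stream ends or an error is raised.

```ruby
# Hypothetical stand-in exposing the same two methods as the consumer.
class FakeConsumer
  attr_reader :shut_down

  def initialize(messages)
    @messages = messages
    @shut_down = false
  end

  # Yields each message, then returns (the real method would block).
  def consume
    @messages.each { |m| yield m }
  end

  def shutdown
    @shut_down = true
  end
end

consumer = FakeConsumer.new(%w[first second])
seen = []
worker = Thread.new { consumer.consume { |m| seen << m } }

begin
  worker.join        # wait for the consuming thread to exit
ensure
  consumer.shutdown  # clean up even if the worker raised
end

puts seen.inspect
```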