Class: Hermann::Provider::JavaSimpleConsumer

Inherits:
Object
Defined in:
lib/hermann/provider/java_simple_consumer.rb

Overview

Implements a Java-based consumer. The #consume method loops indefinitely; the underlying iterator's hasNext() call blocks until a message is available.

Constant Summary

NUM_THREADS = 1

DEFAULTS =

Default ZooKeeper connection options

{
 'zookeeper.session.timeout.ms' => '400',
 'zookeeper.sync.time.ms'       => '200',
 'auto.commit.interval.ms'      => '1000'
}.freeze
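
The constructor passes its arguments to a create_config helper whose source is not shown on this page. As a rough sketch of how frozen defaults like these could be combined with connection settings, the following uses a hypothetical build_properties method (not part of Hermann) and assumes the Kafka consumer property names zookeeper.connect and group.id:

```ruby
DEFAULTS = {
  'zookeeper.session.timeout.ms' => '400',
  'zookeeper.sync.time.ms'       => '200',
  'auto.commit.interval.ms'      => '1000'
}.freeze

# Hypothetical helper, not part of Hermann. Hash#merge returns a new
# Hash, so the frozen DEFAULTS constant is never mutated.
def build_properties(zookeepers, group_id, overrides = {})
  DEFAULTS.merge(
    'zookeeper.connect' => zookeepers,
    'group.id'          => group_id
  ).merge(overrides)
end

props = build_properties('localhost:2181', 'example-group',
                         { 'auto.commit.interval.ms' => '5000' })
props.each { |key, value| puts "#{key}=#{value}" }
```

Because the overrides are merged last, a caller-supplied value wins over the default for the same key.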

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(zookeepers, groupId, topic, opts = {}) ⇒ JavaSimpleConsumer

Instantiates a JavaSimpleConsumer.

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :sleep_time (Fixnum)

    Time in seconds to sleep between consume retries. Defaults to 1.

  • :do_retry (Boolean)

    Whether to retry consume attempts when an exception is raised. Defaults to true.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 32

def initialize(zookeepers, groupId, topic, opts={})
  config       = create_config(zookeepers, groupId)
  @consumer    = ConsumerUtil::Consumer.createJavaConsumerConnector(config)
  @topic       = topic
  @sleep_time  = opts.delete(:sleep_time) || 1
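  # The block supplies the default only when :do_retry is absent, so an
  # explicit false is respected (false || true would always be true).
  @do_retry    = opts.delete(:do_retry) { true }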
end
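
Since :do_retry is documented as defaulting to true, an explicit false has to be told apart from a missing key. The sketch below compares two ways of reading such an option; both helper methods are hypothetical and exist only for illustration:

```ruby
# Hypothetical helpers for illustration; neither is part of Hermann.
def retry_flag_with_or(opts)
  # false || true evaluates to true, so the caller's false is lost.
  opts.delete(:do_retry) || true
end

def retry_flag_with_block(opts)
  # Hash#delete calls the block only when the key is absent.
  opts.delete(:do_retry) { true }
end

p retry_flag_with_or({ do_retry: false })     # prints true
p retry_flag_with_block({ do_retry: false })  # prints false
p retry_flag_with_block({})                   # prints true
```

The block form keeps the "remove the key from opts" behaviour of delete while still allowing retries to be switched off.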

Instance Attribute Details

#consumer ⇒ Object

Returns the value of attribute consumer.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 10

def consumer
  @consumer
end

#topic ⇒ Object

Returns the value of attribute topic.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 10

def topic
  @topic
end

#zookeeper ⇒ Object

Returns the value of attribute zookeeper.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 10

def zookeeper
  @zookeeper
end

Instance Method Details

#consume(topic = nil) ⇒ Object

Starts an infinite loop that consumes messages. hasNext() blocks until a message is available, at which point the message is converted to a String and yielded to the block.

Examples

consumer.consume do |message|
  puts "Received: #{message}"
end



# File 'lib/hermann/provider/java_simple_consumer.rb', line 58

def consume(topic=nil)
  begin
    stream = get_stream(topic)
    it = stream.iterator
    while it.hasNext do
      yield it.next.message.to_s
    end
  rescue Exception => e
    puts "#{self.class.name}#consume exception: #{e.class.name}"
    puts "Msg: #{e.message}"
    puts e.backtrace.join("\n")
    if retry?
      sleep @sleep_time
      retry
    else
      raise e
    end
  end
end
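
The begin/rescue/retry pattern in this method can be tried without Kafka or JRuby. The following is a minimal sketch under stated assumptions: FlakyStream and consume_with_retry are hypothetical stand-ins, not Hermann or Kafka classes, and the sketch rescues StandardError (a choice made here so that Interrupt still stops the loop).

```ruby
# Hypothetical stand-in for a message stream; not a Kafka class.
class FlakyStream
  def initialize(messages, failures)
    @messages = messages.dup
    @failures = failures
  end

  # Raises on the first `failures` calls, then yields the remaining messages.
  def each
    if @failures > 0
      @failures -= 1
      raise IOError, 'connection lost'
    end
    until @messages.empty?
      yield @messages.shift
    end
  end
end

# Hypothetical helper: yields each message, sleeping and retrying on error.
# Returns the number of attempts that were needed.
def consume_with_retry(stream, sleep_time: 0.01, do_retry: true)
  attempts = 0
  begin
    attempts += 1
    stream.each { |message| yield message }
  rescue StandardError => e
    warn "consume failed (#{e.class}): #{e.message}"
    raise unless do_retry
    sleep sleep_time
    retry # re-runs the begin block from the top
  end
  attempts
end

received = []
attempts = consume_with_retry(FlakyStream.new(%w[a b c], 2)) { |m| received << m }
puts "attempts=#{attempts} received=#{received.inspect}"
```

With do_retry: false the first error is re-raised to the caller instead of being retried.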

#shutdown ⇒ Object

Shuts down the various threads created by createMessageStreams. This can be called after the thread executing consume has exited, to clean up.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 43

def shutdown
  @consumer.shutdown
end
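
One way to arrange the clean-up described above is to run consume in its own thread and call shutdown from an ensure block once that thread has exited. The sketch below uses a hypothetical FakeConsumer stand-in (not the real class) that offers the same consume and shutdown method names, so it runs without Kafka:

```ruby
# Hypothetical stand-in exposing the same method names; it only records
# whether shutdown was called.
class FakeConsumer
  attr_reader :closed

  def initialize(messages)
    @messages = messages
    @closed   = false
  end

  # Yields each message, then returns (the real method loops indefinitely).
  def consume
    @messages.each { |message| yield message }
  end

  def shutdown
    @closed = true
  end
end

consumer = FakeConsumer.new(%w[hello world])
received = []

worker = Thread.new do
  consumer.consume { |message| received << message }
end

begin
  worker.join          # wait for the consuming thread to exit
ensure
  consumer.shutdown    # clean up even if join re-raises a thread error
end

puts received.inspect
```

Placing shutdown in ensure means the clean-up also runs when the consuming thread terminates with an exception, which Thread#join re-raises in the calling thread.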