Class: Hermann::Provider::JavaSimpleConsumer

Inherits:
Object
Defined in:
lib/hermann/provider/java_simple_consumer.rb

Overview

Implements a Java-based consumer. The #consume method loops indefinitely; the underlying iterator's hasNext() call blocks until a message is available.

Constant Summary

NUM_THREADS =
1
DEFAULTS_HERMANN_OPTS =

Default ZooKeeper connection options.

{
  'zookeeper.session.timeout.ms' => '400',
  'zookeeper.sync.time.ms'       => '200',
  'auto.commit.interval.ms'      => '1000',
}.freeze
DEFAULT_CONSUMER_OPTIONS =
{
  :do_retry         => true,
  :max_retries      => 3,
  :backoff_time_sec => 1,
  :logger           => nil
}.freeze

Constructor Details

#initialize(zookeepers, groupId, topic, opts = {}) ⇒ JavaSimpleConsumer

Instantiate JavaSimpleConsumer

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :backoff_time_sec (Fixnum)

    Time in seconds to sleep between consume retries; defaults to 1

  • :do_retry (Boolean)

    Whether to retry consume attempts when an exception is raised; defaults to true

  • :max_retries (Fixnum)

    Maximum number of times to retry #consume when it raises an exception; defaults to 3

  • :logger (Logger)

    Logger instance to use; defaults to nil

  • other (Other)

    Any remaining options are passed through to the Kafka consumer configuration



# File 'lib/hermann/provider/java_simple_consumer.rb', line 42

def initialize(zookeepers, groupId, topic, opts={})
  @topic            = topic
  options           = DEFAULT_CONSUMER_OPTIONS.merge(opts)
  @backoff_time_sec = options.delete(:backoff_time_sec)
  @do_retry         = options.delete(:do_retry)
  @max_retries      = options.delete(:max_retries)
  @logger           = options.delete(:logger)
  # deleting options above so that they do not get sent to
  # the create_config method
  config            = create_config(zookeepers, groupId, options)
  @consumer         = ConsumerUtil::Consumer.createJavaConsumerConnector(config)
end
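
The constructor above separates consumer-level settings from the options that go on to create_config. The following is a minimal sketch of that splitting step only, runnable without JRuby or Kafka. The helper name split_options is hypothetical and not part of Hermann, and the constant is a stand-in copy of the one documented above.

```ruby
# Stand-in copy of the constant documented above.
DEFAULT_CONSUMER_OPTIONS = {
  :do_retry         => true,
  :max_retries      => 3,
  :backoff_time_sec => 1,
  :logger           => nil
}.freeze

# Hypothetical helper (not part of Hermann): splits caller-supplied opts into
# consumer-level settings and the remainder destined for the Kafka config.
def split_options(opts = {})
  # Hash#merge returns a new, unfrozen Hash, so the frozen defaults stay intact.
  options = DEFAULT_CONSUMER_OPTIONS.merge(opts)
  consumer_settings = {
    :backoff_time_sec => options.delete(:backoff_time_sec),
    :do_retry         => options.delete(:do_retry),
    :max_retries      => options.delete(:max_retries),
    :logger           => options.delete(:logger)
  }
  # Whatever is left in options was not a consumer-level setting.
  [consumer_settings, options]
end

settings, kafka_opts = split_options(:max_retries => 5,
                                     'auto.commit.interval.ms' => '500')
```

Here settings carries the overridden :max_retries alongside the remaining defaults, while kafka_opts holds only the string-keyed entry that was not a consumer-level setting.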

Instance Attribute Details

#consumer ⇒ Object

Returns the value of attribute consumer.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 10

def consumer
  @consumer
end

#topic ⇒ Object

Returns the value of attribute topic.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 10

def topic
  @topic
end

#zookeeper ⇒ Object

Returns the value of attribute zookeeper.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 10

def zookeeper
  @zookeeper
end

Instance Method Details

#consume(topic = nil) ⇒ Object

Starts an infinite loop that consumes messages. hasNext() blocks until a message is available, at which point the message is yielded to the block.

Examples

consumer.consume do |message|
  puts "Received: #{message}"
end



# File 'lib/hermann/provider/java_simple_consumer.rb', line 73

def consume(topic=nil)
  begin
    stream = get_stream(topic)
    it = stream.iterator
    while it.hasNext do
      message = it.next.message
      yield String.from_java_bytes(message)
    end
  rescue => e
    if retry? && @max_retries > 0
      sleep @backoff_time_sec
      @max_retries -= 1
      retry
    else
      log_exception(e)
      raise e
    end
  end
end
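
The rescue branch in the listing above is a bounded retry with a fixed pause between attempts. Below is a minimal sketch of the same pattern in isolation, assuming a hypothetical helper named with_retries that is not part of Hermann. One deliberate difference: the sketch keeps its retry budget in a local variable, whereas the listing decrements the instance variable @max_retries, so there the budget appears to be shared across calls on the same instance.

```ruby
# Hypothetical helper (not part of Hermann): runs the block, retrying up to
# max_retries times with a fixed pause between attempts.
def with_retries(max_retries:, backoff_time_sec:, do_retry: true)
  remaining = max_retries # local budget, so the caller's setting is untouched
  begin
    yield
  rescue => e
    if do_retry && remaining > 0
      sleep backoff_time_sec
      remaining -= 1
      retry # re-runs the begin block, including the yield
    else
      raise e
    end
  end
end

attempts = 0
result = with_retries(max_retries: 3, backoff_time_sec: 0) do
  attempts += 1
  raise IOError, 'stream unavailable' if attempts < 3 # fail twice, then succeed
  "consumed after #{attempts} attempts"
end
```

With max_retries set to N, the block runs at most N + 1 times before the last exception is re-raised to the caller.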

#shutdown ⇒ Object

Shuts down the various threads created by createMessageStreams. This can be called after the thread executing #consume has exited, to clean up.



# File 'lib/hermann/provider/java_simple_consumer.rb', line 58

def shutdown
  @consumer.shutdown
end
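
One way to arrange the cleanup described for #shutdown is to run #consume in a worker thread and call #shutdown in an ensure clause once the loop has exited. The sketch below illustrates only that ordering. FakeConsumer is a hypothetical stand-in backed by a Ruby Queue so the example runs without JRuby or Kafka; its publish and closed? methods and the 'stop' sentinel are assumptions made for illustration and are not part of Hermann.

```ruby
# Hypothetical stand-in for JavaSimpleConsumer. Queue#pop blocks until an
# item is available, which mimics the blocking hasNext() call.
class FakeConsumer
  def initialize
    @queue = Queue.new
  end

  def publish(message)
    @queue << message
  end

  # Yields each message; Queue#pop returns nil once the queue is closed
  # and drained, which ends the loop.
  def consume
    while (message = @queue.pop)
      yield message
    end
  end

  def shutdown
    @queue.close
  end

  def closed?
    @queue.closed?
  end
end

consumer = FakeConsumer.new
received = []

worker = Thread.new do
  begin
    consumer.consume do |message|
      break if message == 'stop' # leave the consume loop on the sentinel
      received << message
    end
  ensure
    consumer.shutdown # clean up once the consume loop has exited
  end
end

consumer.publish('first')
consumer.publish('second')
consumer.publish('stop')
worker.join
```

Placing shutdown in ensure means the cleanup also runs when the block raises, not only on a normal exit.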