Class: Hermann::Consumer

Inherits:
Object
Defined in:
lib/hermann/consumer.rb

Overview

Hermann::Consumer provides a simple consumer API. An instance is only safe to use from a single thread.
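One way to respect the single-thread restriction is to confine the consumer to one dedicated thread and pass messages to the rest of the program through a thread-safe queue. The following is a minimal sketch, not Hermann code: `StubConsumer` is a hypothetical stand-in for `Hermann::Consumer` so the snippet runs without Kafka, `drain_on_dedicated_thread` is an illustrative name, and the assumption is that `consume` yields each message to its block.

```ruby
require 'thread'

# StubConsumer is a hypothetical stand-in for Hermann::Consumer so this
# sketch runs without a Kafka broker. It yields a fixed list of messages.
class StubConsumer
  def initialize(messages)
    @messages = messages
  end

  # Assumption: consume yields each message to the given block.
  def consume(_topic = nil)
    @messages.each { |message| yield message }
  end
end

# Runs the consumer on one dedicated thread and pushes every message onto
# a thread-safe Queue. Only the reader thread ever touches the consumer.
def drain_on_dedicated_thread(consumer)
  queue = Queue.new
  reader = Thread.new do
    consumer.consume { |message| queue << message }
    queue << :done # sentinel marking the end of the stream
  end

  received = []
  while (item = queue.pop) != :done
    received << item
  end
  reader.join
  received
end

messages = drain_on_dedicated_thread(StubConsumer.new(%w[a b c]))
puts messages.inspect
```

With a real consumer the stream may never end, so the sentinel would typically be replaced by whatever stop condition the application uses.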


Constructor Details

#initialize(topic, opts = {}) ⇒ Consumer

Instantiate Consumer

Parameters:

  • topic

    the Kafka topic to consume

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :brokers (String) — for MRI

    Comma-separated list of brokers

  • :partition (Integer) — for MRI

    The Kafka partition

  • :offset (Symbol|Fixnum) — for MRI

    Starting consumer offset: either :start, :end, or a Fixnum

  • :zookeepers (Integer) — for JRuby

    List of ZooKeeper servers

  • :group_id (Integer) — for JRuby

    Client group_id

Raises:

  • (Hermann::Errors::InvalidOffsetError)

    if the :offset option does not pass valid_offset?

# File 'lib/hermann/consumer.rb', line 27

def initialize(topic, opts = {})
  @topic = topic

  offset = opts.delete(:offset)
  raise Hermann::Errors::InvalidOffsetError.new("Bad offset: #{offset}") unless valid_offset?(offset)

  if Hermann.jruby?
    zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id)

    @internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, group_id, topic, opts)
  else
    brokers, partition = require_values_at(opts, :brokers, :partition)

    @internal = Hermann::Provider::RDKafka::Consumer.new(topic, brokers, partition, offset)
  end
end
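
The validation flow of the constructor can be illustrated in isolation. This is a sketch under stated assumptions, not the library's implementation: `valid_offset?` and `require_values_at` are not shown in this listing, so the versions below only guess at their behaviour (including that a missing offset is accepted, since the JRuby options do not list :offset). `InvalidOffsetError` and `plan_consumer` are hypothetical names. The sketch also makes visible that `opts.delete(:offset)` removes the key from the hash the caller passed in.

```ruby
# Hypothetical stand-in for Hermann::Errors::InvalidOffsetError.
class InvalidOffsetError < StandardError; end

# Assumed rule, based on the option docs: nil, :start, :end, or an integer.
def valid_offset?(offset)
  offset.nil? || %i[start end].include?(offset) || offset.is_a?(Integer)
end

# Assumed behaviour: return the values for the keys, failing on a missing one.
def require_values_at(opts, *keys)
  keys.map do |key|
    opts.fetch(key) { raise ArgumentError, "missing required option :#{key}" }
  end
end

# Mirrors the branch structure of #initialize, but returns the arguments
# that would be handed to a provider instead of creating one.
def plan_consumer(topic, opts, jruby:)
  offset = opts.delete(:offset) # note: mutates the caller's hash
  raise InvalidOffsetError, "Bad offset: #{offset}" unless valid_offset?(offset)

  if jruby
    zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id)
    [:java, zookeepers, group_id, topic, opts]
  else
    brokers, partition = require_values_at(opts, :brokers, :partition)
    [:rdkafka, topic, brokers, partition, offset]
  end
end

opts = { brokers: 'localhost:9092', partition: 0, offset: :start }
plan = plan_consumer('events', opts, jruby: false)
puts plan.inspect
puts opts.key?(:offset) # the :offset key is gone from the caller's hash
```

Callers that reuse an options hash may want to pass a copy (for example `opts.dup`) so the original keeps its :offset entry.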

Instance Attribute Details

#internal ⇒ Object (readonly)

Returns the value of attribute internal.



# File 'lib/hermann/consumer.rb', line 14

def internal
  @internal
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/hermann/consumer.rb', line 14

def topic
  @topic
end

Instance Method Details

#consume(topic = nil, &block) ⇒ Object

Delegates to the consume method of the internal consumer class, passing along the optional topic and the block.



# File 'lib/hermann/consumer.rb', line 45

def consume(topic=nil, &block)
  @internal.consume(topic, &block)
end
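
Delegation here means the wrapper forwards the optional topic and the block unchanged to whatever object sits in @internal. A self-contained sketch of that pattern follows. `RecordingProvider` and `DelegatingConsumer` are hypothetical stand-ins rather than Hermann classes, and the provider yielding one message at a time is an assumption.

```ruby
# Hypothetical provider: remembers the topic it was given and yields
# two canned messages to the caller's block.
class RecordingProvider
  attr_reader :last_topic

  def consume(topic = nil)
    @last_topic = topic
    %w[first second].each { |message| yield message }
  end
end

# Hypothetical wrapper with the same shape as the #consume method above.
class DelegatingConsumer
  def initialize(internal)
    @internal = internal
  end

  # Forwards both the topic and the block to the internal provider.
  def consume(topic = nil, &block)
    @internal.consume(topic, &block)
  end
end

provider = RecordingProvider.new
seen = []
DelegatingConsumer.new(provider).consume('events') { |message| seen << message }
puts seen.inspect
puts provider.last_topic
```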

#shutdown ⇒ Object

Delegates shutdown of the Kafka message threads to the internal consumer class. On JRuby this calls shutdown on the internal consumer; on MRI it is a no-op.



# File 'lib/hermann/consumer.rb', line 50

def shutdown
  if Hermann.jruby?
    @internal.shutdown
  else
    # no-op
  end
end
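
Because #shutdown is safe to call on either runtime, calling it unconditionally in an `ensure` clause keeps calling code identical on MRI and JRuby. A minimal sketch, assuming a hypothetical `CleanupStub` in place of a real consumer and an illustrative helper named `consume_with_cleanup`:

```ruby
# Hypothetical stand-in for a consumer; records whether shutdown ran.
class CleanupStub
  attr_reader :shut_down

  def initialize
    @shut_down = false
  end

  # Assumption: consume yields each message to the block.
  def consume(_topic = nil)
    yield 'payload'
  end

  def shutdown
    @shut_down = true
  end
end

# Consumes messages and always calls shutdown afterwards, even when the
# message handler raises.
def consume_with_cleanup(consumer)
  consumer.consume { |message| yield message }
ensure
  consumer.shutdown
end

stub = CleanupStub.new
consume_with_cleanup(stub) { |message| puts message }
puts stub.shut_down
```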