Class: Hermann::Consumer

Inherits:
Object
Defined in:
lib/hermann/consumer.rb

Overview

Hermann::Consumer provides a simple consumer API that is only safe to use from a single thread.

Constructor Details

#initialize(topic, opts = {}) ⇒ Consumer

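Instantiates a Consumer for the given topic. On JRuby it wraps Hermann::Provider::JavaSimpleConsumer and requires the :zookeepers and :group_id options; on MRI it wraps Hermann::Lib::Consumer and requires the :brokers and :partition options. A missing required option raises an error.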

Parameters:

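  • topic (String)

    the Kafka topic to consume from

  • opts (Hash) (defaults to: {})

    platform-specific options; the required keys depend on whether the code runs on MRI or JRuby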

Options Hash (opts):

  • :brokers (String) (required on MRI)

    Comma-separated list of brokers

  • :partition (Integer) (required on MRI)

    The Kafka partition

  • :zookeepers (String) (required on JRuby)

    List of ZooKeeper servers

  • :group_id (String) (required on JRuby)

    Client group_id



# File 'lib/hermann/consumer.rb', line 25

def initialize(topic, opts = {})
  @topic = topic
  if Hermann.jruby?
    zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id)

    @internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, group_id, topic, opts)
  else
    brokers, partition = require_values_at(opts, :brokers, :partition)

    @internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
  end
end
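
Because the constructor requires different keys on each platform, callers have to build the options hash accordingly. The following is a minimal sketch of that, not part of Hermann: `consumer_options` is a hypothetical helper, the host names, group name and topic are placeholder values, and the `RUBY_PLATFORM == 'java'` check is an assumption about how JRuby is detected.

```ruby
# Hypothetical helper (not part of Hermann): returns the option keys that
# the constructor requires on each platform.
def consumer_options(jruby:)
  if jruby
    # JRuby path: the constructor requires :zookeepers and :group_id
    { :zookeepers => 'localhost:2181', :group_id => 'example-group' }
  else
    # MRI path: the constructor requires :brokers and :partition
    { :brokers => 'localhost:9092', :partition => 0 }
  end
end

# Assumption: JRuby reports 'java' as its platform string.
on_jruby = (RUBY_PLATFORM == 'java')
opts = consumer_options(jruby: on_jruby)

# With the hermann gem loaded, the consumer would then be created as:
#   consumer = Hermann::Consumer.new('example-topic', opts)
```

A partition of 0 passes the presence check, since 0 is truthy in Ruby.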

Instance Attribute Details

#internal ⇒ Object (readonly)

Returns the value of attribute internal.



# File 'lib/hermann/consumer.rb', line 13

def internal
  @internal
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/hermann/consumer.rb', line 13

def topic
  @topic
end

Instance Method Details

#consume(topic = nil, &block) ⇒ Object

Delegates to the consume method of the internal consumer, passing along the optional topic and the block.



# File 'lib/hermann/consumer.rb', line 39

def consume(topic=nil, &block)
  @internal.consume(topic, &block)
end
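
The delegation can be illustrated without a running Kafka cluster. This is only a sketch under stated assumptions: `FakeInternalConsumer` and `ExampleConsumer` are hypothetical stand-ins, and the idea that the internal consumer yields each message to the block is an assumption, since the listing above does not show what the real consumers pass to it.

```ruby
# Hypothetical stand-in for the internal consumer, for illustration only.
class FakeInternalConsumer
  def initialize(messages)
    @messages = messages
  end

  # Assumption: the block is called once per message.
  def consume(topic = nil, &block)
    @messages.each(&block)
  end
end

# Mirrors the delegation in the listing above: the block is forwarded
# unchanged to the internal consumer.
class ExampleConsumer
  def initialize(internal)
    @internal = internal
  end

  def consume(topic = nil, &block)
    @internal.consume(topic, &block)
  end
end

received = []
consumer = ExampleConsumer.new(FakeInternalConsumer.new(%w[a b c]))
consumer.consume { |message| received << message }
```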

#require_values_at(opts, *args) ⇒ Object

Removes each of the given keys from opts and returns their values in argument order. Raises a RuntimeError if any key is missing or its value is nil or false.



# File 'lib/hermann/consumer.rb', line 52

def require_values_at(opts, *args)
  args.map do |a|
    raise "Please provide :#{a} option!" unless opts[a]
    opts.delete(a)
  end
end
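
The behaviour of this helper is easy to check in isolation. The sketch below copies the method from the listing above so it runs on its own; the option values are placeholders.

```ruby
# Copied from the listing above so the example is self-contained.
def require_values_at(opts, *args)
  args.map do |a|
    raise "Please provide :#{a} option!" unless opts[a]
    opts.delete(a)
  end
end

opts = { :brokers => 'localhost:9092', :partition => 0, :extra => true }
brokers, partition = require_values_at(opts, :brokers, :partition)
# The required keys have been deleted from opts; only :extra remains.

error_message = nil
begin
  require_values_at({}, :brokers)
rescue RuntimeError => e
  # A missing key raises with a message naming the option.
  error_message = e.message
end
```

Note that the caller's hash is mutated. Passing `opts.dup` is one way to keep the original intact if it is reused elsewhere.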

#shutdown ⇒ Object

Delegates shutdown of the Kafka message threads to the internal consumer. Only the JRuby consumer is shut down; on MRI this method is a no-op.



# File 'lib/hermann/consumer.rb', line 44

def shutdown
  if Hermann.jruby?
    @internal.shutdown
  else
    # no-op on MRI
  end
end
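
Since shutdown only does work on JRuby, it is easy to forget when code is developed on MRI. One possible pattern is to wrap consumption so that shutdown always runs. This is a sketch, not part of Hermann: `with_consumer` and `StubConsumer` are hypothetical names, and the stub only records that shutdown was called.

```ruby
# Hypothetical helper: yields the consumer and always calls shutdown
# afterwards, even if the block raises.
def with_consumer(consumer)
  yield consumer
ensure
  consumer.shutdown
end

# Hypothetical stand-in that records whether shutdown was called.
class StubConsumer
  attr_reader :shutdown_called

  def initialize
    @shutdown_called = false
  end

  def shutdown
    @shutdown_called = true
  end
end

stub = StubConsumer.new
begin
  with_consumer(stub) { |_consumer| raise 'processing failed' }
rescue RuntimeError
  # The error still propagates to the caller; shutdown has already run.
end
```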