Class: EventMachine::Kafka::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/em-kafka/consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, options = {}) ⇒ Consumer

Returns a new instance of Consumer.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/em-kafka/consumer.rb', line 17

# Builds a consumer from a connection URI and connects its client.
#
# The URI is decomposed as follows: host and port are used as given, the
# topic is read from the user-info component, and the partition is the path
# with its first character dropped, converted with to_i.
#
# @param uri [Object] anything accepted by Kernel#URI
# @param options [Hash] optional overrides; a nil/false value falls back to
#   the default
# @option options [Object] :offset (0) starting offset
# @option options [Object] :max_size (EM::Kafka::MESSAGE_MAX_SIZE)
# @option options [Object] :request_type (EM::Kafka::REQUEST_FETCH)
# @option options [Object] :polling (EM::Kafka::CONSUMER_POLLING_INTERVAL)
def initialize(uri, options = {})
  location = URI(uri)

  self.host = location.host
  self.port = location.port
  self.topic = location.user

  # Path minus its first character, coerced to an Integer.
  partition_segment = location.path[1..-1]
  self.partition = partition_segment.to_i

  self.offset = options[:offset] || 0
  self.max_size = options[:max_size] || EM::Kafka::MESSAGE_MAX_SIZE
  self.request_type = options[:request_type] || EM::Kafka::REQUEST_FETCH
  self.polling = options[:polling] || EM::Kafka::CONSUMER_POLLING_INTERVAL

  # The client is created from the host/port just assigned, then connected.
  self.client = EM::Kafka::Client.new(host, port)
  client.connect
end

Instance Attribute Details

#client ⇒ Object

Returns the value of attribute client.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute client.
#
# The constructor assigns this from EM::Kafka::Client.new(host, port).
#
# @return [Object] the current value of @client
def client
  @client
end

#host ⇒ Object

Returns the value of attribute host.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute host.
#
# The constructor assigns this from the host component of the parsed URI.
#
# @return [Object] the current value of @host
def host
  @host
end

#max_size ⇒ Object

Returns the value of attribute max_size.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute max_size.
#
# The constructor assigns options[:max_size], falling back to
# EM::Kafka::MESSAGE_MAX_SIZE.
#
# @return [Object] the current value of @max_size
def max_size
  @max_size
end

#offset ⇒ Object

Returns the value of attribute offset.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute offset.
#
# The constructor assigns options[:offset], falling back to 0; #consume
# reassigns it from the parser's offset-update callback.
#
# @return [Object] the current value of @offset
def offset
  @offset
end

#partition ⇒ Object

Returns the value of attribute partition.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute partition.
#
# The constructor assigns the URI path, minus its first character, converted
# with to_i.
#
# @return [Object] the current value of @partition
def partition
  @partition
end

#polling ⇒ Object

Returns the value of attribute polling.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute polling.
#
# The constructor assigns options[:polling], falling back to
# EM::Kafka::CONSUMER_POLLING_INTERVAL; #consume passes it to
# EM.add_periodic_timer.
#
# @return [Object] the current value of @polling
def polling
  @polling
end

#port ⇒ Object

Returns the value of attribute port.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute port.
#
# The constructor assigns this from the port component of the parsed URI.
#
# @return [Object] the current value of @port
def port
  @port
end

#request_type ⇒ Object

Returns the value of attribute request_type.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute request_type.
#
# The constructor assigns options[:request_type], falling back to
# EM::Kafka::REQUEST_FETCH.
#
# @return [Object] the current value of @request_type
def request_type
  @request_type
end

#topic ⇒ Object

Returns the value of attribute topic.



7
8
9
# File 'lib/em-kafka/consumer.rb', line 7

# Returns the value of attribute topic.
#
# The constructor assigns this from the user-info component of the parsed URI.
#
# @return [Object] the current value of @topic
def topic
  @topic
end

Instance Method Details

#consume(&block) ⇒ Object

Raises:

  • (ArgumentError) — if no block is given


32
33
34
35
36
37
38
# File 'lib/em-kafka/consumer.rb', line 32

# Wires a parser to the client and schedules periodic consume requests.
#
# A new EM::Kafka::Parser is created from the current offset and the given
# block. Offset updates reported by the parser are written back to +offset+,
# data received by the client is forwarded to the parser, and
# +request_consume+ is invoked from a periodic EventMachine timer configured
# with +polling+.
#
# @yield forwarded to EM::Kafka::Parser.new (presumably called with parsed
#   messages — confirm against the parser)
# @raise [ArgumentError] if no block is given
# @return [Object] the result of EM.add_periodic_timer
def consume(&block)
  raise ArgumentError, "block required" if block.nil?

  message_parser = EM::Kafka::Parser.new(offset, &block)
  message_parser.on_offset_update { |new_offset| self.offset = new_offset }
  client.on_data { |chunk| message_parser.on_data(chunk) }

  EM.add_periodic_timer(polling) do
    request_consume
  end
end