Class: EventMachine::Kafka::Consumer
- Inherits:
-
Object
- Object
- EventMachine::Kafka::Consumer
- Defined in:
- lib/em-kafka/consumer.rb
Instance Attribute Summary collapse
-
#client ⇒ Object
Returns the value of attribute client.
-
#host ⇒ Object
Returns the value of attribute host.
-
#max_size ⇒ Object
Returns the value of attribute max_size.
-
#offset ⇒ Object
Returns the value of attribute offset.
-
#partition ⇒ Object
Returns the value of attribute partition.
-
#polling ⇒ Object
Returns the value of attribute polling.
-
#port ⇒ Object
Returns the value of attribute port.
-
#request_type ⇒ Object
Returns the value of attribute request_type.
-
#topic ⇒ Object
Returns the value of attribute topic.
Instance Method Summary collapse
- #consume(&block) ⇒ Object
-
#initialize(uri, options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
Constructor Details
#initialize(uri, options = {}) ⇒ Consumer
Returns a new instance of Consumer.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/em-kafka/consumer.rb', line 17 def initialize(uri, options = {}) uri = URI(uri) self.host = uri.host self.port = uri.port self.topic = uri.user self.partition = uri.path[1..-1].to_i self.offset = options[:offset] || 0 self.max_size = options[:max_size] || EM::Kafka::MESSAGE_MAX_SIZE self.request_type = options[:request_type] || EM::Kafka::REQUEST_FETCH self.polling = options[:polling] || EM::Kafka::CONSUMER_POLLING_INTERVAL self.client = EM::Kafka::Client.new(host, port) client.connect end |
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def client @client end |
#host ⇒ Object
Returns the value of attribute host.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def host @host end |
#max_size ⇒ Object
Returns the value of attribute max_size.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def max_size @max_size end |
#offset ⇒ Object
Returns the value of attribute offset.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def offset @offset end |
#partition ⇒ Object
Returns the value of attribute partition.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def partition @partition end |
#polling ⇒ Object
Returns the value of attribute polling.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def polling @polling end |
#port ⇒ Object
Returns the value of attribute port.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def port @port end |
#request_type ⇒ Object
Returns the value of attribute request_type.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def request_type @request_type end |
#topic ⇒ Object
Returns the value of attribute topic.
7 8 9 |
# File 'lib/em-kafka/consumer.rb', line 7 def topic @topic end |
Instance Method Details
#consume(&block) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/em-kafka/consumer.rb', line 32 def consume(&block) raise ArgumentError.new("block required") unless block_given? parser = EM::Kafka::Parser.new(offset, &block) parser.on_offset_update { |i| self.offset = i } client.on_data { |binary| parser.on_data(binary) } EM.add_periodic_timer(polling) { request_consume } end |