Class: Kafka::Producer

Inherits:
Object
  • Object
show all
Includes:
IO
Defined in:
lib/kafka/producer.rb

Constant Summary

Constants included from IO

IO::HOST, IO::PORT

Instance Attribute Summary collapse

Attributes included from IO

#host, #port, #socket

Instance Method Summary collapse

Methods included from IO

#connect, #disconnect, #read, #reconnect, #write

Constructor Details

#initialize(options = {}) ⇒ Producer

Returns a new instance of Producer.



22
23
24
25
26
27
28
# File 'lib/kafka/producer.rb', line 22

def initialize(options = {})
  self.topic     = options[:topic]      || "test"
  self.partition = options[:partition]  || 0
  self.host      = options[:host]       || HOST
  self.port      = options[:port]       || PORT
  self.connect(self.host, self.port)
end

Instance Attribute Details

#partitionObject

Returns the value of attribute partition.



20
21
22
# File 'lib/kafka/producer.rb', line 20

def partition
  @partition
end

#topicObject

Returns the value of attribute topic.



20
21
22
# File 'lib/kafka/producer.rb', line 20

def topic
  @topic
end

Instance Method Details

#batch(&block) ⇒ Object



34
35
36
37
38
39
# File 'lib/kafka/producer.rb', line 34

def batch(&block)
  batch = Kafka::Batch.new
  block.call( batch )
  self.send(batch.messages)
  batch.messages.clear
end

#send(messages) ⇒ Object



30
31
32
# File 'lib/kafka/producer.rb', line 30

def send(messages)
  self.write(Encoder.produce(self.topic, self.partition, messages))
end