Class: Kafka::Producer

Inherits:
Object
  • Object
show all
Includes:
IO
Defined in:
lib/kafka/producer.rb

Constant Summary

Constants included from IO

IO::HOST, IO::PORT

Instance Attribute Summary collapse

Attributes included from IO

#compression, #host, #port, #socket

Instance Method Summary collapse

Methods included from IO

#connect, #disconnect, #read, #reconnect, #write

Constructor Details

#initialize(options = {}) ⇒ Producer

Returns a new instance of Producer.



22
23
24
25
26
27
28
29
# File 'lib/kafka/producer.rb', line 22

def initialize(options = {})
  self.topic       = options[:topic]      || "test"
  self.partition   = options[:partition]  || 0
  self.host        = options[:host]       || HOST
  self.port        = options[:port]       || PORT
  self.compression = options[:compression] || Message::NO_COMPRESSION
  self.connect(self.host, self.port)
end

Instance Attribute Details

#partitionObject

Returns the value of attribute partition.



20
21
22
# File 'lib/kafka/producer.rb', line 20

def partition
  @partition
end

#topicObject

Returns the value of attribute topic.



20
21
22
# File 'lib/kafka/producer.rb', line 20

def topic
  @topic
end

Instance Method Details

#batch(&block) ⇒ Object



35
36
37
38
39
40
# File 'lib/kafka/producer.rb', line 35

def batch(&block)
  batch = Kafka::Batch.new
  block.call( batch )
  push(batch.messages)
  batch.messages.clear
end

#push(messages) ⇒ Object



31
32
33
# File 'lib/kafka/producer.rb', line 31

def push(messages)
  self.write(Encoder.produce(self.topic, self.partition, messages, compression))
end