Class: Kafka::Producer

Inherits:
Object
  • Object
show all
Includes:
IO
Defined in:
lib/kafka/producer.rb

Constant Summary collapse

PRODUCE_REQUEST_ID =
Kafka::RequestType::PRODUCE

Instance Attribute Summary collapse

Attributes included from IO

#host, #port, #socket

Instance Method Summary collapse

Methods included from IO

#connect, #disconnect, #read, #reconnect, #write

Constructor Details

#initialize(options = {}) ⇒ Producer

Returns a new instance of Producer.



24
25
26
27
28
29
30
# File 'lib/kafka/producer.rb', line 24

# Configures the producer from an options hash, then calls #connect with the
# resulting host and port.
#
# Recognised keys and their fallbacks: :topic ("test"), :partition (0),
# :host ("localhost"), :port (9092). A key that is missing, nil or false
# receives its fallback.
#
# @param options [Hash] optional overrides for the defaults listed above
def initialize(options = {})
  [
    [:topic,     "test"],
    [:partition, 0],
    [:host,      "localhost"],
    [:port,      9092]
  ].each do |key, fallback|
    # __send__ rather than send: this class defines its own #send(messages).
    __send__("#{key}=", options[key] || fallback)
  end

  connect(host, port)
end

Instance Attribute Details

#partition ⇒ Object

Returns the value of attribute partition.



22
23
24
# File 'lib/kafka/producer.rb', line 22

# Reader for the partition attribute.
#
# @return [Object] the value currently held in @partition
def partition
  @partition
end

#topic ⇒ Object

Returns the value of attribute topic.



22
23
24
# File 'lib/kafka/producer.rb', line 22

# Reader for the topic attribute.
#
# @return [Object] the value currently held in @topic
def topic
  @topic
end

Instance Method Details

#batch(&block) ⇒ Object



61
62
63
64
65
66
# File 'lib/kafka/producer.rb', line 61

# Collects messages into a fresh Kafka::Batch, hands that batch to the given
# block so the caller can fill it, sends everything the batch holds in one
# call to #send, and finally empties the batch's message list.
#
# @param block [Proc] called once with the new Kafka::Batch
# @return [Object] whatever clearing the batch's message list returns
def batch(&block)
  # Named `queued` so the local does not shadow this method's own name.
  queued = Kafka::Batch.new
  block.call(queued)

  self.send(queued.messages)
  queued.messages.clear
end

#encode(message) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/kafka/producer.rb', line 32

# Serializes one message as: a 1-byte magic value, a 4-byte big-endian
# checksum, then the payload bytes.
#
# The payload is never modified: on Ruby 1.9+ it is copied before being
# re-tagged as binary, so frozen payloads are accepted and the caller's
# string keeps its original encoding.
#
# @param message [#magic, #calculate_checksum, #payload] message to serialize
# @return [String] the encoded message
def encode(message)
  header  = [message.magic, message.calculate_checksum].pack("CN")
  payload = message.payload.to_s

  if RUBY_VERSION[0,3] == "1.8"  # Use old iconv on Ruby 1.8 for encoding
    header + Iconv.new('UTF-8//IGNORE', 'UTF-8').iconv(payload)
  else
    # String#to_s returns the receiver itself and force_encoding mutates in
    # place, so dup first to avoid altering (or failing on) the caller's string.
    header + payload.dup.force_encoding(Encoding::ASCII_8BIT)
  end
end

#encode_request(topic, partition, messages) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/kafka/producer.rb', line 41

# Builds a complete produce request as a single string.
#
# Layout, all integers big-endian:
#   4 bytes  length of everything that follows
#   2 bytes  PRODUCE_REQUEST_ID
#   2 bytes  topic length in bytes, followed by the topic bytes
#   4 bytes  partition
#   4 bytes  message-set length, followed by the message set
# The message set is each encoded message prefixed with its own 4-byte length.
#
# @param topic [String] topic name
# @param partition [Integer] partition number
# @param messages [Object, Array] one message or a list of messages; each is
#   passed to #encode
# @return [String] the encoded request
def encode_request(topic, partition, messages)
  message_set = Array(messages).map { |message|
    encoded = encode(message)
    [encoded.length].pack("N") + encoded
  }.join("")

  # pack("a*") copies the topic's raw bytes into a binary string. That makes
  # the length prefix a byte count (String#length counts characters on 1.9+)
  # and keeps a non-ASCII topic from clashing with the binary message bytes
  # when the pieces are concatenated.
  topic_bytes = [topic].pack("a*")

  data = [PRODUCE_REQUEST_ID, topic_bytes.length].pack("nn") +
         topic_bytes +
         [partition].pack("N") +
         [message_set.length].pack("N") + message_set

  [data.length].pack("N") + data
end

#send(messages) ⇒ Object



57
58
59
# File 'lib/kafka/producer.rb', line 57

# Encodes the given messages as one request for this producer's topic and
# partition and passes the result to #write.
#
# Note that this definition replaces Object#send for instances of this class;
# use __send__ for dynamic dispatch.
#
# @param messages [Object, Array] message or messages handed to #encode_request
# @return [Object] whatever #write returns
def send(messages)
  request = encode_request(topic, partition, messages)
  write(request)
end