Class: EventMachine::Kafka::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/em-kafka/producer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri) ⇒ Producer

Returns a new instance of Producer.



7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/em-kafka/producer.rb', line 7

def initialize(uri)
  uri = URI(uri)
  self.host = uri.host
  self.port = uri.port
  self.topic = uri.user
  self.partition = uri.path[1..-1].to_i

  raise ArgumentError("topic required") unless topic

  self.client = EM::Kafka::Client.new(host, port)
  client.connect
end

Instance Attribute Details

#clientObject

Returns the value of attribute client.



5
6
7
# File 'lib/em-kafka/producer.rb', line 5

def client
  @client
end

#hostObject

Returns the value of attribute host.



5
6
7
# File 'lib/em-kafka/producer.rb', line 5

def host
  @host
end

#partitionObject

Returns the value of attribute partition.



5
6
7
# File 'lib/em-kafka/producer.rb', line 5

def partition
  @partition
end

#portObject

Returns the value of attribute port.



5
6
7
# File 'lib/em-kafka/producer.rb', line 5

def port
  @port
end

#topicObject

Returns the value of attribute topic.



5
6
7
# File 'lib/em-kafka/producer.rb', line 5

def topic
  @topic
end

Instance Method Details

#deliver(message) ⇒ Object



20
21
22
23
# File 'lib/em-kafka/producer.rb', line 20

def deliver(message)
  request = EM::Kafka::ProducerRequest.new(topic, partition, message)
  client.send_data(request.encode)
end