Class: EventMachine::Kafka::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/em-kafka/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ Client

Returns a new instance of Client.



4
5
6
7
8
# File 'lib/em-kafka/client.rb', line 4

# Builds a client for the given broker address. No connection is opened here.
#
# @param host [String, nil] broker host; a nil/false value falls back to 'localhost'
# @param port [Integer, nil] broker port; a nil/false value falls back to 9092
def initialize(host, port)
  @host, @port = (host || 'localhost'), (port || 9092)
  # No data handler is registered until #on_data is called.
  @callback = nil
end

Instance Method Details

#close_connection ⇒ Object



27
28
29
# File 'lib/em-kafka/client.rb', line 27

# Closes the current connection by delegating to
# `close_connection_after_writing` on it.
#
# Does nothing when no connection has been opened yet, so it is safe to call
# on a client that never connected (previously this raised NoMethodError on nil).
#
# @return [Object, nil] whatever the connection's `close_connection_after_writing`
#   returns, or nil when there is no connection
def close_connection
  return unless @connection

  @connection.close_connection_after_writing
end

#connect ⇒ Object



19
20
21
22
23
24
25
# File 'lib/em-kafka/client.rb', line 19

# Opens a connection with EM.connect to @host/@port, using
# EM::Kafka::Connection as the handler, and stores it in @connection.
#
# A :message listener is registered on the connection; each message is passed
# to the block registered via #on_data. The callback is looked up at delivery
# time, so one registered after connecting is still used; with no callback the
# message is ignored.
#
# @return [Object] the connection returned by EM.connect
def connect
  forward = proc do |message|
    next unless @callback

    @callback.call(message)
  end

  @connection = EM.connect(@host, @port, EM::Kafka::Connection)
  @connection.on(:message, &forward)
  @connection
end

#on_data(&block) ⇒ Object



15
16
17
# File 'lib/em-kafka/client.rb', line 15

# Registers the block that receives incoming messages, replacing any
# previously registered one. Calling it without a block clears the callback.
#
# @yieldparam message [Object] a message delivered by the connection
# @return [Proc, nil] the stored block
def on_data(&handler)
  @callback = handler
end

#send_data(data) ⇒ Object



10
11
12
13
# File 'lib/em-kafka/client.rb', line 10

# Sends data over the connection, calling #connect first when there is no
# connection yet or the current one reports `disconnected?`.
#
# @param data [Object] payload handed unchanged to the connection's `send_data`
# @return [Object] whatever the connection's `send_data` returns
def send_data(data)
  needs_connection = @connection.nil? || @connection.disconnected?
  connect if needs_connection

  @connection.send_data(data)
end