Module: Kafka::IO

Included in:
Producer
Defined in:
lib/kafka/io.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#hostObject

Returns the value of attribute host.



3
4
5
# File 'lib/kafka/io.rb', line 3

def host
  @host
end

#portObject

Returns the value of attribute port.



3
4
5
# File 'lib/kafka/io.rb', line 3

def port
  @port
end

#socketObject

Returns the value of attribute socket.



3
4
5
# File 'lib/kafka/io.rb', line 3

def socket
  @socket
end

Instance Method Details

#connect(host, port) ⇒ Object

Raises:

  • (ArgumentError)


5
6
7
8
9
10
# File 'lib/kafka/io.rb', line 5

def connect(host, port)
  raise ArgumentError, "No host or port specified" unless host && port
  self.host = host
  self.port = port
  self.socket = TCPSocket.new(host, port)
end

#disconnectObject



17
18
19
20
# File 'lib/kafka/io.rb', line 17

def disconnect
  self.socket.close rescue nil
  self.socket = nil
end

#reconnectObject



12
13
14
15
# File 'lib/kafka/io.rb', line 12

def reconnect
  self.disconnect
  self.socket = self.connect(self.host, self.port)
end

#write(data) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/kafka/io.rb', line 22

def write(data)
  self.reconnect unless self.socket
  self.socket.write(data)
rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
  self.reconnect
  self.socket.write(data) # retry
end