Module: Kafka::IO

Included in:
Consumer, Producer
Defined in:
lib/kafka/io.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#host ⇒ Object

Returns the value of attribute host.



17
18
19
# File 'lib/kafka/io.rb', line 17

# Reader for the +@host+ instance variable.
#
# @return [Object] whatever is currently stored in +@host+
#   (nil if it has never been assigned)
def host
  @host
end

#port ⇒ Object

Returns the value of attribute port.



17
18
19
# File 'lib/kafka/io.rb', line 17

# Reader for the +@port+ instance variable.
#
# @return [Object] whatever is currently stored in +@port+
#   (nil if it has never been assigned)
def port
  @port
end

#socket ⇒ Object

Returns the value of attribute socket.



17
18
19
# File 'lib/kafka/io.rb', line 17

# Reader for the +@socket+ instance variable.
#
# @return [Object] whatever is currently stored in +@socket+
#   (nil if it has never been assigned)
def socket
  @socket
end

Instance Method Details

#connect(host, port) ⇒ Object

Raises:

  • (ArgumentError)


19
20
21
22
23
24
# File 'lib/kafka/io.rb', line 19

# Remembers the endpoint and opens a TCP connection to it.
#
# The given host and port are stored through the +host=+ / +port=+ writers,
# then a new TCPSocket for that endpoint is stored through +socket=+.
#
# @param host [Object] first argument handed to TCPSocket.new
# @param port [Object] second argument handed to TCPSocket.new
# @return [Object] the value assigned to +socket+ (the new TCPSocket)
# @raise [ArgumentError] if either host or port is nil or false
def connect(host, port)
  if !host || !port
    raise ArgumentError, "No host or port specified"
  end

  # The parameters shadow the reader methods, so the writers need an
  # explicit receiver.
  self.host, self.port = host, port
  self.socket = TCPSocket.new(host, port)
end

#disconnect ⇒ Object



33
34
35
36
# File 'lib/kafka/io.rb', line 33

# Closes the current socket on a best-effort basis, then clears it.
#
# Any StandardError raised by the close attempt is swallowed; that includes
# the NoMethodError produced when no socket is currently set. The +socket+
# attribute is reset to nil afterwards in either case.
#
# @return [nil]
def disconnect
  begin
    socket.close
  rescue StandardError
    # Deliberately ignored: a failed close must not prevent the reset below.
    nil
  end
  self.socket = nil
end

#read(length) ⇒ Object



38
39
40
41
42
43
# File 'lib/kafka/io.rb', line 38

# Reads +length+ bytes from the current socket.
#
# A nil/false result from the socket's +read+ is treated as a failure
# ("no data"). On any StandardError the connection is torn down via
# +disconnect+ and the failure is re-raised as a SocketError whose message
# is prefixed with "cannot read: ".
#
# @param length [Object] forwarded unchanged to the socket's +read+
# @return [Object] the truthy value returned by the socket's +read+
# @raise [SocketError] if reading fails for any reason or yields no data
def read(length)
  chunk = socket.read(length)
  raise SocketError, "no data" unless chunk

  chunk
rescue StandardError => e
  disconnect
  raise SocketError, "cannot read: #{e.message}"
end

#reconnect ⇒ Object



26
27
28
29
30
31
# File 'lib/kafka/io.rb', line 26

# Opens a fresh TCPSocket to the stored host/port and stores it in +socket+.
#
# If anything raises a StandardError along the way, +disconnect+ is called
# and the original exception is re-raised unchanged.
#
# @return [Object] the value assigned to +socket+ (the new TCPSocket)
def reconnect
  endpoint_host = host
  endpoint_port = port
  self.socket = TCPSocket.new(endpoint_host, endpoint_port)
rescue StandardError
  disconnect
  raise
end

#write(data) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/kafka/io.rb', line 45

# Writes +data+ to the socket, reconnecting first when no socket is set.
#
# On any StandardError (from reconnecting or from the write itself) the
# connection is torn down via +disconnect+ and the failure is re-raised as
# a SocketError whose message is prefixed with "cannot write: ".
#
# @param data [Object] forwarded unchanged to the socket's +write+
# @return [Object] whatever the socket's +write+ returns
# @raise [SocketError] if reconnecting or writing fails
def write(data)
  reconnect unless socket
  # Re-read the attribute: reconnect may have replaced it.
  socket.write(data)
rescue StandardError => e
  disconnect
  raise SocketError, "cannot write: #{e.message}"
end