Module: Kafka::IO

Included in:
Consumer, MultiProducer, Producer
Defined in:
lib/kafka/io.rb

Constant Summary collapse

HOST =
"localhost"
PORT =
9092

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#compression ⇒ Object

Returns the value of attribute compression.



17
18
19
# File 'lib/kafka/io.rb', line 17

# Reader for the compression attribute.
#
# @return [Object, nil] the current value of @compression (nil if it has
#   never been assigned)
def compression
  @compression
end

#host ⇒ Object

Returns the value of attribute host.



17
18
19
# File 'lib/kafka/io.rb', line 17

# Reader for the host attribute.
#
# @return [Object, nil] the current value of @host (nil if it has never
#   been assigned)
def host
  @host
end

#port ⇒ Object

Returns the value of attribute port.



17
18
19
# File 'lib/kafka/io.rb', line 17

# Reader for the port attribute.
#
# @return [Object, nil] the current value of @port (nil if it has never
#   been assigned)
def port
  @port
end

#socket ⇒ Object

Returns the value of attribute socket.



17
18
19
# File 'lib/kafka/io.rb', line 17

# Reader for the socket attribute.
#
# @return [Object, nil] the current value of @socket (nil if it has never
#   been assigned)
def socket
  @socket
end

Instance Method Details

#connect(host, port) ⇒ Object

Raises:

  • (ArgumentError)


22
23
24
25
26
27
# File 'lib/kafka/io.rb', line 22

# Remembers the given address on this object and opens a TCP connection to it.
#
# @param host [Object] passed to TCPSocket.new; must be truthy
# @param port [Object] passed to TCPSocket.new; must be truthy
# @return [TCPSocket] the freshly opened socket, also stored through socket=
# @raise [ArgumentError] when host or port is nil or false
def connect(host, port)
  if !host || !port
    raise ArgumentError, "No host or port specified"
  end

  # Store the address before dialing, so it is kept on the object even when
  # opening the socket raises.
  self.host = host
  self.port = port

  connection = TCPSocket.new(host, port)
  self.socket = connection
end

#disconnect ⇒ Object



36
37
38
39
# File 'lib/kafka/io.rb', line 36

# Closes the current socket on a best-effort basis and forgets it.
#
# Any StandardError raised while closing is swallowed; this includes the
# NoMethodError raised when no socket is set.
#
# @return [nil]
def disconnect
  begin
    socket.close
  rescue StandardError
    nil
  end

  self.socket = nil
end

#read(length) ⇒ Object



41
42
43
44
45
46
# File 'lib/kafka/io.rb', line 41

# Reads from the current socket.
#
# On any StandardError (including a nil or false result from the socket's
# read, which is treated as "no data") the connection is dropped through
# #disconnect and the failure is reported as a SocketError.
#
# @param length [Object] forwarded unchanged to the socket's read
# @return [Object] the truthy value returned by the socket's read
# @raise [SocketError] "cannot read: <original message>" on any failure
def read(length)
  chunk = socket.read(length)
  raise SocketError, "no data" unless chunk

  chunk
rescue => error
  disconnect
  raise SocketError, "cannot read: #{error.message}"
end

#reconnect ⇒ Object



29
30
31
32
33
34
# File 'lib/kafka/io.rb', line 29

# Opens a new TCP connection to the stored host/port and keeps it as the
# current socket.
#
# If opening (or storing) the socket raises a StandardError, #disconnect is
# called and the original exception is re-raised unchanged.
#
# @return [TCPSocket] the newly opened socket
def reconnect
  begin
    fresh = TCPSocket.new(host, port)
    self.socket = fresh
  rescue StandardError
    disconnect
    raise
  end
end

#write(data) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/kafka/io.rb', line 48

# Writes data to the current socket, calling #reconnect first when no socket
# is set.
#
# On any StandardError (from reconnecting or from writing) the connection is
# dropped through #disconnect and the failure is reported as a SocketError.
#
# @param data [Object] forwarded unchanged to the socket's write
# @return [Object] whatever the socket's write returns
# @raise [SocketError] "cannot write: <original message>" on any failure
def write(data)
  conn = socket
  unless conn
    reconnect
    conn = socket
  end

  conn.write(data)
rescue => error
  disconnect
  raise SocketError, "cannot write: #{error.message}"
end