Class: Kafka::SocketWithTimeout

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/socket_with_timeout.rb

Overview

Opens sockets in a non-blocking fashion, ensuring that we're not stalling for long periods of time.

It's possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.

Instance Method Summary collapse

Constructor Details

#initialize(host, port, connect_timeout: nil, timeout: nil) ⇒ SocketWithTimeout

Opens a socket.

Parameters:

  • host (String)
  • port (Integer)
  • connect_timeout (Integer) (defaults to: nil)

    the connection timeout, in seconds.

  • timeout (Integer) (defaults to: nil)

    the read and write timeout, in seconds.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/kafka/socket_with_timeout.rb', line 21

def initialize(host, port, connect_timeout: nil, timeout: nil)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @timeout = timeout

  @socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable    # IO.select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.

    unless IO.select(nil, [@socket], nil, connect_timeout)      # IO.select returns nil when the socket is not ready before timeout
      # seconds have elapsed

      @socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN      # The socket is connected, we're good!

    end
  end
end

Instance Method Details

#closeObject


80
81
82
# File 'lib/kafka/socket_with_timeout.rb', line 80

def close
  @socket.close
end

#read(num_bytes) ⇒ String

Reads bytes from the socket, possible with a timeout.

Parameters:

  • num_bytes (Integer)

    the number of bytes to read.

Returns:

  • (String)

    the data that was read from the socket.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.


59
60
61
62
63
64
65
# File 'lib/kafka/socket_with_timeout.rb', line 59

def read(num_bytes)
  unless IO.select([@socket], nil, nil, @timeout)
    raise Errno::ETIMEDOUT
  end

  @socket.read(num_bytes)
end

#set_encoding(encoding) ⇒ Object


84
85
86
# File 'lib/kafka/socket_with_timeout.rb', line 84

def set_encoding(encoding)
  @socket.set_encoding(encoding)
end

#write(bytes) ⇒ Integer

Writes bytes to the socket, possible with a timeout.

Parameters:

  • bytes (String)

    the data that should be written to the socket.

Returns:

  • (Integer)

    the number of bytes written.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.


72
73
74
75
76
77
78
# File 'lib/kafka/socket_with_timeout.rb', line 72

def write(bytes)
  unless IO.select(nil, [@socket], nil, @timeout)
    raise Errno::ETIMEDOUT
  end

  @socket.write(bytes)
end