Class: Kafka::SocketWithTimeout

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/socket_with_timeout.rb

Overview

Opens sockets in a non-blocking fashion, ensuring that we're not stalling for long periods of time.

It's possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.

Instance Method Summary collapse

Constructor Details

#initialize(host, port, connect_timeout: nil, timeout: nil) ⇒ SocketWithTimeout

Opens a socket.

Parameters:

  • host (String)
  • port (Integer)
  • connect_timeout (Integer)

    the connection timeout, in seconds.

  • timeout (Integer)

    the read and write timeout, in seconds.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/kafka/socket_with_timeout.rb', line 23

def initialize(host, port, connect_timeout: nil, timeout: nil)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @timeout = timeout

  @socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless IO.select(nil, [@socket], nil, connect_timeout)
      # IO.select returns nil when the socket is not ready before timeout
      # seconds have elapsed
      @socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end
end

Instance Method Details

#closeObject


84
85
86
# File 'lib/kafka/socket_with_timeout.rb', line 84

def close
  @socket.close
end

#closed?Boolean

Returns:

  • (Boolean)

88
89
90
# File 'lib/kafka/socket_with_timeout.rb', line 88

def closed?
  @socket.closed?
end

#read(num_bytes) ⇒ String

Reads bytes from the socket, possible with a timeout.

Parameters:

  • num_bytes (Integer)

    the number of bytes to read.

Returns:

  • (String)

    the data that was read from the socket.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.


61
62
63
64
65
66
67
68
69
# File 'lib/kafka/socket_with_timeout.rb', line 61

def read(num_bytes)
  unless IO.select([@socket], nil, nil, @timeout)
    raise Errno::ETIMEDOUT
  end

  @socket.read(num_bytes)
rescue IO::EAGAINWaitReadable
  retry
end

#set_encoding(encoding) ⇒ Object


92
93
94
# File 'lib/kafka/socket_with_timeout.rb', line 92

def set_encoding(encoding)
  @socket.set_encoding(encoding)
end

#write(bytes) ⇒ Integer

Writes bytes to the socket, possible with a timeout.

Parameters:

  • bytes (String)

    the data that should be written to the socket.

Returns:

  • (Integer)

    the number of bytes written.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.


76
77
78
79
80
81
82
# File 'lib/kafka/socket_with_timeout.rb', line 76

def write(bytes)
  unless IO.select(nil, [@socket], nil, @timeout)
    raise Errno::ETIMEDOUT
  end

  @socket.write(bytes)
end