Method: Kafka::SSLSocketWithTimeout#initialize

Defined in:
lib/kafka/ssl_socket_with_timeout.rb

#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout

Opens a socket.

Parameters:

  • host (String)
  • port (Integer)
  • connect_timeout (Integer) (defaults to: nil)

    the connection timeout, in seconds.

  • timeout (Integer) (defaults to: nil)

    the read and write timeout, in seconds.

  • ssl_context (OpenSSL::SSL::SSLContext)

    which SSLContext the ssl connection should use

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 22

def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @timeout = timeout

  @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  # first initiate the TCP socket
  begin
    # Initiate the socket connection in the background. If it doesn't fail 
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS) 
    # indicating the connection is in progress.
    @tcp_socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless IO.select(nil, [@tcp_socket], nil, connect_timeout)
      # IO.select returns nil when the socket is not ready before timeout 
      # seconds have elapsed
      @tcp_socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @tcp_socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end

  # once that's connected, we can start initiating the ssl socket
  @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context)

  begin
    # Initiate the socket connection in the background. If it doesn't fail 
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS) 
    # indicating the connection is in progress.
    # Unlike waiting for a tcp socket to connect, you can't time out ssl socket
    # connections during the connect phase properly, because IO.select only partially works.
    # Instead, you have to retry.
    @ssl_socket.connect_nonblock
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable
    IO.select([@ssl_socket])
    retry
  rescue IO::WaitWritable
    IO.select(nil, [@ssl_socket])
    retry
  end
end