Method: Kafka::SSLSocketWithTimeout#initialize

Defined in:
lib/kafka/ssl_socket_with_timeout.rb

#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout

Opens a socket.

Parameters:

  • host (String)
  • port (Integer)
  • connect_timeout (Integer) (defaults to: nil)

    the connection timeout, in seconds.

  • timeout (Integer) (defaults to: nil)

    the read and write timeout, in seconds.

  • ssl_context (OpenSSL::SSL::SSLContext)

    which SSLContext the ssl connection should use

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 24

def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @connect_timeout = connect_timeout
  @timeout = timeout

  @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  # first initiate the TCP socket
  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @tcp_socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless select_with_timeout(@tcp_socket, :connect_write)
      # select returns nil when the socket is not ready before timeout
      # seconds have elapsed
      @tcp_socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @tcp_socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end

  # once that's connected, we can start initiating the ssl socket
  @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context)

  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    # Unlike waiting for a tcp socket to connect, you can't time out ssl socket
    # connections during the connect phase properly, because IO.select only partially works.
    # Instead, you have to retry.
    @ssl_socket.connect_nonblock
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable
    if select_with_timeout(@ssl_socket, :connect_read)
      retry
    else
      @ssl_socket.close
      close
      raise Errno::ETIMEDOUT
    end
  rescue IO::WaitWritable
    if select_with_timeout(@ssl_socket, :connect_write)
      retry
    else
      close
      raise Errno::ETIMEDOUT
    end
  end
end