Class: Kafka::SSLSocketWithTimeout

Inherits:
Object
Defined in:
lib/kafka/ssl_socket_with_timeout.rb

Overview

Opens sockets in a non-blocking fashion, ensuring that we're not stalling for long periods of time.

It's possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.
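The non-blocking connect pattern described here can be tried out on a plain TCP socket, without SSL. The following is a minimal sketch, not code from the library: `connect_with_timeout` is a hypothetical helper name, and it uses `Addrinfo.tcp` for address resolution as an assumption made for brevity.

```ruby
require "socket"

# Hypothetical helper (not part of the library): open a TCP connection and
# give up after `connect_timeout` seconds.
def connect_with_timeout(host, port, connect_timeout)
  addrinfo = Addrinfo.tcp(host, port)
  sockaddr = addrinfo.to_sockaddr
  socket = Socket.new(addrinfo.afamily, Socket::SOCK_STREAM, 0)

  begin
    # Start connecting; this usually raises IO::WaitWritable right away.
    socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # Wait until the socket becomes writable or the timeout elapses.
    unless IO.select(nil, [socket], nil, connect_timeout)
      socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Ask again to surface any connection error.
      socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # Already connected, nothing more to do.
    end
  end

  socket
end
```

A caller would typically wrap the call in `begin ... rescue Errno::ETIMEDOUT` and decide whether to retry against another host.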

Instance Method Summary

Constructor Details

#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout

Opens a socket.

Parameters:

  • host (String)
  • port (Integer)
  • connect_timeout (Integer) (defaults to: nil)

    the connection timeout, in seconds.

  • timeout (Integer) (defaults to: nil)

    the read and write timeout, in seconds.

  • ssl_context (OpenSSL::SSL::SSLContext)

    the SSLContext that the SSL connection should use.

Raises:

  • (Errno::ETIMEDOUT)

    if the TCP connection cannot be established within connect_timeout.



# File 'lib/kafka/ssl_socket_with_timeout.rb', line 22

def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @timeout = timeout

  @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  # first initiate the TCP socket
  begin
    # Initiate the socket connection in the background. If it doesn't fail 
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS) 
    # indicating the connection is in progress.
    @tcp_socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless IO.select(nil, [@tcp_socket], nil, connect_timeout)
      # IO.select returns nil when the socket is not ready before timeout 
      # seconds have elapsed
      @tcp_socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @tcp_socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end

  # once that's connected, we can start initiating the ssl socket
  @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context)

  begin
    # Start the SSL handshake without blocking. While the handshake is in
    # progress, connect_nonblock raises IO::WaitReadable or IO::WaitWritable,
    # depending on which direction it is waiting on.
    # Unlike waiting for a tcp socket to connect, you can't time out ssl socket
    # connections during the connect phase properly, because IO.select only partially works.
    # Instead, you have to retry.
    @ssl_socket.connect_nonblock
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable
    IO.select([@ssl_socket])
    retry
  rescue IO::WaitWritable
    IO.select(nil, [@ssl_socket])
    retry
  end
end
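The handshake loop in the listing above calls IO.select without a timeout, so a peer that accepts the TCP connection but never answers can stall it indefinitely. One possible way to bound the handshake is to track a deadline across retries. The sketch below is an assumption, not what the library does: `ssl_connect_with_deadline` is a hypothetical helper, and since the source comments warn that IO.select only partially works on SSL sockets, treat it as a starting point rather than a guarantee.

```ruby
require "openssl"
require "socket"

# Hypothetical helper (not part of the library): run the SSL handshake on an
# already connected TCP socket, giving up once `timeout` seconds have passed.
def ssl_connect_with_deadline(tcp_socket, ssl_context, timeout)
  ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  begin
    ssl_socket.connect_nonblock
  rescue IO::WaitReadable
    # The handshake is waiting for data from the peer.
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    unless remaining > 0 && IO.select([ssl_socket], nil, nil, remaining)
      raise Errno::ETIMEDOUT
    end
    retry
  rescue IO::WaitWritable
    # The handshake is waiting for room to send data.
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    unless remaining > 0 && IO.select(nil, [ssl_socket], nil, remaining)
      raise Errno::ETIMEDOUT
    end
    retry
  end

  ssl_socket
end
```

Using a monotonic clock keeps the deadline stable if the system time changes while the handshake is in flight.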

Instance Method Details

#close ⇒ Object



# File 'lib/kafka/ssl_socket_with_timeout.rb', line 145

def close
  @tcp_socket.close
  @ssl_socket.close
end

#read(num_bytes) ⇒ String

Reads bytes from the socket, possibly with a timeout.

Parameters:

  • num_bytes (Integer)

    the number of bytes to read.

Returns:

  • (String)

    the data that was read from the socket.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



# File 'lib/kafka/ssl_socket_with_timeout.rb', line 80

def read(num_bytes)
  buffer = ''
  until buffer.length >= num_bytes
    begin
      # unlike plain tcp sockets, ssl sockets don't support IO.select
      # properly.
      # Instead, timeouts happen on a per read basis, and we have to
      # catch exceptions from read_nonblock, and gradually build up
      # our read buffer.
      buffer << @ssl_socket.read_nonblock(num_bytes - buffer.length)
    rescue IO::WaitReadable
      unless IO.select([@ssl_socket], nil, nil, @timeout)
        raise Errno::ETIMEDOUT
      end
      retry
    rescue IO::WaitWritable
      unless IO.select(nil, [@ssl_socket], nil, @timeout)
        raise Errno::ETIMEDOUT
      end
      retry
    end
  end
  buffer
end
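The same read loop can be exercised without an SSL connection. The following is a minimal sketch, assuming any IO object that supports `read_nonblock`; `read_exactly` is a hypothetical helper name, and the pipe in the usage lines stands in for the socket.

```ruby
# Hypothetical helper (not part of the library): read exactly `num_bytes`
# from `io`, waiting at most `timeout` seconds for each chunk.
def read_exactly(io, num_bytes, timeout)
  buffer = String.new
  until buffer.bytesize >= num_bytes
    begin
      # Ask only for the bytes that are still missing.
      buffer << io.read_nonblock(num_bytes - buffer.bytesize)
    rescue IO::WaitReadable
      # Nothing available yet: wait for data or give up.
      raise Errno::ETIMEDOUT unless IO.select([io], nil, nil, timeout)
      retry
    end
  end
  buffer
end

reader, writer = IO.pipe
writer.write("hel")
Thread.new { sleep 0.05; writer.write("lo") }
data = read_exactly(reader, 5, 1)
```

As in the listing above, the timeout applies to each wait rather than to the whole read, so a peer that trickles data can keep the call alive longer than `timeout` seconds. An `EOFError` from `read_nonblock` is not handled here and propagates to the caller.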

#set_encoding(encoding) ⇒ Object



# File 'lib/kafka/ssl_socket_with_timeout.rb', line 150

def set_encoding(encoding)
  @tcp_socket.set_encoding(encoding)
end

#write(bytes) ⇒ Integer

Writes bytes to the socket, possibly with a timeout.

Parameters:

  • bytes (String)

    the data that should be written to the socket.

Returns:

  • (Integer)

    the number of bytes written.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



# File 'lib/kafka/ssl_socket_with_timeout.rb', line 110

def write(bytes)
  loop do
    written = 0
    begin
      # unlike plain tcp sockets, ssl sockets don't support IO.select
      # properly.
      # Instead, timeouts happen on a per write basis, and we have to
      # catch exceptions from write_nonblock, and gradually build up
      # our write buffer.
      written += @ssl_socket.write_nonblock(bytes)
    rescue Errno::EFAULT => error
      raise error
    rescue OpenSSL::SSL::SSLError, Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitWritable => error
      if error.is_a?(OpenSSL::SSL::SSLError) && error.message == 'write would block'
        if IO.select(nil, [@ssl_socket], nil, @timeout)
          retry
        else
          raise Errno::ETIMEDOUT
        end
      else
        raise error
      end
    end

    # Fast, common case.
    break if written == bytes.size

    # This takes advantage of the fact that most Ruby implementations
    # have copy-on-write strings. Thus, when requesting a subrange
    # of data, we don't actually copy data because the new string
    # simply references a subrange of the original.
    bytes = bytes[written, bytes.size]
  end
end
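As written, the listing above ends with `break` inside `loop`, so the method evaluates to nil rather than the documented byte count. A variant that keeps a running total and returns it might look like the sketch below. It is an illustration under assumptions, not the library's code: `write_all` is a hypothetical helper, it works on any IO that supports `write_nonblock`, and it slices by bytes because `write_nonblock` reports a byte count.

```ruby
# Hypothetical helper (not part of the library): write all of `bytes` to
# `io`, waiting at most `timeout` seconds whenever the IO is not writable.
def write_all(io, bytes, timeout)
  total = 0
  while total < bytes.bytesize
    begin
      # Send whatever has not been written yet; partial writes are normal.
      total += io.write_nonblock(bytes.byteslice(total, bytes.bytesize - total))
    rescue IO::WaitWritable
      # The buffer is full: wait for room or give up.
      raise Errno::ETIMEDOUT unless IO.select(nil, [io], nil, timeout)
      retry
    end
  end
  total
end

reader, writer = IO.pipe
payload = "x" * 200_000
consumer = Thread.new { reader.read }
written = write_all(writer, payload, 1)
writer.close
received = consumer.value
```

The payload is deliberately larger than a typical pipe buffer, so the loop has to go through several partial writes before it finishes.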