Class: Kafka::SSLSocketWithTimeout

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/ssl_socket_with_timeout.rb

Overview

Opens sockets in a non-blocking fashion, ensuring that we're not stalling for long periods of time.

It's possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.

Instance Method Summary collapse

Constructor Details

#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout

Opens a socket.

Parameters:

  • host (String)
  • port (Integer)
  • connect_timeout (Integer) (defaults to: nil)

    the connection timeout, in seconds.

  • timeout (Integer) (defaults to: nil)

    the read and write timeout, in seconds.

  • ssl_context (OpenSSL::SSL::SSLContext)

    which SSLContext the ssl connection should use

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 24

# Opens a non-blocking TCP connection to the given host and port and then
# performs the TLS handshake on top of it. Every wait for the socket to become
# ready is bounded by +connect_timeout+.
#
# If anything goes wrong while connecting (a timeout, a refused connection, a
# failed handshake, ...) the sockets opened so far are closed before the error
# is propagated, so a failed constructor does not leave descriptors behind.
#
# @param host [String]
# @param port [Integer]
# @param connect_timeout [Integer] the connection timeout, in seconds.
# @param timeout [Integer] the read and write timeout, in seconds.
# @param ssl_context [OpenSSL::SSL::SSLContext] which SSLContext the ssl
#   connection should use.
# @raise [Errno::ETIMEDOUT] if the timeout is exceeded.
def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:)
  # Only the first resolved address is used.
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @connect_timeout = connect_timeout
  @timeout = timeout

  @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)

  begin
    @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

    # First establish the TCP connection.
    begin
      # Start connecting in the background. Unless the connection completes
      # (or fails) immediately this raises IO::WaitWritable
      # (Errno::EINPROGRESS) to signal that it is still in progress.
      @tcp_socket.connect_nonblock(sockaddr)
    rescue IO::WaitWritable
      # select returns nil when the socket did not become writable before the
      # connect timeout elapsed.
      raise Errno::ETIMEDOUT unless select_with_timeout(@tcp_socket, :connect_write)

      begin
        # Verify there is now a good connection.
        @tcp_socket.connect_nonblock(sockaddr)
      rescue Errno::EISCONN
        # The socket is connected, we're good!
      end
    end

    # Once TCP is connected, start the TLS handshake on top of it.
    @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context)
    @ssl_socket.hostname = host

    begin
      # The handshake needs several round trips. Each time it cannot make
      # progress without blocking it raises a wait condition; we wait for the
      # socket to become ready (bounded by the connect timeout) and retry.
      @ssl_socket.connect_nonblock
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable
      raise Errno::ETIMEDOUT unless select_with_timeout(@ssl_socket, :connect_read)
      retry
    rescue IO::WaitWritable
      raise Errno::ETIMEDOUT unless select_with_timeout(@ssl_socket, :connect_write)
      retry
    end
  rescue StandardError
    # Single cleanup path for every failure: release the TLS layer (if it was
    # created) and always release the TCP socket, then re-raise the error.
    begin
      @ssl_socket.close if @ssl_socket
    ensure
      @tcp_socket.close
    end
    raise
  end
end

Instance Method Details

#close ⇒ Object



162
163
164
165
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 162

# Closes the connection.
#
# The TLS layer is closed before the TCP socket it runs on, so that the SSL
# socket is never asked to shut down over a transport that has already been
# closed. The TCP socket is closed even if closing the SSL socket raises.
def close
  @ssl_socket.close
ensure
  @tcp_socket.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


167
168
169
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 167

# Reports whether the connection is closed. It counts as closed as soon as
# either the TCP socket or the SSL socket reports being closed.
#
# @return [Boolean]
def closed?
  # The SSL socket is only consulted when the TCP socket is still open.
  return true if @tcp_socket.closed?

  @ssl_socket.closed?
end

#read(num_bytes) ⇒ String

Reads bytes from the socket, possibly with a timeout.

Parameters:

  • num_bytes (Integer)

    the number of bytes to read.

Returns:

  • (String)

    the data that was read from the socket.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 93

# Reads from the SSL socket until at least +num_bytes+ bytes have been
# collected.
#
# @param num_bytes [Integer] the number of bytes to read.
# @return [String] the data that was read from the socket.
# @raise [Errno::ETIMEDOUT] if the socket does not become ready within the
#   read/write timeout.
def read(num_bytes)
  data = String.new

  while data.length < num_bytes
    begin
      # Try the non-blocking read first and only wait when the socket reports
      # that it cannot make progress; the timeout therefore applies to each
      # individual wait rather than to the read as a whole.
      chunk = @ssl_socket.read_nonblock(num_bytes - data.length)
      data << chunk
    rescue IO::WaitReadable
      raise Errno::ETIMEDOUT unless select_with_timeout(@ssl_socket, :read)
      retry
    rescue IO::WaitWritable
      raise Errno::ETIMEDOUT unless select_with_timeout(@ssl_socket, :write)
      retry
    end
  end

  data
end

#select_with_timeout(socket, type) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 175

# Waits for +socket+ to become ready for the operation named by +type+.
#
# The +:connect_read+ and +:connect_write+ types are bounded by the connect
# timeout; +:read+ and +:write+ are bounded by the read/write timeout.
#
# @param socket [Object] the socket handed to IO.select.
# @param type [Symbol] one of +:connect_read+, +:connect_write+, +:read+ or
#   +:write+.
# @return [Array, nil] whatever IO.select returns (nil when the timeout
#   elapsed); also nil when +type+ is not one of the known types.
def select_with_timeout(socket, type)
  direction, limit = {
    connect_read: [:readable, @connect_timeout],
    connect_write: [:writable, @connect_timeout],
    read: [:readable, @timeout],
    write: [:writable, @timeout]
  }[type]

  # Unknown types fall through without selecting at all.
  return unless direction

  if direction == :readable
    IO.select([socket], nil, nil, limit)
  else
    IO.select(nil, [socket], nil, limit)
  end
end

#set_encoding(encoding) ⇒ Object



171
172
173
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 171

# Forwards the requested encoding to the underlying TCP socket and returns
# whatever that call returns.
#
# @param encoding [Object] the encoding argument passed through unchanged.
def set_encoding(encoding)
  transport = @tcp_socket
  transport.set_encoding(encoding)
end

#write(bytes) ⇒ Integer

Writes bytes to the socket, possibly with a timeout.

Parameters:

  • bytes (String)

    the data that should be written to the socket.

Returns:

  • (Integer)

    the number of bytes written.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 127

# Writes all of +bytes+ to the SSL socket, waiting (up to the read/write
# timeout per wait) whenever the socket cannot accept more data.
#
# All bookkeeping is done in bytes rather than characters, because
# +write_nonblock+ reports how many bytes it accepted; this keeps partial
# writes of multibyte strings correct.
#
# @param bytes [String] the data that should be written to the socket.
# @return [Integer] the number of bytes written.
# @raise [Errno::ETIMEDOUT] if the socket does not become ready within the
#   read/write timeout.
def write(bytes)
  pending = bytes
  total_written = 0

  until pending.empty?
    begin
      # Try the non-blocking write first and only wait when the socket reports
      # that it cannot make progress. `retry` re-issues the write with the
      # very same string object.
      sent = @ssl_socket.write_nonblock(pending)
    rescue IO::WaitWritable
      raise Errno::ETIMEDOUT unless select_with_timeout(@ssl_socket, :write)
      retry
    rescue IO::WaitReadable
      # The socket may need to read before it can continue writing; wait for
      # readability just like #read waits for writability.
      raise Errno::ETIMEDOUT unless select_with_timeout(@ssl_socket, :read)
      retry
    end

    total_written += sent

    # Drop the bytes that were accepted and keep going with the remainder.
    pending = pending.byteslice(sent, pending.bytesize - sent)
  end

  total_written
end