Class: Kafka::SSLSocketWithTimeout
- Inherits:
-
Object
- Object
- Kafka::SSLSocketWithTimeout
- Defined in:
- lib/kafka/ssl_socket_with_timeout.rb
Overview
Opens sockets in a non-blocking fashion, ensuring that we're not stalling for long periods of time.
It's possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout
constructor
Opens a socket.
-
#read(num_bytes) ⇒ String
Reads bytes from the socket, possible with a timeout.
- #select_with_timeout(socket, type) ⇒ Object
- #set_encoding(encoding) ⇒ Object
-
#write(bytes) ⇒ Integer
Writes bytes to the socket, possible with a timeout.
Constructor Details
#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout
Opens a socket.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 22 def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) addr = Socket.getaddrinfo(host, nil) sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) @connect_timeout = connect_timeout @timeout = timeout @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0) @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) # first initiate the TCP socket begin # Initiate the socket connection in the background. If it doesn't fail # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS) # indicating the connection is in progress. @tcp_socket.connect_nonblock(sockaddr) rescue IO::WaitWritable # select will block until the socket is writable or the timeout # is exceeded, whichever comes first. unless select_with_timeout(@tcp_socket, :connect_write) # select returns nil when the socket is not ready before timeout # seconds have elapsed @tcp_socket.close raise Errno::ETIMEDOUT end begin # Verify there is now a good connection. @tcp_socket.connect_nonblock(sockaddr) rescue Errno::EISCONN # The socket is connected, we're good! end end # once that's connected, we can start initiating the ssl socket @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context) begin # Initiate the socket connection in the background. If it doesn't fail # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS) # indicating the connection is in progress. # Unlike waiting for a tcp socket to connect, you can't time out ssl socket # connections during the connect phase properly, because IO.select only partially works. # Instead, you have to retry. @ssl_socket.connect_nonblock rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable if select_with_timeout(@ssl_socket, :connect_read) retry else @ssl_socket.close close raise Errno::ETIMEDOUT end rescue IO::WaitWritable if select_with_timeout(@ssl_socket, :connect_write) retry else close raise Errno::ETIMEDOUT end end end |
Instance Method Details
#close ⇒ Object
157 158 159 160 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 157 def close @tcp_socket.close @ssl_socket.close end |
#closed? ⇒ Boolean
162 163 164 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 162 def closed? @tcp_socket.closed? || @ssl_socket.closed? end |
#read(num_bytes) ⇒ String
Reads bytes from the socket, possible with a timeout.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 90 def read(num_bytes) buffer = '' until buffer.length >= num_bytes begin # unlike plain tcp sockets, ssl sockets don't support IO.select # properly. # Instead, timeouts happen on a per read basis, and we have to # catch exceptions from read_nonblock, and gradually build up # our read buffer. buffer << @ssl_socket.read_nonblock(num_bytes - buffer.length) rescue IO::WaitReadable if select_with_timeout(@ssl_socket, :read) retry else raise Errno::ETIMEDOUT end rescue IO::WaitWritable if select_with_timeout(@ssl_socket, :write) retry else raise Errno::ETIMEDOUT end end end buffer end |
#select_with_timeout(socket, type) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 170 def select_with_timeout(socket, type) case type when :connect_read IO.select([socket], nil, nil, @connect_timeout) when :connect_write IO.select(nil, [socket], nil, @connect_timeout) when :read IO.select([socket], nil, nil, @timeout) when :write IO.select(nil, [socket], nil, @timeout) end end |
#set_encoding(encoding) ⇒ Object
166 167 168 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 166 def set_encoding(encoding) @tcp_socket.set_encoding(encoding) end |
#write(bytes) ⇒ Integer
Writes bytes to the socket, possible with a timeout.
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 122 def write(bytes) loop do written = 0 begin # unlike plain tcp sockets, ssl sockets don't support IO.select # properly. # Instead, timeouts happen on a per write basis, and we have to # catch exceptions from write_nonblock, and gradually build up # our write buffer. written += @ssl_socket.write_nonblock(bytes) rescue Errno::EFAULT => error raise error rescue OpenSSL::SSL::SSLError, Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitWritable => error if error.is_a?(OpenSSL::SSL::SSLError) && error. == 'write would block' if select_with_timeout(@ssl_socket, :write) retry else raise Errno::ETIMEDOUT end else raise error end end # Fast, common case. break if written == bytes.size # This takes advantage of the fact that most ruby implementations # have Copy-On-Write strings. Thusly why requesting a subrange # of data, we actually don't copy data because the new string # simply references a subrange of the original. bytes = bytes[written, bytes.size] end end |