Class: Kafka::SSLSocketWithTimeout
- Inherits: Object
  - Object
  - Kafka::SSLSocketWithTimeout
- Defined in: lib/kafka/ssl_socket_with_timeout.rb
Overview
Opens sockets in a non-blocking fashion, ensuring that we're not stalling for long periods of time.
It's possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.
Instance Method Summary
- #close ⇒ Object
- #initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout (constructor)
  Opens a socket.
- #read(num_bytes) ⇒ String
  Reads bytes from the socket, possibly with a timeout.
- #set_encoding(encoding) ⇒ Object
- #write(bytes) ⇒ Integer
  Writes bytes to the socket, possibly with a timeout.
Constructor Details
#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout
Opens a socket.
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 22

def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @timeout = timeout

  @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  # first initiate the TCP socket
  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @tcp_socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless IO.select(nil, [@tcp_socket], nil, connect_timeout)
      # IO.select returns nil when the socket is not ready before timeout
      # seconds have elapsed
      @tcp_socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @tcp_socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end

  # once that's connected, we can start initiating the ssl socket
  @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context)

  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    # Unlike waiting for a tcp socket to connect, you can't time out ssl socket
    # connections during the connect phase properly, because IO.select only partially works.
    # Instead, you have to retry.
    @ssl_socket.connect_nonblock
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable
    IO.select([@ssl_socket])
    retry
  rescue IO::WaitWritable
    IO.select(nil, [@ssl_socket])
    retry
  end
end
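The TCP half of the constructor follows a common pattern: start the connection with connect_nonblock, wait for writability with IO.select, then call connect_nonblock again to confirm the result. The following is a minimal standalone sketch of that pattern using only the standard library. The helper name connect_with_timeout and the loopback test server are assumptions made for illustration; they are not part of ruby-kafka, and the SSL handshake is left out.

```ruby
require "socket"

# Hypothetical helper (not part of ruby-kafka): opens a plain TCP socket and
# waits at most `connect_timeout` seconds for the connection to complete.
def connect_with_timeout(host, port, connect_timeout: nil)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])
  socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)

  begin
    # Start connecting without blocking the caller.
    socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select returns nil if the socket is not writable in time.
    unless IO.select(nil, [socket], nil, connect_timeout)
      socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # A second call reports the outcome: EISCONN means success, while a
      # failed connection raises the underlying error (e.g. ECONNREFUSED).
      socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # Connected.
    end
  end

  socket
end

# Usage against a throwaway server on the loopback interface.
server = TCPServer.new("127.0.0.1", 0)
port = server.addr[1]
client = connect_with_timeout("127.0.0.1", port, connect_timeout: 1)
peer = server.accept
peer.write("ping")
received = client.read(4)
[client, peer, server].each(&:close)
```

Passing nil as connect_timeout makes IO.select wait indefinitely, which matches the default in the constructor signature above.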
Instance Method Details
#close ⇒ Object
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 145

def close
  @tcp_socket.close
  @ssl_socket.close
end
#read(num_bytes) ⇒ String
Reads bytes from the socket, possibly with a timeout.
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 80

def read(num_bytes)
  buffer = ''
  until buffer.length >= num_bytes
    begin
      # unlike plain tcp sockets, ssl sockets don't support IO.select
      # properly.
      # Instead, timeouts happen on a per read basis, and we have to
      # catch exceptions from read_nonblock, and gradually build up
      # our read buffer.
      buffer << @ssl_socket.read_nonblock(num_bytes - buffer.length)
    rescue IO::WaitReadable
      unless IO.select([@ssl_socket], nil, nil, @timeout)
        raise Errno::ETIMEDOUT
      end
      retry
    rescue IO::WaitWritable
      unless IO.select(nil, [@ssl_socket], nil, @timeout)
        raise Errno::ETIMEDOUT
      end
      retry
    end
  end
  buffer
end
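The read loop can be tried in isolation against any IO that supports read_nonblock. The sketch below uses a pipe instead of an SSL socket so that it runs without a server; the helper name read_with_timeout is an assumption for illustration and is not part of ruby-kafka. Note that the timeout applies to each wait, not to the call as a whole, so a slow trickle of data can keep the loop alive for longer than the timeout value.

```ruby
# Hypothetical helper (not part of ruby-kafka): reads exactly `num_bytes`
# from `io`, waiting up to `timeout` seconds whenever the IO is not ready.
def read_with_timeout(io, num_bytes, timeout: nil)
  buffer = String.new
  until buffer.bytesize >= num_bytes
    begin
      # Ask only for the bytes that are still missing.
      buffer << io.read_nonblock(num_bytes - buffer.bytesize)
    rescue IO::WaitReadable
      raise Errno::ETIMEDOUT unless IO.select([io], nil, nil, timeout)
      retry
    rescue IO::WaitWritable
      # An SSL socket may need to write before it can read (renegotiation).
      raise Errno::ETIMEDOUT unless IO.select(nil, [io], nil, timeout)
      retry
    end
  end
  buffer
end

reader, writer = IO.pipe

# Data arriving in two chunks is assembled into a single result.
writer.write("hel")
writer.write("lo")
data = read_with_timeout(reader, 5, timeout: 1)

# With nothing left to read, the wait expires and ETIMEDOUT is raised.
timed_out = begin
  read_with_timeout(reader, 1, timeout: 0.05)
  false
rescue Errno::ETIMEDOUT
  true
end
```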
#set_encoding(encoding) ⇒ Object
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 150

def set_encoding(encoding)
  @tcp_socket.set_encoding(encoding)
end
#write(bytes) ⇒ Integer
Writes bytes to the socket, possibly with a timeout.
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 110

def write(bytes)
  loop do
    written = 0
    begin
      # unlike plain tcp sockets, ssl sockets don't support IO.select
      # properly.
      # Instead, timeouts happen on a per write basis, and we have to
      # catch exceptions from write_nonblock, and gradually build up
      # our write buffer.
      written += @ssl_socket.write_nonblock(bytes)
    rescue Errno::EFAULT => error
      raise error
    rescue OpenSSL::SSL::SSLError, Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitWritable => error
      if error.is_a?(OpenSSL::SSL::SSLError) && error.message == 'write would block'
        if IO.select(nil, [@ssl_socket], nil, @timeout)
          retry
        else
          raise Errno::ETIMEDOUT
        end
      else
        raise error
      end
    end

    # Fast, common case.
    break if written == bytes.size

    # This takes advantage of the fact that most ruby implementations
    # have Copy-On-Write strings. Thus, when requesting a subrange
    # of data, we don't actually copy data because the new string
    # simply references a subrange of the original.
    bytes = bytes[written, bytes.size]
  end
end
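The partial-write handling can be sketched against a plain pipe, where write_nonblock accepts only as much data as the pipe buffer can hold. The helper name write_with_timeout is an assumption for illustration and is not part of ruby-kafka. Unlike the listing above, this sketch detects a would-block condition by exception class (IO::WaitWritable) rather than by error message, which assumes a Ruby version whose write_nonblock raises that module, and it returns the total byte count as a choice of the sketch.

```ruby
# Hypothetical helper (not part of ruby-kafka): writes all of `bytes` to
# `io`, waiting up to `timeout` seconds whenever the IO cannot accept more.
def write_with_timeout(io, bytes, timeout: nil)
  total = 0
  remaining = bytes
  until remaining.empty?
    begin
      # May write fewer bytes than requested.
      written = io.write_nonblock(remaining)
    rescue IO::WaitWritable
      raise Errno::ETIMEDOUT unless IO.select(nil, [io], nil, timeout)
      retry
    end
    total += written
    # Keep only the part that has not been written yet.
    remaining = remaining.byteslice(written, remaining.bytesize)
  end
  total
end

reader, writer = IO.pipe
payload = "x" * 200_000 # larger than a typical pipe buffer

# Drain the pipe in the background so the writer can make progress.
drain = Thread.new { reader.read }
count = write_with_timeout(writer, payload, timeout: 1)
writer.close
received = drain.value
reader.close
```

As with reads, the timeout bounds each individual wait rather than the whole call.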