Class: Riemann::Client::TcpSocket
- Inherits:
-
Object
- Object
- Riemann::Client::TcpSocket
- Defined in:
- lib/riemann/client/tcp_socket.rb
Overview
Socket: A specialized socket that has been configure
Direct Known Subclasses
Defined Under Namespace
Instance Attribute Summary collapse
-
#connect_timeout ⇒ Object
readonly
Internal: The timeout for connecting in seconds.
-
#host ⇒ Object
readonly
Internal: The host this socket is connected to.
-
#keepalive_count ⇒ Object
readonly
Internal.
-
#keepalive_idle ⇒ Object
readonly
Internal.
-
#keepalive_interval ⇒ Object
readonly
Internal.
-
#port ⇒ Object
readonly
Internal: The port this socket is connected to.
-
#read_timeout ⇒ Object
Internal: The timeout for reading in seconds.
-
#write_timeout ⇒ Object
readonly
Internal: The timeout for writing in seconds.
Class Method Summary collapse
-
.connect(options = {}) ⇒ Object
Internal: Create and connect to the given location.
Instance Method Summary collapse
-
#close ⇒ Object
Internal: Closes the internal ::Socket.
-
#closed? ⇒ Boolean
Internal: Return true the socket is closed.
-
#connect ⇒ Object
Internal:.
-
#connect_nonblock(addr, timeout) ⇒ Object
Internal: Connect to the give address within the timeout.
-
#connect_nonblock_finalize(sock, sockaddr) ⇒ Object
Internal: Make sure that a non-blocking connect has truely connected.
-
#connect_or_error(addr, deadline, errors) ⇒ Object
Internal: Connect to the destination or raise an error.
-
#initialize(options = {}) ⇒ TcpSocket
constructor
Internal: Creates a new KJess::Socket.
-
#keepalive_active? ⇒ Boolean
Internal: Return whether or not the keepalive_active flag is set.
-
#read(length, outbuf = nil) ⇒ Object
Reads length bytes from the socket.
-
#readpartial(maxlen, outbuf = nil) ⇒ Object
Internal: Read up to a maxlen of data from the socket and store it in outbuf.
-
#socket ⇒ Object
Internal: Return the connected raw Socket.
-
#socket_factory(type) ⇒ Object
Internal: Low level socket allocation and option configuration.
-
#using_keepalive? ⇒ Boolean
Internal: say if we are using TCP Keep Alive or not.
- #wait_readable(timeout = nil) ⇒ Object
- #wait_writable(timeout = nil) ⇒ Object
-
#write(buf) ⇒ Object
Internal: Write the given data to the socket.
Constructor Details
#initialize(options = {}) ⇒ TcpSocket
Internal: Creates a new KJess::Socket
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/riemann/client/tcp_socket.rb', line 85 def initialize( = {}) @host = [:host] @port = [:port] @connect_timeout = [:connect_timeout] || [:timeout] || 2 @read_timeout = [:read_timeout] || [:timeout] || 2 @write_timeout = [:write_timeout] || [:timeout] || 2 @keepalive_active = .fetch(:keepalive_active, true) @keepalive_idle = [:keepalive_idle] || 60 @keepalive_interval = [:keepalive_interval] || 30 @keepalive_count = [:keepalive_count] || 5 @socket = nil end |
Instance Attribute Details
#connect_timeout ⇒ Object (readonly)
Internal: The timeout for connecting in seconds. Defaults to 2
19 20 21 |
# File 'lib/riemann/client/tcp_socket.rb', line 19 def connect_timeout @connect_timeout end |
#host ⇒ Object (readonly)
Internal: The host this socket is connected to
27 28 29 |
# File 'lib/riemann/client/tcp_socket.rb', line 27 def host @host end |
#keepalive_count ⇒ Object (readonly)
Internal
Used for setting TCP_KEEPCNT: overrides tcp_keepalive_probes for a single socket.
tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
tcp_keepalive_probes:
The number of unacknowledged probes to send before considering the
connection dead and notifying the application layer.
71 72 73 |
# File 'lib/riemann/client/tcp_socket.rb', line 71 def keepalive_count @keepalive_count end |
#keepalive_idle ⇒ Object (readonly)
Internal
Used for setting TCP_KEEPIDLE: overrides tcp_keepalive_time for a single socket.
tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
tcp_keepalive_time:
The interval between the last data packet sent (simple ACKs are not
considered data) and the first keepalive probe; after the connection is
marked to need keepalive, this counter is not used any further.
45 46 47 |
# File 'lib/riemann/client/tcp_socket.rb', line 45 def keepalive_idle @keepalive_idle end |
#keepalive_interval ⇒ Object (readonly)
Internal
Used for setting TCP_KEEPINTVL: overrides tcp_keepalive_intvl for a single socket.
tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
tcp_keepalive_intvl:
The interval between subsequential keepalive probes, regardless of what
the connection has exchanged in the meantime.
58 59 60 |
# File 'lib/riemann/client/tcp_socket.rb', line 58 def keepalive_interval @keepalive_interval end |
#port ⇒ Object (readonly)
Internal: The port this socket is connected to
31 32 33 |
# File 'lib/riemann/client/tcp_socket.rb', line 31 def port @port end |
#read_timeout ⇒ Object
Internal: The timeout for reading in seconds. Defaults to 2
15 16 17 |
# File 'lib/riemann/client/tcp_socket.rb', line 15 def read_timeout @read_timeout end |
#write_timeout ⇒ Object (readonly)
Internal: The timeout for writing in seconds. Defaults to 2
23 24 25 |
# File 'lib/riemann/client/tcp_socket.rb', line 23 def write_timeout @write_timeout end |
Class Method Details
.connect(options = {}) ⇒ Object
Internal: Create and connect to the given location.
options, same as Constructor
Returns an instance of KJess::Socket
78 79 80 81 82 |
# File 'lib/riemann/client/tcp_socket.rb', line 78 def self.connect( = {}) s = new() s.connect s end |
Instance Method Details
#close ⇒ Object
Internal: Closes the internal ::Socket
Returns nothing
150 151 152 153 |
# File 'lib/riemann/client/tcp_socket.rb', line 150 def close @socket.close unless closed? @socket = nil end |
#closed? ⇒ Boolean
Internal: Return true the socket is closed.
156 157 158 159 160 161 |
# File 'lib/riemann/client/tcp_socket.rb', line 156 def closed? return true if @socket.nil? return true if @socket.closed? false end |
#connect ⇒ Object
Internal:
Connect to the remote host in a non-blocking fashion.
Raise Error if there is a failure connecting.
Return the ::Socket on success
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/riemann/client/tcp_socket.rb', line 170 def connect # Calculate our timeout deadline deadline = Time.now.to_f + connect_timeout # Lookup destination address, we only want TCP. addrs = ::Socket.getaddrinfo(host, port, nil, ::Socket::SOCK_STREAM) errors = [] conn_error = -> { raise errors.first } sock = nil # Sort it so we get AF_INET, IPv4 addrs.sort.find(conn_error) do |addr| sock = connect_or_error(addr, deadline, errors) end sock end |
#connect_nonblock(addr, timeout) ⇒ Object
Internal: Connect to the give address within the timeout.
Make an attempt to connect to a single address within the given timeout.
Return the ::Socket when it is connected, or raise an Error if no connection was possible.
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/riemann/client/tcp_socket.rb', line 217 def connect_nonblock(addr, timeout) sockaddr = ::Socket.pack_sockaddr_in(addr[1], addr[3]) sock = socket_factory(addr[4]) sock.connect_nonblock(sockaddr) sock rescue Errno::EINPROGRESS if IO.select(nil, [sock], nil, timeout).nil? begin sock.close rescue StandardError nil end raise Timeout, "Could not connect to #{host}:#{port} within #{timeout} seconds" end connect_nonblock_finalize(sock, sockaddr) rescue StandardError => e begin sock.close rescue StandardError nil end raise Error, "Could not connect to #{host}:#{port}: #{e.class}: #{e.}", e.backtrace end |
#connect_nonblock_finalize(sock, sockaddr) ⇒ Object
Internal: Make sure that a non-blocking connect has truely connected.
Ensure that the given socket is actually connected to the given adddress.
Returning the socket if it is and raising an Error if it isn’t.
246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/riemann/client/tcp_socket.rb', line 246 def connect_nonblock_finalize(sock, sockaddr) sock.connect_nonblock(sockaddr) sock rescue Errno::EISCONN sock rescue StandardError => e begin sock.close rescue StandardError nil end raise Error, "Could not connect to #{host}:#{port}: #{e.class}: #{e.}", e.backtrace end |
#connect_or_error(addr, deadline, errors) ⇒ Object
Internal: Connect to the destination or raise an error.
Connect to the address or capture the error of the connection
addr - An address returned from Socket.getaddrinfo() deadline - the after which we should raise a timeout error errors - a collection of errors to append an error too should we have one.
Make an attempt to connect to the given address. If it is successful, return the socket.
Should the connection fail, append the exception to the errors array and return false.
201 202 203 204 205 206 207 208 209 |
# File 'lib/riemann/client/tcp_socket.rb', line 201 def connect_or_error(addr, deadline, errors) timeout = deadline - Time.now.to_f raise Timeout, "Could not connect to #{host}:#{port}" if timeout <= 0 connect_nonblock(addr, timeout) rescue Error => e errors << e false end |
#keepalive_active? ⇒ Boolean
Internal: Return whether or not the keepalive_active flag is set.
102 103 104 |
# File 'lib/riemann/client/tcp_socket.rb', line 102 def keepalive_active? @keepalive_active end |
#read(length, outbuf = nil) ⇒ Object
Reads length bytes from the socket
length - the number of bytes to read from the socket outbuf - an optional buffer to store the bytes in
Returns the bytes read if no outbuf is specified
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/riemann/client/tcp_socket.rb', line 287 def read(length, outbuf = nil) if outbuf outbuf.replace('') buf = outbuf else buf = String.new end while buf.length < length unless (rb = readpartial(length - buf.length)) break end buf << rb end buf end |
#readpartial(maxlen, outbuf = nil) ⇒ Object
Internal: Read up to a maxlen of data from the socket and store it in outbuf
maxlen - the maximum number of bytes to read from the socket outbuf - the buffer in which to store the bytes.
Returns the bytes read
312 313 314 315 316 317 318 319 320 |
# File 'lib/riemann/client/tcp_socket.rb', line 312 def readpartial(maxlen, outbuf = nil) socket.read_nonblock(maxlen, outbuf) rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::ECONNRESET, IO::WaitReadable unless wait_readable(read_timeout) raise Timeout, "Could not read from #{host}:#{port} in #{read_timeout} seconds" end retry end |
#socket ⇒ Object
Internal: Return the connected raw Socket.
If the socket is closed or non-existent it will create and connect again.
Returns a ::Socket
141 142 143 144 145 |
# File 'lib/riemann/client/tcp_socket.rb', line 141 def socket return @socket unless closed? @socket ||= connect end |
#socket_factory(type) ⇒ Object
Internal: Low level socket allocation and option configuration
Using the options from the initializer, a new ::Socket is created that is:
TCP, autoclosing on exit, nagle's algorithm is disabled and has
TCP Keepalive options set if keepalive is supported.
Returns a new ::Socket instance for
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/riemann/client/tcp_socket.rb', line 116 def socket_factory(type) sock = ::Socket.new(type, ::Socket::SOCK_STREAM, 0) # close file descriptors if we exec if Fcntl.constants.include?(:F_SETFD) && Fcntl.constants.include?(:FD_CLOEXEC) sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) end # Disable Nagle's algorithm sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1) if using_keepalive? sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true) sock.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPIDLE, keepalive_idle) sock.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPINTVL, keepalive_interval) sock.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPCNT, keepalive_count) end sock end |
#using_keepalive? ⇒ Boolean
Internal: say if we are using TCP Keep Alive or not
We will return true if the initialization options :keepalive_active is set to true, and if all the constants that are necessary to use TCP keep alive are defined.
It may be the case that on some operating systems that the constants are not defined, so in that case we do not want to attempt to use tcp keep alive if we are unable to do so in any case.
Returns true or false
271 272 273 274 275 276 277 278 279 |
# File 'lib/riemann/client/tcp_socket.rb', line 271 def using_keepalive? using = false if keepalive_active? using = %i[SOL_SOCKET SO_KEEPALIVE SOL_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT].all? do |c| ::Socket.const_defined? c end end using end |
#wait_readable(timeout = nil) ⇒ Object
355 356 357 |
# File 'lib/riemann/client/tcp_socket.rb', line 355 def wait_readable(timeout = nil) IO.select([@socket], nil, nil, timeout || read_timeout) end |
#wait_writable(timeout = nil) ⇒ Object
351 352 353 |
# File 'lib/riemann/client/tcp_socket.rb', line 351 def wait_writable(timeout = nil) IO.select(nil, [@socket], nil, timeout || write_timeout) end |
#write(buf) ⇒ Object
Internal: Write the given data to the socket
buf - the data to write to the socket.
Raises an error if it is unable to write the data to the socket within the write_timeout.
returns nothing
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/riemann/client/tcp_socket.rb', line 330 def write(buf) until buf.nil? || buf.empty? written = socket.write_nonblock(buf) buf = buf[written, buf.length] end rescue Errno::EWOULDBLOCK, Errno::EINTR, Errno::EAGAIN, Errno::ECONNRESET, IO::WaitWritable unless wait_writable(write_timeout) raise Timeout, "Could not write to #{host}:#{port} in #{write_timeout} seconds" end retry rescue IO::WaitReadable # Also rescued for SSL renegotiation in OpenSSL::SSL::SSLSocket according to # https://ruby-doc.org/core-2.7.1/IO.html#method-c-select unless wait_readable(read_timeout) raise Timeout, "Could not write to #{host}:#{port} in #{write_timeout} seconds" end retry end |