Class: Net::TCPClient
- Inherits: Object
- Inheritance chain: Object < Net::TCPClient
- Defined in:
- lib/net/tcp_client/version.rb,
lib/net/tcp_client/logging.rb,
lib/net/tcp_client/exceptions.rb,
lib/net/tcp_client/tcp_client.rb
Overview
Make Socket calls resilient by adding timeouts, retries and specific exception categories
TCP Client with:
- Connection Timeouts: Ability to time out if a connect does not complete within a reasonable time. For example, this can occur when the server is turned off without shutting down, causing clients to hang while creating new connections
- Automatic retries on startup connection failure: For example, the server is being restarted while the client is starting. Gives the server a few seconds to restart to
- Automatic retries on active connection failures: If the server is restarted during
Connection and Read Timeouts are fully configurable
Raises Net::TCPClient::ConnectionTimeout when the connection timeout is exceeded
Raises Net::TCPClient::ReadTimeout when the read timeout is exceeded
Raises Net::TCPClient::ConnectionFailure when a network error occurs whilst reading or writing
Note: Only the following methods currently have auto-reconnect enabled:
* read
* write
Future:
- Add auto-reconnect feature to sysread, syswrite, etc.
- To be a drop-in replacement for TCPSocket, it would also need to implement the following TCPSocket instance methods: :addr, :peeraddr
Design Notes:
- Does not inherit from Socket or TCPSocket because the socket instance has to be completely destroyed and recreated after a connection failure
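The design note above can be illustrated with a minimal sketch. `ReconnectingClient` and its `opener` block are hypothetical names and not part of Net::TCPClient; the sketch only shows that a wrapper holding its socket by composition can discard the socket and build a fresh one, which an object inheriting from TCPSocket could not do to itself.

```ruby
require 'stringio'

# Hypothetical wrapper: holds an IO-like object instead of inheriting from it,
# so the underlying object can be thrown away and rebuilt after a failure.
class ReconnectingClient
  def initialize(&opener)
    @opener = opener          # callable that returns a fresh IO-like object
    @socket = @opener.call
  end

  # Discard the current socket entirely and create a new one
  def reconnect
    @socket.close unless @socket.closed?
    @socket = @opener.call
    self
  end

  # Identity of the wrapped object, exposed only to make the swap visible
  def socket_id
    @socket.object_id
  end

  def closed?
    @socket.closed?
  end
end

# StringIO stands in for a real socket so the sketch runs without a server
client = ReconnectingClient.new { StringIO.new }
before = client.socket_id
client.reconnect
puts "socket replaced: #{client.socket_id != before}"
```

Callers keep the same `client` reference across the reconnect, which is the property the design note is after.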
Defined Under Namespace
Modules: Logging Classes: ConnectionFailure, ConnectionTimeout, ReadTimeout
Constant Summary collapse
- VERSION = "1.0.0"
- @@reconnect_on_errors = [ Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::EIO, Errno::ENETDOWN, Errno::ENETRESET, Errno::EPIPE, Errno::ETIMEDOUT, EOFError, ]
Instance Attribute Summary collapse
-
#buffered ⇒ Object
readonly
Returns [TrueClass|FalseClass] Whether send buffering is enabled for this connection.
-
#close_on_error ⇒ Object
Returns the value of attribute close_on_error.
-
#connect_retry_count ⇒ Object
Returns the value of attribute connect_retry_count.
-
#connect_retry_interval ⇒ Object
Returns the value of attribute connect_retry_interval.
-
#connect_timeout ⇒ Object
Returns the value of attribute connect_timeout.
-
#logger ⇒ Object
readonly
Returns the logger being used by the TCPClient instance.
-
#read_timeout ⇒ Object
Returns the value of attribute read_timeout.
-
#retry_count ⇒ Object
Returns the value of attribute retry_count.
-
#server ⇒ Object
readonly
Returns [String] Name of the server connected to including the port number.
-
#server_selector ⇒ Object
Returns the value of attribute server_selector.
-
#user_data ⇒ Object
Supports embedding user supplied data along with this connection such as sequence number and other connection specific information.
Class Method Summary collapse
-
.connect(params = {}) ⇒ Object
Create a connection, call the supplied block and close the connection on completion of the block.
-
.reconnect_on_errors ⇒ Object
Returns the array of errors that will result in an automatic connection retry. To add any additional errors to the standard list: Net::TCPClient.reconnect_on_errors << Errno::EPROTO
Instance Method Summary collapse
-
#alive? ⇒ Boolean
Returns whether the connection to the server is alive.
-
#close ⇒ Object
Close the socket only if it is not already closed.
-
#closed? ⇒ Boolean
Returns whether the socket is closed.
-
#connect ⇒ Object
Connect to the TCP server.
-
#initialize(parameters = {}) ⇒ TCPClient
constructor
Create a new TCP Client connection.
-
#read(length, buffer = nil, timeout = nil) ⇒ Object
Returns a response from the server.
-
#retry_on_connection_failure ⇒ Object
Send and/or receive data with automatic retry on connection failure.
-
#setsockopt(level, optname, optval) ⇒ Object
See: Socket#setsockopt.
-
#write(data) ⇒ Object
Send data to the server.
Constructor Details
#initialize(parameters = {}) ⇒ TCPClient
Create a new TCP Client connection
Parameters:
:server [String]
URL of the server to connect to with port number
'localhost:2000'
:servers [Array of String]
Array of URLs of servers to connect to with port numbers
['server1:2000', 'server2:2000']
The second server will only be attempted once the first server
cannot be connected to or has timed out on connect
A read failure or timeout will not result in switching to the second
server, only a connection failure or during an automatic reconnect
:read_timeout [Float]
Time in seconds to timeout on read
Can be overridden by supplying a timeout in the read call
Default: 60
:connect_timeout [Float]
Time in seconds to timeout when trying to connect to the server
A value of -1 will cause the connect wait time to be infinite
Default: Half of the :read_timeout ( 30 seconds )
:logger [Logger]
Set the logger to which log messages are written
Note: Additional methods will be mixed into this logger to make it
compatible with the SemanticLogger extensions if it is not already
a SemanticLogger logger instance
:log_level [Symbol]
Set the logging level for the TCPClient
Any valid SemanticLogger log level:
:trace, :debug, :info, :warn, :error, :fatal
Default: SemanticLogger.default_level
:buffered [Boolean]
Whether to use Nagle's Buffering algorithm (http://en.wikipedia.org/wiki/Nagle's_algorithm)
Recommend disabling for RPC style invocations where we don't want to wait for an
ACK from the server before sending the last partial segment
Buffering is recommended in a browser or file transfer style environment
where multiple sends are expected during a single response
Default: true
:connect_retry_count [Fixnum]
Number of times to retry connecting when a connection fails
Default: 10
:connect_retry_interval [Float]
Number of seconds between connection retry attempts after the first failed attempt
Default: 0.5
:retry_count [Fixnum]
Number of times to retry when calling #retry_on_connection_failure
This is independent of :connect_retry_count which still applies with
connection failures. This retry controls up to how many times to retry the
supplied block should a connection failure occur during the block
Default: 3
:on_connect [Proc]
Directly after a connection is established and before it is made available
for use this Block is invoked.
Typical Use Cases:
- Initialize per connection session sequence numbers
- Pass any authentication information to the server
- Perform a handshake with the server
:server_selector [Symbol|Proc]
When multiple servers are supplied using :servers, this option will
determine which server is selected from the list
:ordered
Select a server in the order supplied in the array, with the first
having the highest priority. The second server will only be connected
to if the first server is unreachable
:random
Randomly select a server from the list every time a connection
is established, including during automatic connection recovery.
:nearest
FUTURE - Not implemented yet
The server with an IP address that most closely matches the
local ip address will be attempted first
This will result in connections to servers on the localhost
first prior to looking at remote servers
:ping_time
FUTURE - Not implemented yet
The server with the lowest ping time will be selected first
Proc:
When a Proc is supplied, it will be called passing in the list
of servers. The Proc must return one server name
Example:
:server_selector => Proc.new do |servers|
  servers.last
end
Default: :ordered
:close_on_error [True|False]
To prevent the connection from going into an inconsistent state
automatically close the connection if an error occurs
This includes a Read Timeout
Default: true
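The :server_selector choices described above can be sketched as a small function. `candidate_order` is a hypothetical helper, not the library's implementation; it only returns the order in which servers would be attempted for the :ordered, :random and Proc cases.

```ruby
# Hypothetical helper: returns the servers in the order they would be attempted
def candidate_order(servers, selector = :ordered)
  case selector
  when Proc     then [selector.call(servers)]  # the Proc must return one server name
  when :ordered then servers.dup               # first entry has the highest priority
  when :random  then servers.uniq.shuffle      # random order, each server at most once
  else
    raise ArgumentError, "Invalid or unknown value for parameter :server_selector => #{selector.inspect}"
  end
end

servers = ['server1:2000', 'server2:2000']
p candidate_order(servers)
p candidate_order(servers, ->(list) { list.last })
```

The :nearest and :ping_time values are listed as not implemented yet, so the sketch treats them like any other unknown value.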
Example
client = Net::TCPClient.new(
  server:                 'server:3300',
  connect_retry_interval: 0.1,
  connect_retry_count:    5
)

client.retry_on_connection_failure do
  client.write('Update the database')
end

# Read 20 bytes from the server
response = client.read(20)
puts "Received: #{response}"
client.close
# File 'lib/net/tcp_client/tcp_client.rb', line 225
def initialize(parameters={})
  params = parameters.dup
  @read_timeout = (params.delete(:read_timeout) || 60.0).to_f
  @connect_timeout = (params.delete(:connect_timeout) || (@read_timeout/2)).to_f
  buffered = params.delete(:buffered)
  @buffered = buffered.nil? ? true : buffered
  @connect_retry_count = params.delete(:connect_retry_count) || 10
  @retry_count = params.delete(:retry_count) || 3
  @connect_retry_interval = (params.delete(:connect_retry_interval) || 0.5).to_f
  @on_connect = params.delete(:on_connect)
  @server_selector = params.delete(:server_selector) || :ordered
  @close_on_error = params.delete(:close_on_error)
  @close_on_error = true if @close_on_error.nil?
  @logger = params.delete(:logger)

  unless @servers = params.delete(:servers)
    raise "Missing mandatory :server or :servers" unless server = params.delete(:server)
    @servers = [ server ]
  end

  # If a logger is supplied, add the SemanticLogger extensions
  @logger = Logging.new_logger(logger, "#{self.class.name} #{@servers.inspect}", params.delete(:log_level))

  params.each_pair {|k,v| logger.warn "Ignoring unknown option #{k} = #{v}"}

  # Connect to the Server
  connect
end
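The option handling in the constructor follows a common pattern: delete each known key from a copy of the hash, apply a default when it is missing, and treat whatever is left as unknown. The sketch below shows that pattern in isolation. `extract_options` is a hypothetical helper covering only three of the options, so it is an illustration rather than a replacement for the constructor.

```ruby
# Hypothetical helper showing the delete-with-default pattern on a copy of the hash
def extract_options(parameters = {})
  params = parameters.dup                      # never mutate the caller's hash
  read_timeout = (params.delete(:read_timeout) || 60.0).to_f
  options = {
    read_timeout:        read_timeout,
    # Defaults to half of the read timeout when not supplied
    connect_timeout:     (params.delete(:connect_timeout) || read_timeout / 2).to_f,
    connect_retry_count: params.delete(:connect_retry_count) || 10
  }
  [options, params.keys]                       # leftover keys are the unknown options
end

options, unknown = extract_options(read_timeout: 10, colour: 'red')
p options
p unknown
```

Because each recognised key is removed as it is read, any key still present at the end is one the method does not know about, which is how the constructor can warn about ignored options.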
Instance Attribute Details
#buffered ⇒ Object (readonly)
Returns [TrueClass|FalseClass] Whether send buffering is enabled for this connection
# File 'lib/net/tcp_client/tcp_client.rb', line 54
def buffered
  @buffered
end
#close_on_error ⇒ Object
Returns the value of attribute close_on_error.
# File 'lib/net/tcp_client/tcp_client.rb', line 50
def close_on_error
  @close_on_error
end
#connect_retry_count ⇒ Object
Returns the value of attribute connect_retry_count.
# File 'lib/net/tcp_client/tcp_client.rb', line 50
def connect_retry_count
  @connect_retry_count
end
#connect_retry_interval ⇒ Object
Returns the value of attribute connect_retry_interval.
# File 'lib/net/tcp_client/tcp_client.rb', line 50
def connect_retry_interval
  @connect_retry_interval
end
#connect_timeout ⇒ Object
Returns the value of attribute connect_timeout.
# File 'lib/net/tcp_client/tcp_client.rb', line 50
def connect_timeout
  @connect_timeout
end
#logger ⇒ Object (readonly)
Returns the logger being used by the TCPClient instance
# File 'lib/net/tcp_client/tcp_client.rb', line 57
def logger
  @logger
end
#read_timeout ⇒ Object
Returns the value of attribute read_timeout.
# File 'lib/net/tcp_client/tcp_client.rb', line 50
def read_timeout
  @read_timeout
end
#retry_count ⇒ Object
Returns the value of attribute retry_count.
# File 'lib/net/tcp_client/tcp_client.rb', line 50
def retry_count
  @retry_count
end
#server ⇒ Object (readonly)
Returns [String] Name of the server connected to including the port number
Example:
localhost:2000
# File 'lib/net/tcp_client/tcp_client.rb', line 48
def server
  @server
end
#server_selector ⇒ Object
Returns the value of attribute server_selector.
# File 'lib/net/tcp_client/tcp_client.rb', line 50
def server_selector
  @server_selector
end
#user_data ⇒ Object
Supports embedding user supplied data along with this connection such as sequence number and other connection specific information
# File 'lib/net/tcp_client/tcp_client.rb', line 42
def user_data
  @user_data
end
Class Method Details
.connect(params = {}) ⇒ Object
Create a connection, call the supplied block and close the connection on completion of the block
See #initialize for the list of parameters
Example
Net::TCPClient.connect(
  server:                 'server:3300',
  connect_retry_interval: 0.1,
  connect_retry_count:    5
) do |client|
  client.retry_on_connection_failure do
    client.write('Update the database')
  end
  response = client.read(20)
  puts "Received: #{response}"
end
# File 'lib/net/tcp_client/tcp_client.rb', line 97
def self.connect(params={})
  begin
    connection = self.new(params)
    yield(connection)
  ensure
    connection.close if connection
  end
end
.reconnect_on_errors ⇒ Object
# File 'lib/net/tcp_client/tcp_client.rb', line 75
def self.reconnect_on_errors
  @@reconnect_on_errors
end
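A list like this is typically consulted by checking the class of the underlying error against it. The sketch below assumes that behaviour and uses a hypothetical constant `RECONNECT_ON` and helper `retryable?` rather than the library's own class variable, so it runs without the gem installed.

```ruby
# Hypothetical stand-in for the class-level list of retryable errors
RECONNECT_ON = [Errno::ECONNRESET, Errno::EPIPE, EOFError]

# An error is retried only when its exact class appears in the list
def retryable?(error)
  RECONNECT_ON.include?(error.class)
end

puts retryable?(Errno::EPIPE.new)     # in the list from the start
puts retryable?(Errno::EPROTO.new)    # not in the list yet
RECONNECT_ON << Errno::EPROTO         # same idea as reconnect_on_errors << Errno::EPROTO
puts retryable?(Errno::EPROTO.new)
```

Since the check compares exact classes, a subclass of a listed error would need to be added separately.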
Instance Method Details
#alive? ⇒ Boolean
Returns whether the connection to the server is alive
It is useful to call this method before making a call to the server that would change data on the server
Note: This method is only useful if the server closed the connection or
if a previous connection failure occurred.
If the server is hard killed this will still return true until one
or more writes are attempted
Note: In testing the overhead of this call is rather low, with the ability to make about 120,000 calls per second against an active connection, i.e. about 8.3 microseconds per call
# File 'lib/net/tcp_client/tcp_client.rb', line 530
def alive?
  return false if @socket.closed?

  if IO.select([@socket], nil, nil, 0)
    !@socket.eof? rescue false
  else
    true
  end
rescue IOError
  false
end
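The zero-timeout check used by #alive? can be tried without a server. The sketch below copies the same logic into a free function, `socket_alive?` (a hypothetical name), and uses `UNIXSocket.pair` as a stand-in for a client/server TCP connection, which is an assumption made only so the example is self-contained.

```ruby
require 'socket'

# Same check as #alive?, written as a free function for illustration
def socket_alive?(socket)
  return false if socket.closed?

  # Readable with a zero timeout means either data or an EOF is pending
  if IO.select([socket], nil, nil, 0)
    !socket.eof? rescue false
  else
    true
  end
rescue IOError
  false
end

# UNIXSocket.pair stands in for a connected client and server
client, server = UNIXSocket.pair
puts socket_alive?(client)   # peer still open
server.close
puts socket_alive?(client)   # peer closed, EOF is pending
```

This only detects a close that the peer actually signalled. As noted above, a server that is hard killed leaves nothing to read, so the check keeps returning true until a write fails.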
#close ⇒ Object
Close the socket only if it is not already closed
Logs a warning if an error occurs trying to close the socket
# File 'lib/net/tcp_client/tcp_client.rb', line 506
def close
  @socket.close unless @socket.closed?
rescue IOError => exception
  logger.warn "IOError when attempting to close socket: #{exception.class}: #{exception.message}"
end
#closed? ⇒ Boolean
Returns whether the socket is closed
# File 'lib/net/tcp_client/tcp_client.rb', line 513
def closed?
  @socket.closed?
end
#connect ⇒ Object
Connect to the TCP server
Raises Net::TCPClient::ConnectionTimeout when the time taken to create a connection
exceeds the :connect_timeout
Raises Net::TCPClient::ConnectionFailure whenever Socket raises an error such as Errno::EACCES etc, see Socket#connect for more information
Error handling is implemented as follows:
- TCP Socket Connect failure: Cannot reach server. Server is being restarted, or is not running. Retry :connect_retry_count times (default 10) every :connect_retry_interval seconds (default 0.5) before raising a Net::TCPClient::ConnectionFailure
  - With the defaults, all calls to #connect will take at least 5 seconds before failing if the server is not running
  - Allows hot restart of server process if it restarts within 5 seconds
- TCP Socket Connect timeout: Timed out after :connect_timeout seconds trying to connect to the server. Usually means server is busy or the remote server disappeared off the network recently. No retry, just raise a Net::TCPClient::ConnectionTimeout
Note: When multiple servers are supplied it will only try to connect to
the subsequent servers once the retry count has been exceeded
Note: Calling #connect on an open connection will close the current connection
and create a new connection
# File 'lib/net/tcp_client/tcp_client.rb', line 278
def connect
  @socket.close if @socket && !@socket.closed?
  if @servers.size > 1
    case
    when @server_selector.is_a?(Proc)
      connect_to_server(@server_selector.call(@servers))

    when @server_selector == :ordered
      # Try each server in sequence
      exception = nil
      @servers.find do |server|
        begin
          connect_to_server(server)
          exception = nil
          true
        rescue Net::TCPClient::ConnectionFailure => exc
          exception = exc
          false
        end
      end
      # Raise Exception once it has also failed to connect to all servers
      raise(exception) if exception

    when @server_selector == :random
      # Pick each server randomly, trying each server until one can be connected to
      # If no server can be connected to a Net::TCPClient::ConnectionFailure is raised
      servers_to_try = @servers.uniq
      exception = nil
      servers_to_try.size.times do |i|
        server = servers_to_try[rand(servers_to_try.size)]
        servers_to_try.delete(server)
        begin
          connect_to_server(server)
          exception = nil
        rescue Net::TCPClient::ConnectionFailure => exc
          exception = exc
        end
      end
      # Raise Exception once it has also failed to connect to all servers
      raise(exception) if exception

    else
      raise ArgumentError.new("Invalid or unknown value for parameter :server_selector => #{@server_selector}")
    end
  else
    connect_to_server(@servers.first)
  end

  # Invoke user supplied Block every time a new connection has been established
  @on_connect.call(self) if @on_connect
  true
end
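The per-server retry behaviour described for connection failures (a fixed number of attempts with a fixed pause between them) is not part of the listing above. A minimal sketch of that kind of loop follows; `ConnectFailed` and `connect_with_retry` are hypothetical names, and the block stands in for the actual socket connect.

```ruby
class ConnectFailed < StandardError; end   # hypothetical error class

# Hypothetical helper: run the block, retrying up to retry_count times with a
# fixed pause between attempts, then re-raise the last failure.
def connect_with_retry(retry_count: 10, retry_interval: 0.5)
  retries = 0
  begin
    yield
  rescue ConnectFailed
    raise if retries >= retry_count   # retries exhausted, give up
    retries += 1
    sleep(retry_interval)
    retry
  end
end

attempts = 0
result = connect_with_retry(retry_count: 5, retry_interval: 0.01) do
  attempts += 1
  raise ConnectFailed, 'server not ready' if attempts < 3
  :connected
end
puts "#{result} after #{attempts} attempts"
```

With a retry count of 10 and an interval of 0.5 seconds, a loop of this shape spends about 5 seconds before giving up, which matches the window described for a hot restart of the server.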
#read(length, buffer = nil, timeout = nil) ⇒ Object
Returns a response from the server
Raises Net::TCPClient::ConnectionTimeout when the time taken to create a connection
exceeds the :connect_timeout
Connection is closed
Raises Net::TCPClient::ConnectionFailure whenever Socket raises an error such as
Errno::EACCES etc, see Socket#connect for more information
Connection is closed
Raises Net::TCPClient::ReadTimeout if the timeout has been exceeded waiting for the
requested number of bytes from the server
Partial data will not be returned
Connection is closed when :close_on_error is true (the default); otherwise it is
_not_ closed and #read can be called again later to read the response from the connection
Parameters
length [Fixnum]
The number of bytes to return
#read will not return until 'length' bytes have been received from
the server
timeout [Float]
Optional: Override the default read timeout for this read
Number of seconds before raising Net::TCPClient::ReadTimeout when no data has
been returned
A value of -1 will wait forever for a response on the socket
Default: :read_timeout supplied to #initialize
Note: After a Net::TCPClient::ReadTimeout with :close_on_error set to false, #read
can be called again on the same socket to read the response later.
If the application no longer wants the connection after a
Net::TCPClient::ReadTimeout, then the #close method _must_ be called
before calling _connect_ or _retry_on_connection_failure_ to create
a new connection
# File 'lib/net/tcp_client/tcp_client.rb', line 391
def read(length, buffer=nil, timeout=nil)
  result = nil
  logger.benchmark_debug("#read <== read #{length} bytes") do
    if timeout != -1
      # Block on data to read for @read_timeout seconds
      ready = begin
        ready = IO.select([@socket], nil, [@socket], timeout || @read_timeout)
      rescue IOError => exception
        logger.warn "#read Connection failure while waiting for data: #{exception.class}: #{exception.message}"
        close if close_on_error
        raise Net::TCPClient::ConnectionFailure.new("#{exception.class}: #{exception.message}", @server, exception)
      rescue Exception
        # Close the connection on any other exception since the connection
        # will now be in an inconsistent state
        close if close_on_error
        raise
      end
      unless ready
        close if close_on_error
        logger.warn "#read Timeout waiting for server to reply"
        raise Net::TCPClient::ReadTimeout.new("Timedout after #{timeout || @read_timeout} seconds trying to read from #{@server}")
      end
    end

    # Read data from socket
    begin
      result = buffer.nil? ? @socket.read(length) : @socket.read(length, buffer)
      logger.trace("#read <== received", result.inspect)

      # EOF before all the data was returned
      if result.nil? || (result.length < length)
        close if close_on_error
        logger.warn "#read server closed the connection before #{length} bytes were returned"
        raise Net::TCPClient::ConnectionFailure.new("Connection lost while reading data", @server, EOFError.new("end of file reached"))
      end
    rescue SystemCallError, IOError => exception
      close if close_on_error
      logger.warn "#read Connection failure while reading data: #{exception.class}: #{exception.message}"
      raise Net::TCPClient::ConnectionFailure.new("#{exception.class}: #{exception.message}", @server, exception)
    rescue Exception
      # Close the connection on any other exception since the connection
      # will now be in an inconsistent state
      close if close_on_error
      raise
    end
  end
  result
end
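The core of the listing above is a two-step pattern: wait for readability with `IO.select`, then read a fixed number of bytes and treat a short read as a lost connection. The sketch below isolates that pattern. `ReadTimedOut` and `read_with_timeout` are hypothetical names, and `UNIXSocket.pair` is used in place of a TCP connection so the example runs on its own.

```ruby
require 'socket'

class ReadTimedOut < StandardError; end   # hypothetical error class

# Wait up to `timeout` seconds for data, then read exactly `length` bytes
def read_with_timeout(socket, length, timeout)
  ready = IO.select([socket], nil, [socket], timeout)
  raise ReadTimedOut, "Timed out after #{timeout} seconds" unless ready

  data = socket.read(length)
  # nil or a short result means EOF arrived before all the bytes did
  raise EOFError, 'connection closed before all bytes arrived' if data.nil? || data.length < length
  data
end

reader, writer = UNIXSocket.pair
writer.write('PONG')
puts read_with_timeout(reader, 4, 1.0)
```

In this pattern the timeout bounds only the wait for the first readable data. Once the socket is readable, the blocking read waits for the remaining bytes or an EOF.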
#retry_on_connection_failure ⇒ Object
Send and/or receive data with automatic retry on connection failure
On a connection failure, it will create a new connection and retry the block.
Returns immediately on exception Net::TCPClient::ReadTimeout
The connection is always closed on Net::TCPClient::ConnectionFailure regardless of close_on_error
- Example of a resilient readonly request:
  When reading data from a server that does not change state on the server
  Wrap both the send and the read with #retry_on_connection_failure
  since it is safe to send the same data twice to the server
    # Since the send can be sent many times it is safe to also put the receive
    # inside the retry block
    value = client.retry_on_connection_failure do
      client.write("GETVALUE:count\n")
      client.read(20).strip.to_i
    end
- Example of a resilient request that modifies data on the server:
  When changing state on the server, for example when updating a value
  Wrap only the send with #retry_on_connection_failure
  The read must be outside the #retry_on_connection_failure since we must
  not retry the send if the connection fails during the #read
    value = 45
    # Only the send is within the retry block since we cannot re-send once
    # the send was successful since the server may have made the change
    client.retry_on_connection_failure do
      client.write("SETVALUE:#{value}\n")
    end
    # Server returns "SAVED" if the call was successful
    result = client.read(20).strip
Error handling is implemented as follows:
If a network failure occurs during the block invocation the block
will be called again with a new connection to the server.
It will only be retried up to :retry_count times (default 3)
The re-connect will independently retry and timeout using all the
rules of #connect
# File 'lib/net/tcp_client/tcp_client.rb', line 481
def retry_on_connection_failure
  retries = 0
  begin
    connect if closed?
    yield(self)
  rescue Net::TCPClient::ConnectionFailure => exception
    exc_str = exception.cause ? "#{exception.cause.class}: #{exception.cause.message}" : exception.message
    # Re-raise exceptions that should not be retried
    if !self.class.reconnect_on_errors.include?(exception.cause.class)
      logger.warn "#retry_on_connection_failure not configured to retry: #{exc_str}"
      raise exception
    elsif retries < @retry_count
      retries += 1
      logger.warn "#retry_on_connection_failure retry #{retries} due to #{exception.class}: #{exception.message}"
      connect
      retry
    end
    logger.error "#retry_on_connection_failure Connection failure: #{exception.class}: #{exception.message}. Giving up after #{retries} retries"
    raise Net::TCPClient::ConnectionFailure.new("After #{retries} retries to host '#{server}': #{exc_str}", @server, exception.cause)
  end
end
#setsockopt(level, optname, optval) ⇒ Object
See: Socket#setsockopt
# File 'lib/net/tcp_client/tcp_client.rb', line 543
def setsockopt(level, optname, optval)
  @socket.setsockopt(level, optname, optval)
end
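As an illustration of the arguments this method forwards, the sketch below sets `TCP_NODELAY`, the standard socket option that turns off Nagle's algorithm, on a plain Ruby `Socket` and reads it back. It does not use Net::TCPClient, and whether the :buffered option is implemented this way inside the library is an assumption.

```ruby
require 'socket'

# An unconnected TCP socket is enough to set the option and read it back
socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)

# level = IPPROTO_TCP, optname = TCP_NODELAY, optval = 1 (disable Nagle's algorithm)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

nodelay = socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY).bool
puts "TCP_NODELAY enabled: #{nodelay}"
socket.close
```

The same three arguments could be passed to #setsockopt on a connected client, since the call is forwarded unchanged to the underlying socket.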
#write(data) ⇒ Object
Send data to the server
Use #retry_on_connection_failure to add resilience to the #write method
Raises Net::TCPClient::ConnectionFailure whenever the send fails
For a description of the errors, see Socket#write
# File 'lib/net/tcp_client/tcp_client.rb', line 338
def write(data)
  logger.trace("#write ==> sending", data)
  stats = {}
  logger.benchmark_debug("#write ==> complete", stats) do
    begin
      stats[:bytes_sent] = @socket.write(data)
    rescue SystemCallError => exception
      logger.warn "#write Connection failure: #{exception.class}: #{exception.message}"
      close if close_on_error
      raise Net::TCPClient::ConnectionFailure.new("Send Connection failure: #{exception.class}: #{exception.message}", @server, exception)
    rescue Exception
      # Close the connection on any other exception since the connection
      # will now be in an inconsistent state
      close if close_on_error
      raise
    end
  end
end
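The error translation in #write, where a low-level `SystemCallError` is re-raised as a single failure type that carries the server name and the original exception, can be sketched on its own. `SendFailure` and `guarded_write` are hypothetical names used for illustration, not the library's classes.

```ruby
require 'stringio'

# Hypothetical error class that keeps the server name and the original exception
class SendFailure < StandardError
  attr_reader :server, :original

  def initialize(message, server = nil, original = nil)
    super(message)
    @server   = server
    @original = original
  end
end

# Write to any IO-like object, translating system-level errors into SendFailure
def guarded_write(io, data, server)
  io.write(data)
rescue SystemCallError => e
  raise SendFailure.new("Send Connection failure: #{e.class}: #{e.message}", server, e)
end

# A successful write returns the number of bytes written
puts guarded_write(StringIO.new, 'PING', 'localhost:2000')

# An object whose write always fails stands in for a broken connection
broken = Object.new
def broken.write(_data)
  raise Errno::EPIPE
end

begin
  guarded_write(broken, 'PING', 'localhost:2000')
rescue SendFailure => e
  puts "#{e.server}: #{e.original.class}"
end
```

Keeping the original exception on the wrapper lets a retry layer inspect its class later, for example to decide whether the failure is one that should trigger a reconnect.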