Class: Net::TCPClient

Inherits:
Object
Includes:
SemanticLogger::Loggable
Defined in:
lib/net/tcp_client/version.rb,
lib/net/tcp_client.rb,
lib/net/tcp_client/address.rb,
lib/net/tcp_client/exceptions.rb,
lib/net/tcp_client/tcp_client.rb,
lib/net/tcp_client/policy/base.rb,
lib/net/tcp_client/policy/custom.rb,
lib/net/tcp_client/policy/random.rb,
lib/net/tcp_client/policy/ordered.rb

Overview

Make Socket calls resilient by adding timeouts, retries and specific exception categories

TCP Client with:

  • Connection Timeouts: Ability to time out if a connect does not complete within a reasonable time. For example, this can occur when the server is turned off without shutting down, causing clients to hang creating new connections

  • Automatic retries on startup connection failure: For example, the server is being restarted while the client is starting. Gives the server a few seconds to restart

  • Automatic retries on active connection failures: If the server is restarted during

Connection and Read Timeouts are fully configurable

Raises Net::TCPClient::ConnectionTimeout when the connection timeout is exceeded
Raises Net::TCPClient::ReadTimeout when the read timeout is exceeded
Raises Net::TCPClient::ConnectionFailure when a network error occurs whilst reading or writing

Note: Only the following methods currently have auto-reconnect enabled:

* read
* write

Future:

  • Add auto-reconnect feature to sysread, syswrite, etc…

  • To be a drop-in replacement for TCPSocket, it would also need to implement the following TCPSocket instance methods: :addr, :peeraddr

Design Notes:

  • Does not inherit from Socket or TCPSocket because the socket instance has to be completely destroyed and recreated after a connection failure

Defined Under Namespace

Modules: Policy
Classes: Address, ConnectionFailure, ConnectionTimeout, NonBlockingTimeout, ReadTimeout, WriteTimeout

Constant Summary

VERSION = '2.2.0'.freeze

Constructor Details

#initialize(server: nil, servers: nil, policy: :ordered, buffered: true, keepalive: true, connect_timeout: 10.0, read_timeout: 60.0, write_timeout: 60.0, connect_retry_count: 10, retry_count: 3, connect_retry_interval: 0.5, close_on_error: true, on_connect: nil, proxy_server: nil, ssl: nil) ⇒ TCPClient

Create a new TCP Client connection

Parameters:

:server [String]
  URL of the server to connect to with port number
  'localhost:2000'
  '192.168.1.10:80'

:servers [Array of String]
  Array of URLs of servers to connect to with port numbers
  ['server1:2000', 'server2:2000']

  The second server will only be attempted once the first server
  cannot be connected to or has timed out on connect
  A read failure or timeout will not result in switching to the second
  server, only a connection failure or during an automatic reconnect

:connect_timeout [Float]
  Time in seconds to timeout when trying to connect to the server
  A value of -1 will cause the connect wait time to be infinite
  Default: 10 seconds

:read_timeout [Float]
  Time in seconds to timeout on read
  Can be overridden by supplying a timeout in the read call
  Default: 60

:write_timeout [Float]
  Time in seconds to timeout on write
  Can be overridden by supplying a timeout in the write call
  Default: 60

:buffered [true|false]
  Whether to use Nagle's Buffering algorithm (http://en.wikipedia.org/wiki/Nagle's_algorithm)
  Recommend disabling for RPC style invocations where we don't want to wait for an
  ACK from the server before sending the last partial segment
  Buffering is recommended in a browser or file transfer style environment
  where multiple sends are expected during a single response.
  Also sets sync to true if buffered is false so that all data is sent immediately without
  internal buffering.
  Default: true

:keepalive [true|false]
  Makes the OS check connections even when not in use, so that failed connections fail immediately
  upon use instead of possibly taking considerable time to fail.
  Default: true

:connect_retry_count [Fixnum]
  Number of times to retry connecting when a connection fails
  Default: 10

:connect_retry_interval [Float]
  Number of seconds between connection retry attempts after the first failed attempt
  Default: 0.5

:retry_count [Fixnum]
  Number of times to retry when calling #retry_on_connection_failure
  This is independent of :connect_retry_count which still applies with
  connection failures. This retry controls up to how many times to retry the
  supplied block should a connection failure occur during the block
  Default: 3

:on_connect [Proc]
  Directly after a connection is established and before it is made available
  for use this Block is invoked.
  Typical Use Cases:
  - Initialize per connection session sequence numbers.
  - Pass authentication information to the server.
  - Perform a handshake with the server.

:policy [Symbol|Proc]
  Specify the policy to use when connecting to servers.
    :ordered
      Select a server in the order supplied in the array, with the first
      having the highest priority. The second server will only be connected
      to if the first server is unreachable
    :random
      Randomly select a server from the list every time a connection
      is established, including during automatic connection recovery.
    :ping_time
      FUTURE - Not implemented yet - Pull request anyone?
      The server with the lowest ping time will be tried first
    Proc:
      When a Proc is supplied, it will be called passing in the list
      of servers. The Proc must return one server name
        Example:
          :policy => Proc.new do |servers|
            servers.last
          end
    Default: :ordered

:close_on_error [true|false]
  To prevent the connection from going into an inconsistent state
  automatically close the connection if an error occurs
  This includes a Read Timeout
  Default: true

:proxy_server [String]
  The host name and port in the form of 'host_name:1234' to forward
  socket connections through.
  Default: nil ( none )

SSL Options
:ssl [true|false|Hash]
   true:  SSL is enabled using the SSL context defaults.
   false: SSL is not used.
   Hash:
     Keys from OpenSSL::SSL::SSLContext:
       ca_file, ca_path, cert, cert_store, ciphers, key, ssl_timeout, ssl_version
       verify_callback, verify_depth, verify_mode
     handshake_timeout: [Float]
       The number of seconds to timeout the SSL Handshake.
       Default: connect_timeout
   Default: false.
     See OpenSSL::SSL::SSLContext::DEFAULT_PARAMS for the defaults.
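
The :policy descriptions above can be pictured with a small stand-alone sketch. The helper name `servers_in_policy_order` is hypothetical and is not part of Net::TCPClient; it only illustrates, under the assumption that the documented behaviour is implemented in the obvious way, which servers an attempt would consider for each policy value.

```ruby
# Hypothetical helper (not the library's code): returns the servers in the
# order a connection attempt would consider them for a given policy.
def servers_in_policy_order(servers, policy = :ordered)
  case policy
  when :ordered
    servers.dup              # first entry has the highest priority
  when :random
    servers.shuffle          # a fresh random order on every call
  when Proc
    [policy.call(servers)]   # the Proc must return one server name
  else
    raise ArgumentError, "Unknown policy: #{policy.inspect}"
  end
end

servers = ['server1:2000', 'server2:2000']
servers_in_policy_order(servers, :ordered)                # server1 is tried first
servers_in_policy_order(servers, ->(list) { list.last })  # only server2 is tried
```

Passing :ping_time to this sketch raises ArgumentError, which mirrors the note above that the policy is not implemented yet.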

Example:

client = Net::TCPClient.new(
  server:                 'server:3300',
  connect_retry_interval: 0.1,
  connect_retry_count:    5
)

client.retry_on_connection_failure do
  client.write('Update the database')
end

# Read up to 20 characters from the server
response = client.read(20)

puts "Received: #{response}"
client.close

SSL Example:

client = Net::TCPClient.new(
  server:                 'server:3300',
  connect_retry_interval: 0.1,
  connect_retry_count:    5,
  ssl:                    true
)

SSL with options Example:

client = Net::TCPClient.new(
  server:                 'server:3300',
  connect_retry_interval: 0.1,
  connect_retry_count:    5,
  ssl:                    {
    verify_mode: OpenSSL::SSL::VERIFY_NONE
  }
)

Raises:

  • (ArgumentError)


# File 'lib/net/tcp_client/tcp_client.rb', line 249

def initialize(server: nil, servers: nil,
  policy: :ordered, buffered: true, keepalive: true,
  connect_timeout: 10.0, read_timeout: 60.0, write_timeout: 60.0,
  connect_retry_count: 10, retry_count: 3, connect_retry_interval: 0.5, close_on_error: true,
  on_connect: nil, proxy_server: nil, ssl: nil
)
  @read_timeout           = read_timeout.to_f
  @write_timeout          = write_timeout.to_f
  @connect_timeout        = connect_timeout.to_f
  @buffered               = buffered
  @keepalive              = keepalive
  @connect_retry_count    = connect_retry_count
  @retry_count            = retry_count
  @connect_retry_interval = connect_retry_interval.to_f
  @on_connect             = on_connect
  @proxy_server           = proxy_server
  @policy                 = policy
  @close_on_error         = close_on_error
  if ssl
    @ssl                   = ssl == true ? {} : ssl
    @ssl_handshake_timeout = (@ssl.delete(:handshake_timeout) || @connect_timeout).to_f
  end
  @servers = [server] if server
  @servers = servers if servers

  raise(ArgumentError, 'Missing mandatory :server or :servers') unless @servers

  connect
end
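
The handling of the :ssl option in the constructor can be isolated as follows. This is a sketch with a hypothetical method name (`normalize_ssl_option`); unlike the listing above it copies the Hash before removing :handshake_timeout, so the caller's Hash is left untouched. That copy is a choice made for this sketch, not a description of the library.

```ruby
# Hypothetical helper: turns the :ssl argument into [options, handshake_timeout].
def normalize_ssl_option(ssl, connect_timeout)
  return [nil, nil] unless ssl                 # nil or false: SSL is not used

  options = ssl == true ? {} : ssl.dup         # true means "use the context defaults"
  # handshake_timeout falls back to the connect timeout when not supplied
  handshake_timeout = (options.delete(:handshake_timeout) || connect_timeout).to_f
  [options, handshake_timeout]
end

normalize_ssl_option(true, 10.0)
normalize_ssl_option({ verify_depth: 3, handshake_timeout: 2 }, 10.0)
```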

Instance Attribute Details

#address ⇒ Object (readonly)

Returns the value of attribute address.

# File 'lib/net/tcp_client/tcp_client.rb', line 43

def address
  @address
end

#buffered ⇒ Object

Returns the value of attribute buffered.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def buffered
  @buffered
end

#close_on_error ⇒ Object

Returns the value of attribute close_on_error.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def close_on_error
  @close_on_error
end

#connect_retry_count ⇒ Object

Returns the value of attribute connect_retry_count.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def connect_retry_count
  @connect_retry_count
end

#connect_retry_interval ⇒ Object

Returns the value of attribute connect_retry_interval.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def connect_retry_interval
  @connect_retry_interval
end

#connect_timeout ⇒ Object

Returns the value of attribute connect_timeout.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def connect_timeout
  @connect_timeout
end

#keepalive ⇒ Object

Returns the value of attribute keepalive.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def keepalive
  @keepalive
end

#policy ⇒ Object

Returns the value of attribute policy.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def policy
  @policy
end

#proxy_server ⇒ Object

Returns the value of attribute proxy_server.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def proxy_server
  @proxy_server
end

#read_timeout ⇒ Object

Returns the value of attribute read_timeout.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def read_timeout
  @read_timeout
end

#retry_count ⇒ Object

Returns the value of attribute retry_count.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def retry_count
  @retry_count
end

#servers ⇒ Object (readonly)

Returns the value of attribute servers.

# File 'lib/net/tcp_client/tcp_client.rb', line 43

def servers
  @servers
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.

# File 'lib/net/tcp_client/tcp_client.rb', line 43

def socket
  @socket
end

#ssl ⇒ Object

Returns the value of attribute ssl.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def ssl
  @ssl
end

#ssl_handshake_timeout ⇒ Object (readonly)

Returns the value of attribute ssl_handshake_timeout.

# File 'lib/net/tcp_client/tcp_client.rb', line 43

def ssl_handshake_timeout
  @ssl_handshake_timeout
end

#user_data ⇒ Object

Supports embedding user supplied data along with this connection, such as a sequence number and other connection specific information. Not used or modified by TCPClient.

# File 'lib/net/tcp_client/tcp_client.rb', line 48

def user_data
  @user_data
end

#write_timeout ⇒ Object

Returns the value of attribute write_timeout.

# File 'lib/net/tcp_client/tcp_client.rb', line 40

def write_timeout
  @write_timeout
end

Class Method Details

.connect(params = {}) ⇒ Object

Create a connection, call the supplied block and close the connection on completion of the block

See #initialize for the list of parameters

Example

Net::TCPClient.connect(
  server:                 'server:3300',
  connect_retry_interval: 0.1,
  connect_retry_count:    5
) do |client|
  client.retry_on_connection_failure do
    client.write('Update the database')
  end
  response = client.read(20)
  puts "Received: #{response}"
end


# File 'lib/net/tcp_client/tcp_client.rb', line 90

def self.connect(params={})
  begin
    connection = self.new(params)
    yield(connection)
  ensure
    connection.close if connection
  end
end
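
The yield-then-close shape of .connect is easiest to see with a stand-in object. `SketchConnection` and its `open` method below are hypothetical names used only for illustration; the point is that the `ensure` clause closes the connection even when the block raises, and that the block's value is returned to the caller.

```ruby
# Hypothetical stand-in for a connection; it only records whether it was closed.
class SketchConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  # Same shape as .connect above: yield the connection, always close it.
  def self.open
    connection = new
    yield(connection)
  ensure
    connection.close if connection
  end
end

# Returns true when the connection was closed although the block raised.
def sketch_connection_closed_after_error?
  seen = nil
  begin
    SketchConnection.open do |connection|
      seen = connection
      raise IOError, 'simulated failure inside the block'
    end
  rescue IOError
    # The error still reaches the caller, after ensure has closed the connection
  end
  seen.closed
end
```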

.reconnect_on_errors ⇒ Object

Return the array of errors that will result in an automatic connection retry

To add any additional errors to the standard list:
  Net::TCPClient.reconnect_on_errors << Errno::EPROTO

# File 'lib/net/tcp_client/tcp_client.rb', line 68

def self.reconnect_on_errors
  @reconnect_on_errors
end
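
A minimal sketch of how such a class-level error list behaves when it is consulted with `include?`, as the #connect and #retry_on_connection_failure listings on this page do. `SketchClient` and its initial list are assumptions for illustration; they are not the library's actual defaults.

```ruby
# Hypothetical class mirroring the accessor above.
class SketchClient
  # Assumed starting list, for illustration only
  @reconnect_on_errors = [Errno::ECONNRESET, Errno::EPIPE, IOError]

  def self.reconnect_on_errors
    @reconnect_on_errors
  end

  # Array#include? on the exception class is an exact class match,
  # so subclasses of a listed error are not matched automatically.
  def self.retryable?(error)
    reconnect_on_errors.include?(error.class)
  end
end

SketchClient.retryable?(Errno::EPROTO.new)         # not retryable until registered
SketchClient.reconnect_on_errors << Errno::EPROTO
SketchClient.retryable?(Errno::EPROTO.new)         # retryable from now on
```

Because the match is on the exact class, an EOFError would not be retried in this sketch even though IOError is listed; each class has to be added explicitly.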

Instance Method Details

#alive? ⇒ Boolean

Returns whether the connection to the server is alive

It is useful to call this method before making a call to the server that would change data on the server

Note: This method is only useful if the server closed the connection or if a previous connection failure occurred. If the server is hard killed this will still return true until one or more writes are attempted

Note: In testing the overhead of this call is rather low, with the ability to make about 120,000 calls per second against an active connection, i.e. about 8.3 microseconds per call

Returns:

  • (Boolean)

# File 'lib/net/tcp_client/tcp_client.rb', line 521

def alive?
  return false if socket.nil? || closed?

  if IO.select([socket], nil, nil, 0)
    !socket.eof? rescue false
  else
    true
  end
rescue IOError
  false
end
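
The zero-timeout `IO.select` check used by #alive? can be tried without a server. The sketch below is an assumption-laden stand-in: it uses a local `UNIXSocket.pair` instead of a TCP connection, and `socket_alive?` is a hypothetical free-standing version of the method above.

```ruby
require 'socket'

# Hypothetical stand-alone version of the liveness check.
def socket_alive?(socket)
  return false if socket.nil? || socket.closed?

  if IO.select([socket], nil, nil, 0)
    # Readable means either data is pending or the peer closed (EOF)
    begin
      !socket.eof?
    rescue StandardError
      false
    end
  else
    true # Nothing to read and no EOF: assume the connection is still up
  end
rescue IOError
  false
end

local, remote = UNIXSocket.pair
socket_alive?(local)   # idle connection
remote.close
socket_alive?(local)   # peer has closed its end
local.close
```

As the note above says, this only detects a close that the local socket has been told about; a peer that disappears without closing is not noticed until a write fails.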

#close ⇒ Object

Close the socket only if it is not already closed

Logs a warning if an error occurs trying to close the socket

# File 'lib/net/tcp_client/tcp_client.rb', line 485

def close
  socket.close if socket && !socket.closed?
  @socket  = nil
  @address = nil
  true
rescue IOError => exception
  logger.warn "IOError when attempting to close socket: #{exception.class}: #{exception.message}" if respond_to?(:logger)
  false
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/net/tcp_client/tcp_client.rb', line 500

def closed?
  socket.nil? || socket.closed?
end

#connect ⇒ Object

Connect to the TCP server

Raises Net::TCPClient::ConnectionTimeout when the time taken to create a connection exceeds the :connect_timeout

Raises Net::TCPClient::ConnectionFailure whenever Socket raises an error such as Errno::EACCES etc, see Socket#connect for more information

Error handling is implemented as follows:

  1. TCP Socket Connect failure: Cannot reach server. Server is being restarted, or is not running. Retry up to :connect_retry_count times (default: 10), sleeping :connect_retry_interval seconds (default: 0.5) between attempts, before raising a Net::TCPClient::ConnectionFailure

    • With the defaults, all calls to #connect will take at least 5 seconds before failing if the server is not running

    • Allows hot restart of server process if it restarts within 5 seconds

  2. TCP Socket Connect timeout: Timed out after :connect_timeout seconds (default: 10) trying to connect to the server. Usually means server is busy or the remote server disappeared off the network recently. No retry, just raise a Net::TCPClient::ConnectionTimeout

Note: When multiple servers are supplied it will only try to connect to the subsequent servers once the retry count has been exceeded

Note: Calling #connect on an open connection will close the current connection and create a new connection

# File 'lib/net/tcp_client/tcp_client.rb', line 303

def connect
  start_time = Time.now
  retries    = 0
  close

  # Number of times to try
  begin
    connect_to_server(servers, policy)
    logger.info(message: "Connected to #{address}", duration: (Time.now - start_time) * 1000) if respond_to?(:logger)
  rescue ConnectionFailure, ConnectionTimeout => exception
    cause = exception.is_a?(ConnectionTimeout) ? exception : exception.cause
    # Retry-able?
    if self.class.reconnect_on_errors.include?(cause.class) && (retries < connect_retry_count.to_i)
      retries += 1
      logger.warn "#connect Failed to connect to any of #{servers.join(',')}. Sleeping:#{connect_retry_interval}s. Retry: #{retries}" if respond_to?(:logger)
      sleep(connect_retry_interval)
      retry
    else
      message = "#connect Failed to connect to any of #{servers.join(',')} after #{retries} retries. #{exception.class}: #{exception.message}"
      logger.benchmark_error(message, exception: exception, duration: (Time.now - start_time)) if respond_to?(:logger)
      raise ConnectionFailure.new(message, address.to_s, cause)
    end
  end
end
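
The retry loop in #connect relies on Ruby's `retry` keyword, which re-runs the enclosing `begin` block. A reduced sketch of that control flow follows; `SketchConnectionFailure`, `connect_with_retries` and `sketch_connect_attempts` are hypothetical names, and the block stands in for the real connection attempt.

```ruby
class SketchConnectionFailure < StandardError; end

# Runs the block until it succeeds, retrying up to retry_count times and
# sleeping retry_interval seconds before each retry.
def connect_with_retries(retry_count: 10, retry_interval: 0.5)
  retries = 0
  begin
    yield
  rescue SketchConnectionFailure => exception
    if retries < retry_count
      retries += 1
      sleep(retry_interval)
      retry # Re-runs the begin block, i.e. another connection attempt
    end
    raise SketchConnectionFailure, "Failed after #{retries} retries: #{exception.message}"
  end
end

# Simulates a server that refuses the first failures_before_success attempts
# and returns how many attempts were made in total.
def sketch_connect_attempts(failures_before_success, retry_count)
  attempts = 0
  connect_with_retries(retry_count: retry_count, retry_interval: 0.01) do
    attempts += 1
    raise SketchConnectionFailure, 'connection refused' if attempts <= failures_before_success
    :connected
  end
  attempts
end
```

With the defaults shown in the sketch's signature, a server that never comes back costs 10 sleeps of 0.5 seconds, which is where the 5 second figure for the default settings comes from.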

#eof? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/net/tcp_client/tcp_client.rb', line 504

def eof?
  socket.nil? || socket.eof?
end

#flush ⇒ Object

# File 'lib/net/tcp_client/tcp_client.rb', line 495

def flush
  return unless socket
  respond_to?(:logger) ? logger.benchmark_debug('#flush') { socket.flush } : socket.flush
end

#read(length, buffer = nil, timeout = read_timeout) ⇒ Object

Returns a response from the server

Raises Net::TCPClient::ConnectionTimeout when the time taken to create a connection
exceeds the :connect_timeout
Connection is closed

Raises Net::TCPClient::ConnectionFailure whenever Socket raises an error such as
Errno::EACCES etc, see Socket#connect for more information
Connection is closed

Raises Net::TCPClient::ReadTimeout if the timeout has been exceeded waiting for the
requested number of bytes from the server
Partial data will not be returned
When :close_on_error is false, the connection is _not_ closed and #read can be
called again later to read the response from the connection

Parameters

 length [Fixnum]
   The number of bytes to return
   #read will not return until 'length' bytes have been received from
   the server

 buffer [String]
  Optional buffer into which to write the data that is read.

 timeout [Float]
   Optional: Override the default read timeout for this read
   Number of seconds before raising Net::TCPClient::ReadTimeout when no data has
   been returned
   A value of -1 will wait forever for a response on the socket
   Default: :read_timeout supplied to #initialize

Note: When :close_on_error is false, after a Net::TCPClient::ReadTimeout #read can be
      called again on the same socket to read the response later.
      If the application no longer wants the connection after a
      Net::TCPClient::ReadTimeout, then the #close method _must_ be called
      before calling _connect_ or _retry_on_connection_failure_ to create
      a new connection


# File 'lib/net/tcp_client/tcp_client.rb', line 402

def read(length, buffer = nil, timeout = read_timeout)
  if respond_to?(:logger)
    payload = {bytes: length, timeout: timeout}
    logger.benchmark_debug('#read', payload: payload) do
      data           = socket_read(length, buffer, timeout)
      # With trace level also log the received data
      payload[:data] = data if logger.trace?
      data
    end
  else
    socket_read(length, buffer, timeout)
  end
rescue Exception => exc
  close if close_on_error
  raise exc
end
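
The listing above delegates to socket_read, which is not shown on this page. One common way to get "block until length bytes have arrived, or raise on timeout" is a deadline loop around `IO.select` and `read_nonblock`. The sketch below illustrates that general technique only; `read_exactly` and `SketchReadTimeout` are hypothetical names, and this should not be read as the library's implementation.

```ruby
require 'socket'

class SketchReadTimeout < StandardError; end

# Reads exactly length bytes from io, or raises SketchReadTimeout once
# timeout seconds have passed. Bytes read before a timeout are discarded,
# so partial data is never returned. EOFError propagates if the peer closes.
def read_exactly(io, length, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  data     = String.new
  while data.bytesize < length
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    if remaining <= 0 || !IO.select([io], nil, nil, remaining)
      raise SketchReadTimeout, "Timed out after #{timeout}s waiting for #{length} bytes"
    end
    begin
      data << io.read_nonblock(length - data.bytesize)
    rescue IO::WaitReadable
      next # Spurious wakeup: wait again within the same deadline
    end
  end
  data
end

reader, writer = UNIXSocket.pair
writer.write('hello')
writer.write(' world')
read_exactly(reader, 11, 1.0) # returns once both writes have arrived
reader.close
writer.close
```

A single deadline is shared across all partial reads, so a slow trickle of bytes cannot extend the total wait beyond the requested timeout.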

#retry_on_connection_failure ⇒ Object

Send and/or receive data with automatic retry on connection failure

On a connection failure, it will create a new connection and retry the block.
Returns immediately on exception Net::TCPClient::ReadTimeout.
The connection is always closed on Net::TCPClient::ConnectionFailure regardless of close_on_error.

  1. Example of a resilient readonly request:

    When reading data from a server that does not change state on the server,
    wrap both the send and the read with #retry_on_connection_failure
    since it is safe to send the same data twice to the server

      # Since the send can be sent many times it is safe to also put the receive
      # inside the retry block
      value = client.retry_on_connection_failure do
        client.write("GETVALUE:count\n")
        client.read(20).strip.to_i
      end

  2. Example of a resilient request that modifies data on the server:

    When changing state on the server, for example when updating a value,
    wrap only the send with #retry_on_connection_failure.
    The read must be outside the #retry_on_connection_failure since we must
    not retry the send if the connection fails during the #read

      value = 45
      # Only the send is within the retry block since we cannot re-send once
      # the send was successful since the server may have made the change
      client.retry_on_connection_failure do
        client.write("SETVALUE:#{value}\n")
      end
      # Server returns "SAVED" if the call was successful
      result = client.read(20).strip

Error handling is implemented as follows:

If a network failure occurs during the block invocation the block
will be called again with a new connection to the server.
It will only be retried up to :retry_count times (default: 3)
The re-connect will independently retry and timeout using all the
rules of #connect

# File 'lib/net/tcp_client/tcp_client.rb', line 460

def retry_on_connection_failure
  retries = 0
  begin
    connect if closed?
    yield(self)
  rescue ConnectionFailure => exception
    exc_str = exception.cause ? "#{exception.cause.class}: #{exception.cause.message}" : exception.message
    # Re-raise exceptions that should not be retried
    if !self.class.reconnect_on_errors.include?(exception.cause.class)
      logger.info "#retry_on_connection_failure not configured to retry: #{exc_str}" if respond_to?(:logger)
      raise exception
    elsif retries < @retry_count
      retries += 1
      logger.warn "#retry_on_connection_failure retry #{retries} due to #{exception.class}: #{exception.message}" if respond_to?(:logger)
      connect
      retry
    end
    logger.error "#retry_on_connection_failure Connection failure: #{exception.class}: #{exception.message}. Giving up after #{retries} retries" if respond_to?(:logger)
    raise ConnectionFailure.new("After #{retries} retries to host '#{server}': #{exc_str}", server, exception.cause)
  end
end
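
To see how often the block runs and how often a new connection is made, the reconnect-and-retry flow can be reduced to a stand-in client. `SketchRetryingClient`, `SketchFailure` and `sketch_block_runs` are hypothetical; the stand-in's #connect only counts calls instead of opening a socket.

```ruby
class SketchFailure < StandardError; end

# Hypothetical stand-in client: connecting only increments a counter.
class SketchRetryingClient
  attr_reader :connect_count

  def initialize(retry_count: 3)
    @retry_count   = retry_count
    @connect_count = 0
    @connected     = false
  end

  def connect
    @connect_count += 1
    @connected = true
  end

  def closed?
    !@connected
  end

  # Same control flow as the listing above, without logging.
  def retry_on_connection_failure
    retries = 0
    begin
      connect if closed?
      yield(self)
    rescue SketchFailure
      raise if retries >= @retry_count # Give up: re-raise the last failure
      retries += 1
      connect                          # New connection before re-running the block
      retry
    end
  end
end

# Runs a block that fails `failures` times, then succeeds.
# Returns [block result, number of block runs, number of connects].
def sketch_block_runs(failures)
  client = SketchRetryingClient.new(retry_count: 3)
  runs   = 0
  result = client.retry_on_connection_failure do
    runs += 1
    raise SketchFailure, 'connection reset' if runs <= failures
    :ok
  end
  [result, runs, client.connect_count]
end
```

Every statement inside the block is executed again on each retry, which is why the examples above keep a state-changing send and its following read in separate places.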

#setsockopt(*args) ⇒ Object



# File 'lib/net/tcp_client/tcp_client.rb', line 533

def setsockopt(*args)
  socket.nil? || socket.setsockopt(*args)
end
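
Since #setsockopt forwards its arguments to the underlying socket, the standard Socket constants apply. The sketch below uses a plain, unconnected Ruby Socket to show the two options that correspond to the :buffered and :keepalive descriptions: TCP_NODELAY disables Nagle's algorithm and SO_KEEPALIVE enables OS keepalive probes. Whether Net::TCPClient sets exactly these options internally is an assumption; `sketch_socket_options` is a hypothetical helper.

```ruby
require 'socket'

# Sets both options on an unconnected TCP socket and reads them back.
def sketch_socket_options
  socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  # Disable Nagle's algorithm so small writes are sent without delay
  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  # Ask the OS to probe idle connections
  socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  {
    nodelay:   socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY).int != 0,
    keepalive: socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE).bool
  }
ensure
  socket.close if socket
end

sketch_socket_options
```

On a Net::TCPClient instance the same argument lists would be passed to #setsockopt; note from the listing above that the call returns true without doing anything when there is no socket.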

#write(data, timeout = write_timeout) ⇒ Object

Send data to the server

Use #retry_on_connection_failure to add resilience to the #write method

Raises Net::TCPClient::ConnectionFailure whenever the send fails

For a description of the errors, see Socket#write

Parameters

 timeout [Float]
   Optional: Override the default write timeout for this write
   Number of seconds before raising Net::TCPClient::WriteTimeout when no data has
   been written.
   A value of -1 will wait forever
   Default: :write_timeout supplied to #initialize

Note: After a Net::TCPClient::ReadTimeout #read can be called again on
      the same socket to read the response later.
      If the application no longer wants the connection after a
      Net::TCPClient::ReadTimeout, then the #close method _must_ be called
      before calling _connect_ or _retry_on_connection_failure_ to create
      a new connection


# File 'lib/net/tcp_client/tcp_client.rb', line 349

def write(data, timeout = write_timeout)
  data = data.to_s
  if respond_to?(:logger)
    payload        = {timeout: timeout}
    # With trace level also log the sent data
    payload[:data] = data if logger.trace?
    logger.benchmark_debug('#write', payload: payload) do
      payload[:bytes] = socket_write(data, timeout)
    end
  else
    socket_write(data, timeout)
  end
rescue Exception => exc
  close if close_on_error
  raise exc
end