Class: RubySkynet::Connection
- Inherits:
-
Object
- Object
- RubySkynet::Connection
- Includes:
- SemanticLogger::Loggable, SyncAttr
- Defined in:
- lib/ruby_skynet/connection.rb
Instance Attribute Summary collapse
-
#socket ⇒ Object
readonly
Returns the underlying socket being used by a Connection instance.
Class Method Summary collapse
-
.with_connection(server, params = {}, &block) ⇒ Object
Execute the supplied block with a connection from the pool.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(servers, params = {}) ⇒ Connection
constructor
Returns a new RubySkynet connection to the server.
-
#rpc_call(request_id, skynet_name, method_name, parameters, idempotent = false) ⇒ Object
Performs a synchronous call to a Skynet server.
Constructor Details
#initialize(servers, params = {}) ⇒ Connection
Returns a new RubySkynet connection to the server
Parameters:
:read_timeout [Float]
Time in seconds to timeout on read
Can be overridden by supplying a timeout in the read call
Default: 60
:connect_timeout [Float]
Time in seconds to timeout when trying to connect to the server
Default: Half of the :read_timeout ( 30 seconds )
:connect_retry_count [Fixnum]
Number of times to retry connecting when a connection fails
Default: 10
:connect_retry_interval [Float]
Number of seconds between connection retry attempts after the first failed attempt
Default: 0.5
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/ruby_skynet/connection.rb', line 38 def initialize(servers, params = {}) params = params.dup # User configurable options params[:read_timeout] ||= 60 params[:connect_timeout] ||= 30 params[:connect_retry_interval] ||= 0.1 params[:connect_retry_count] ||= 5 # Disable send buffering since it is a RPC call params[:buffered] = false # For each new connection perform the Skynet handshake params[:on_connect] = Proc.new do |socket| # Reset user_data on each connection socket.user_data = { :seq => 0, :logger => logger } # Receive Service Handshake # Registered bool # Registered indicates the state of this service. If it is false, the connection will # close immediately and the client should look elsewhere for this service. # # ClientID string # ClientID is a UUID that is used by the client to identify itself in RPC requests. logger.debug "Waiting for Service Handshake" service_handshake = Common.read_bson_document(socket) logger.trace 'Service Handshake', service_handshake # #TODO When a reconnect returns registered == false need to throw an exception # so that this host connection is not used registered = service_handshake['registered'] client_id = service_handshake['clientid'] socket.user_data[:client_id] = client_id # Send blank ClientHandshake client_handshake = { 'clientid' => client_id } logger.debug "Sending Client Handshake" logger.trace 'Client Handshake', client_handshake socket.write(client_handshake.to_bson) end # To prevent strange issues if user incorrectly supplies server names params.delete(:server) params[:servers] = servers @socket = ResilientSocket::TCPClient.new(params) end |
Instance Attribute Details
#socket ⇒ Object (readonly)
Returns the underlying socket being used by a Connection instance
17 18 19 |
# File 'lib/ruby_skynet/connection.rb', line 17 def socket @socket end |
Class Method Details
.with_connection(server, params = {}, &block) ⇒ Object
Execute the supplied block with a connection from the pool
196 197 198 199 200 201 202 203 204 |
# File 'lib/ruby_skynet/connection.rb', line 196 def self.with_connection(server, params={}, &block) conn = nil begin conn = new(server, params) block.call(conn) ensure conn.close if conn end end |
Instance Method Details
#close ⇒ Object
206 207 208 |
# File 'lib/ruby_skynet/connection.rb', line 206 def close @socket.close if @socket end |
#rpc_call(request_id, skynet_name, method_name, parameters, idempotent = false) ⇒ Object
Performs a synchronous call to a Skynet server
Parameters:
skynet_name [String|Symbol]:
Name of the method to pass in the request
method_name [String|Symbol]:
Name of the method to pass in the request
parameters [Hash]:
Parameters to pass in the request
idempotent [True|False]:
If the request can be applied again to the server without changing its state
Set to true to retry the entire request after the send is successful
Returns the Hash result returned from the Skynet Service
Raises RubySkynet::ProtocolError Raises RubySkynet::SkynetException
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/ruby_skynet/connection.rb', line 106 def rpc_call(request_id, skynet_name, method_name, parameters, idempotent=false) logger.benchmark_info "Called #{skynet_name}.#{method_name}" do retry_count = 0 header = nil response = nil socket.retry_on_connection_failure do |socket| header = { 'servicemethod' => "#{skynet_name}.Forward", 'seq' => socket.user_data[:seq] } logger.debug "Sending Header" logger.trace 'Header', header socket.write(header.to_bson) # The parameters are placed in the request object in BSON serialized form request = { 'clientid' => socket.user_data[:client_id], 'in' => BSON::Binary.new(parameters.to_bson), 'method' => method_name.to_s, 'requestinfo' => { 'requestid' => request_id, # Increment retry count to indicate that the request may have been tried previously 'retrycount' => retry_count, # TODO: this should be forwarded along in case of services also # being a client and calling additional services. If empty it will # be stuffed with connecting address 'originaddress' => '' } } logger.debug "Sending Request" logger.trace 'Request', request logger.trace 'Parameters:', parameters socket.write(request.to_bson) # Since Send does not affect state on the server we can also retry reads if idempotent logger.debug "Reading header from server" header = Common.read_bson_document(socket) logger.debug 'Response Header', header # Read the BSON response document logger.debug "Reading response from server" response = Common.read_bson_document(socket) logger.trace 'Response', response end end # Perform the read outside the retry block since a successful write # means that the servers state may have been changed unless idempotent # Read header first as a separate BSON document logger.debug "Reading header from server" header = Common.read_bson_document(socket) logger.debug 'Response Header', header # Read the BSON response document logger.debug "Reading response from server" response = Common.read_bson_document(socket) logger.trace 'Response', response end # Ensure the sequence number in the response header matches the # sequence number sent in the request seq_no = header['seq'] if seq_no != socket.user_data[:seq] raise ProtocolError.new("Incorrect Response received, expected seq=#{socket.user_data[:seq]}, received: #{header.inspect}") end # Increment Sequence number only on successful response socket.user_data[:seq] += 1 # If an error is returned from Skynet raise a Skynet exception error = header['error'] raise SkynetException.new(error) if error.to_s.length > 0 # If an error is returned from the service raise a Service exception error = response['error'] raise ServiceException.new(error) if error.to_s.length > 0 # Return Value # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized result = Hash.from_bson(StringIO.new(response['out'].data)) logger.trace 'Return Value', result result end end |