Class: RubySkynet::Connection

Inherits:
Object
  • Object
show all
Includes:
SemanticLogger::Loggable, SyncAttr
Defined in:
lib/ruby_skynet/connection.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(servers, params = {}) ⇒ Connection

Returns a new RubySkynet connection to the server

Parameters:

:read_timeout [Float]
  Time in seconds to timeout on read
  Can be overridden by supplying a timeout in the read call
  Default: 60

:connect_timeout [Float]
  Time in seconds to timeout when trying to connect to the server
  Default: Half of the :read_timeout ( 30 seconds )

:connect_retry_count [Fixnum]
  Number of times to retry connecting when a connection fails
  Default: 10

:connect_retry_interval [Float]
  Number of seconds between connection retry attempts after the first failed attempt
  Default: 0.5


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/ruby_skynet/connection.rb', line 38

def initialize(servers, params = {})
  params = params.dup

  # User configurable options
  params[:read_timeout]           ||= 60
  params[:connect_timeout]        ||= 30
  params[:connect_retry_interval] ||= 0.1
  params[:connect_retry_count]    ||= 5

  # Disable send buffering since it is a RPC call
  params[:buffered] = false

  # For each new connection perform the Skynet handshake
  params[:on_connect] = Proc.new do |socket|
    # Reset user_data on each connection
    socket.user_data = {
      :seq    => 0,
      :logger => logger
    }

    # Receive Service Handshake
    # Registered bool
    #   Registered indicates the state of this service. If it is false, the connection will
    #   close immediately and the client should look elsewhere for this service.
    #
    # ClientID string
    #   ClientID is a UUID that is used by the client to identify itself in RPC requests.
    logger.debug "Waiting for Service Handshake"
    service_handshake = Common.read_bson_document(socket)
    logger.trace 'Service Handshake', service_handshake

    # #TODO When a reconnect returns registered == false need to throw an exception
    # so that this host connection is not used
    registered = service_handshake['registered']
    client_id = service_handshake['clientid']
    socket.user_data[:client_id] = client_id

    # Send blank ClientHandshake
    client_handshake = { 'clientid' => client_id }
    logger.debug "Sending Client Handshake"
    logger.trace 'Client Handshake', client_handshake
    socket.write(client_handshake.to_bson)
  end

  # To prevent strange issues if user incorrectly supplies server names
  params.delete(:server)
  params[:servers] = servers

  @socket = ResilientSocket::TCPClient.new(params)
end

Instance Attribute Details

#socketObject (readonly)

Returns the underlying socket being used by a Connection instance



17
18
19
# File 'lib/ruby_skynet/connection.rb', line 17

def socket
  @socket
end

Class Method Details

.with_connection(server, params = {}, &block) ⇒ Object

Execute the supplied block with a connection from the pool



196
197
198
199
200
201
202
203
204
# File 'lib/ruby_skynet/connection.rb', line 196

def self.with_connection(server, params={}, &block)
  conn = nil
  begin
    conn = new(server, params)
    block.call(conn)
  ensure
    conn.close if conn
  end
end

Instance Method Details

#closeObject



206
207
208
# File 'lib/ruby_skynet/connection.rb', line 206

def close
  @socket.close if @socket
end

#rpc_call(request_id, skynet_name, method_name, parameters, idempotent = false) ⇒ Object

Performs a synchronous call to a Skynet server

Parameters:

skynet_name [String|Symbol]:
  Name of the method to pass in the request
method_name [String|Symbol]:
  Name of the method to pass in the request
parameters [Hash]:
  Parameters to pass in the request
idempotent [True|False]:
  If the request can be applied again to the server without changing its state
  Set to true to retry the entire request after the send is successful

Returns the Hash result returned from the Skynet Service

Raises RubySkynet::ProtocolError Raises RubySkynet::SkynetException



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/ruby_skynet/connection.rb', line 106

def rpc_call(request_id, skynet_name, method_name, parameters, idempotent=false)
  logger.benchmark_info "Called #{skynet_name}.#{method_name}" do
    retry_count = 0
    header = nil
    response = nil
    socket.retry_on_connection_failure do |socket|
      header = {
        'servicemethod' => "#{skynet_name}.Forward",
        'seq'           => socket.user_data[:seq]
      }

      logger.debug "Sending Header"
      logger.trace 'Header', header
      socket.write(header.to_bson)

      # The parameters are placed in the request object in BSON serialized form
      request = {
        'clientid'    => socket.user_data[:client_id],
        'in'          => BSON::Binary.new(parameters.to_bson),
        'method'      => method_name.to_s,
        'requestinfo' => {
          'requestid'     => request_id,
          # Increment retry count to indicate that the request may have been tried previously
          'retrycount' => retry_count,
          # TODO: this should be forwarded along in case of services also
          # being a client and calling additional services. If empty it will
          # be stuffed with connecting address
          'originaddress' => ''
        }
      }

      logger.debug "Sending Request"
      logger.trace 'Request', request
      logger.trace 'Parameters:', parameters
      socket.write(request.to_bson)

      # Since Send does not affect state on the server we can also retry reads
      if idempotent
        logger.debug "Reading header from server"
        header = Common.read_bson_document(socket)
        logger.debug 'Response Header', header

        # Read the BSON response document
        logger.debug "Reading response from server"
        response = Common.read_bson_document(socket)
        logger.trace 'Response', response
      end
    end

    # Perform the read outside the retry block since a successful write
    # means that the servers state may have been changed
    unless idempotent
      # Read header first as a separate BSON document
      logger.debug "Reading header from server"
      header = Common.read_bson_document(socket)
      logger.debug 'Response Header', header

      # Read the BSON response document
      logger.debug "Reading response from server"
      response = Common.read_bson_document(socket)
      logger.trace 'Response', response
    end

    # Ensure the sequence number in the response header matches the
    # sequence number sent in the request
    seq_no = header['seq']
    if seq_no != socket.user_data[:seq]
      raise ProtocolError.new("Incorrect Response received, expected seq=#{socket.user_data[:seq]}, received: #{header.inspect}")
    end

    # Increment Sequence number only on successful response
    socket.user_data[:seq] += 1

    # If an error is returned from Skynet raise a Skynet exception
    error = header['error']
    raise SkynetException.new(error) if error.to_s.length > 0

    # If an error is returned from the service raise a Service exception
    error = response['error']
    raise ServiceException.new(error) if error.to_s.length > 0

    # Return Value
    # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized
    result = Hash.from_bson(StringIO.new(response['out'].data))
    logger.trace 'Return Value', result
    result
  end
end