Class: RightScale::RouterClient

Inherits:
BaseRetryClient
Defined in:
lib/right_agent/clients/router_client.rb

Overview

HTTP interface to RightNet router

Constant Summary

API_VERSION =

RightNet router API version for use in X-API-Version header

"2.0"
CONNECT_INTERVAL =

Initial interval between attempts to make a WebSocket connection

30
MAX_CONNECT_INTERVAL =

Maximum interval between attempts to make a WebSocket connection

60 * 60 * 24
RECONNECT_INTERVAL =

Initial interval between attempts to reconnect or long-poll when router is not responding

2
MAX_RECONNECT_INTERVAL =

Maximum interval between attempts to reconnect or long-poll when router is not responding

60
CHECK_INTERVAL =

Interval between checks for lost WebSocket connection

5
BACKOFF_FACTOR =

Backoff factor for connect and reconnect intervals

2
NORMAL_CLOSE =

WebSocket close status codes

1000
SHUTDOWN_CLOSE =
1001
PROTOCOL_ERROR_CLOSE =
1002
UNEXPECTED_ERROR_CLOSE =
1011
DEFAULT_LISTEN_TIMEOUT =

Default time to wait for an event or to ping WebSocket

60
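
The connect and reconnect constants above suggest an exponential backoff: start at the initial interval, multiply by BACKOFF_FACTOR after each failed attempt, and cap at the maximum. The following is a minimal sketch of that reading, not the library's implementation; the helper `backoff_schedule` is hypothetical and the exact way RouterClient combines these constants is an assumption.

```ruby
# Values copied from the constant summary above.
RECONNECT_INTERVAL = 2
MAX_RECONNECT_INTERVAL = 60
BACKOFF_FACTOR = 2

# Hypothetical helper, not part of RightScale::RouterClient.
# Returns the first `count` wait intervals, multiplying by `factor`
# after each attempt and capping the result at `max`.
def backoff_schedule(initial, max, factor, count)
  interval = initial
  Array.new(count) do
    current = interval
    interval = [interval * factor, max].min
    current
  end
end

schedule = backoff_schedule(RECONNECT_INTERVAL, MAX_RECONNECT_INTERVAL, BACKOFF_FACTOR, 7)
puts schedule.inspect # prints [2, 4, 8, 16, 32, 60, 60]
```

Under the same assumption, the WebSocket connect interval would grow from 30 seconds toward the one-day cap given by MAX_CONNECT_INTERVAL.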

Constants inherited from BaseRetryClient

BaseRetryClient::DEFAULT_OPEN_TIMEOUT, BaseRetryClient::DEFAULT_RECONNECT_INTERVAL, BaseRetryClient::DEFAULT_REQUEST_TIMEOUT, BaseRetryClient::DEFAULT_RETRY_INTERVALS, BaseRetryClient::DEFAULT_RETRY_TIMEOUT, BaseRetryClient::PERMITTED_STATE_TRANSITIONS

Instance Attribute Summary

Attributes inherited from BaseRetryClient

#state

Instance Method Summary

Methods inherited from BaseRetryClient

#communicated, #init, #status

Constructor Details

#initialize(auth_client, options) ⇒ RouterClient

Create RightNet router client

Parameters:

  • auth_client (AuthClient)

    providing authorization session for HTTP requests

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :open_timeout (Numeric)

    maximum wait for connection; defaults to DEFAULT_OPEN_TIMEOUT

  • :request_timeout (Numeric)

    maximum wait for response; defaults to DEFAULT_REQUEST_TIMEOUT

  • :listen_timeout (Numeric)

    maximum wait for event; defaults to DEFAULT_LISTEN_TIMEOUT

  • :long_polling_only (Boolean)

    never attempt to create a WebSocket, always long-polling instead

  • :retry_timeout (Numeric)

    maximum time to keep retrying before giving up; defaults to DEFAULT_RETRY_TIMEOUT

  • :retry_intervals (Array)

    between successive retries; defaults to DEFAULT_RETRY_INTERVALS

  • :retry_enabled (Boolean)

    for requests that fail to connect or that return a retry result

  • :reconnect_interval (Numeric)

    between reconnect attempts after losing connectivity

  • :exception_callback (Proc)

    for unexpected exceptions

Raises:

  • (ArgumentError)

    auth client does not support this client type



# File 'lib/right_agent/clients/router_client.rb', line 89

def initialize(auth_client, options)
  init(:router, auth_client, options.merge(:server_name => "RightNet", :api_version => API_VERSION))
  @options[:listen_timeout] ||= DEFAULT_LISTEN_TIMEOUT
end
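
The constructor's option handling can be illustrated in isolation. This is a sketch only: `effective_options` is a hypothetical stand-in, and it assumes that BaseRetryClient#init (not shown on this page) stores the merged hash in @options unchanged.

```ruby
# Values copied from the constant summary above.
API_VERSION = "2.0"
DEFAULT_LISTEN_TIMEOUT = 60

# Hypothetical stand-in for the option handling in #initialize.
# The merge forces :server_name and :api_version, overriding any
# caller-supplied values; ||= fills :listen_timeout only when the
# caller left it unset (or nil).
def effective_options(options)
  merged = options.merge(:server_name => "RightNet", :api_version => API_VERSION)
  merged[:listen_timeout] ||= DEFAULT_LISTEN_TIMEOUT
  merged
end

puts effective_options({})[:listen_timeout]                       # prints 60
puts effective_options({:listen_timeout => 10})[:listen_timeout]  # prints 10
puts effective_options({:api_version => "1.0"})[:api_version]     # prints 2.0
```

Note that Hash#merge returns a new hash, so the caller's options hash is left unmodified by the merge itself.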

Instance Method Details

#close(scope = :all) ⇒ TrueClass

Take any actions necessary to quiesce client interaction in preparation for agent termination but allow any active requests to complete

Parameters:

  • scope (Symbol) (defaults to: :all)

    of close action: :receive for just receive side of client, :all for both receive and send side; defaults to :all

Returns:

  • (TrueClass)

    always true



254
255
256
257
# File 'lib/right_agent/clients/router_client.rb', line 254

def close(scope = :all)
  super
  @websocket.close(SHUTDOWN_CLOSE, "Agent terminating") if @websocket
end
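
The close call above sends SHUTDOWN_CLOSE (1001) with a reason string. For reading logs, the four status codes from the constant summary can be mapped to their RFC 6455 meanings. A small sketch follows; `describe_close` is a hypothetical helper and not part of RouterClient.

```ruby
# Status codes from the constant summary, with their RFC 6455 meanings.
CLOSE_CODE_MEANINGS = {
  1000 => "normal closure",   # NORMAL_CLOSE
  1001 => "going away",       # SHUTDOWN_CLOSE
  1002 => "protocol error",   # PROTOCOL_ERROR_CLOSE
  1011 => "internal error"    # UNEXPECTED_ERROR_CLOSE
}.freeze

# Hypothetical helper for building a log-friendly description.
def describe_close(code, reason = nil)
  meaning = CLOSE_CODE_MEANINGS.fetch(code, "unknown")
  reason ? "#{code} (#{meaning}): #{reason}" : "#{code} (#{meaning})"
end

puts describe_close(1001, "Agent terminating") # prints 1001 (going away): Agent terminating
```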

#listen(routing_keys) {|event| ... } ⇒ TrueClass

Receive events via an HTTP WebSocket if available, otherwise via HTTP long-polling. This is a blocking call and therefore should be made from a thread other than the one otherwise used with this object, e.g., an EM.defer thread.

Parameters:

  • routing_keys (Array, NilClass)

    for event sources of interest with nil meaning all

Yields:

  • (event)

    required block called each time event received

Yield Parameters:

  • event (Hash)

    received

Returns:

  • (TrueClass)

    always true, although only returns when closing

Raises:

  • (ArgumentError)

    if no block is given



# File 'lib/right_agent/clients/router_client.rb', line 219

def listen(routing_keys, &handler)
  raise ArgumentError, "Block missing" unless block_given?

  @connect_interval = CONNECT_INTERVAL
  @last_connect_time = Time.now - @connect_interval
  @reconnect_interval = RECONNECT_INTERVAL

  uuids = nil
  retries = 0
  until [:closing, :closed].include?(state) do
    if @websocket
      @connect_interval = CONNECT_INTERVAL
      @reconnect_interval = RECONNECT_INTERVAL
      sleep(CHECK_INTERVAL)
      next
    elsif retry_connect?
      @last_connect_time = Time.now
      @close_code = @close_reason = nil
      @stats["reconnects"].update("websocket") if (retries += 1) > 1
      next if try_connect(routing_keys, &handler)
    end

    # Resort to long-polling if WebSocket not usable
    uuids = try_long_poll(routing_keys, uuids, &handler) if @websocket.nil?
  end
  true
end
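
Because #listen blocks until the client is closing, callers need to run it off their main thread. The sketch below shows that calling pattern with a plain Ruby Thread rather than EM.defer, and with `FakeRouterClient`, a hypothetical stand-in that yields canned events; neither `FakeRouterClient` nor `collect_events` is part of the library.

```ruby
require "thread"

# Hypothetical stand-in for RouterClient: yields canned events and then
# returns true, as #listen does once the client is closing.
class FakeRouterClient
  def initialize(events)
    @events = events
  end

  def listen(routing_keys)
    raise ArgumentError, "Block missing" unless block_given?
    @events.each { |event| yield event }
    true
  end
end

# Run the blocking listen call on its own thread and hand events back
# to the caller through a thread-safe queue.
def collect_events(client, routing_keys)
  received = Queue.new
  listener = Thread.new { client.listen(routing_keys) { |event| received << event } }
  result = listener.value # joins the thread and returns listen's result
  events = []
  events << received.pop until received.empty?
  [result, events]
end

result, events = collect_events(FakeRouterClient.new([{:type => "Push"}]), nil)
puts result        # prints true
puts events.length # prints 1
```

With the real client the listener thread would not finish until #close is called, so the caller would consume the queue concurrently instead of joining first.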

#notify(event, routing_keys) ⇒ TrueClass

Route event. Use WebSocket if possible. Do not block this request even if in the process of closing, since it is used for request responses.

Parameters:

  • event (Hash)

    to send

  • routing_keys (Array, NilClass)

    as strings to assist router in delivering event to interested parties

Returns:

  • (TrueClass)

    always true

Raises:



# File 'lib/right_agent/clients/router_client.rb', line 185

def notify(event, routing_keys)
  event[:uuid] ||= RightSupport::Data::UUID.generate
  event[:version] ||= AgentConfig.protocol_version
  params = {:event => event}
  params[:routing_keys] = routing_keys if routing_keys
  if @websocket
    path = event[:path] ? " #{event[:path]}" : ""
    to = routing_keys ? " to #{routing_keys.inspect}" : ""
    Log.info("Sending EVENT <#{event[:uuid]}> #{event[:type]}#{path}#{to}")
    @websocket.send(JSON.dump(params))
  else
    make_request(:post, "/notify", params, "notify", event[:uuid], :filter_params => ["event"])
  end
  true
end
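
The parameter assembly in #notify can be tried on its own. In this sketch `build_notify_params` is a hypothetical helper; SecureRandom.uuid stands in for RightSupport::Data::UUID.generate and the `version` argument stands in for AgentConfig.protocol_version, so the generated values will not match what the agent produces.

```ruby
require "json"
require "securerandom"

# Hypothetical helper mirroring how #notify assembles its parameters.
# Existing :uuid and :version values on the event are preserved, and
# :routing_keys is only included when routing keys were supplied.
def build_notify_params(event, routing_keys, version)
  event[:uuid] ||= SecureRandom.uuid
  event[:version] ||= version
  params = {:event => event}
  params[:routing_keys] = routing_keys if routing_keys
  params
end

params = build_notify_params({:type => "Push", :uuid => "u1"}, ["k1"], 23)
puts JSON.dump(params) # the string that would be sent over the WebSocket
```

As in the original method, the event hash passed in is modified in place when :uuid or :version is missing.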

#push(type, payload, target, token = nil) ⇒ NilClass

Route a request to a single target or multiple targets with no response expected. Persist the request en route to reduce the chance of it being lost, at the expense of some additional network overhead. Enqueue the request if the target is not currently available. Never automatically retry the request if there is the possibility of it being duplicated. Set time-to-live to be forever.

Parameters:

  • type (String)

    of request as path specifying actor and action

  • payload (Hash, NilClass)

    for request

  • target (Hash, NilClass)

    for request, which may be a specific agent (using :agent_id), potentially multiple targets (using :tags, :scope, :selector), or nil to route solely using type:

      • :agent_id (String): serialized identity of specific target
      • :tags (Array): tags that must all be associated with a target for it to be selected
      • :scope (Hash): for restricting routing, which may contain:
          • :account (Integer): id that agents must be associated with to be included
          • :shard (Integer): id that agents must be in to be included, or if value is Packet::GLOBAL, ones with no shard id
      • :selector (Symbol): for picking from qualified targets: :any or :all; defaults to :any
    
  • token (String, NilClass) (defaults to: nil)

    uniquely identifying this request; defaults to randomly generated ID

Returns:

  • (NilClass)

    always nil since there is no expected response to the request

Raises:



# File 'lib/right_agent/clients/router_client.rb', line 125

def push(type, payload, target, token = nil)
  params = {
    :type => type,
    :payload => payload,
    :target => target }
  make_request(:post, "/push", params, type.split("/")[2], token)
end
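
Both the parameter hash and the `type.split("/")[2]` expression in #push can be examined without a router. The sketch below uses hypothetical helpers (`push_params`, `action_of`) and illustrative values; "/foo/bar" is not a real actor and action, and the tag and account id are made up.

```ruby
# Hypothetical helper building the same hash that #push posts to "/push".
def push_params(type, payload, target)
  {:type => type, :payload => payload, :target => target}
end

# Hypothetical helper isolating the expression #push passes to
# make_request as the request kind.
def action_of(type)
  type.split("/")[2]
end

# Illustrative multi-target selection: every agent carrying the tag,
# restricted to one account.
target = {
  :tags => ["example:tag=value"],
  :scope => {:account => 123},
  :selector => :all
}

params = push_params("/foo/bar", {:x => 1}, target)
puts params[:target][:selector].inspect # prints :all
puts action_of("/foo/bar").inspect      # prints "bar"
puts action_of("foo/bar").inspect       # prints nil
```

The last line shows why the type needs its leading slash: splitting "/foo/bar" yields an empty first element, so the action sits at index 2, whereas without the slash index 2 does not exist.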

#request(type, payload, target, token = nil) ⇒ Result, NilClass

Route a request to a single target with a response expected. Automatically retry the request if a response is not received in a reasonable amount of time or if there is a non-delivery response indicating the target is not currently available. Timeout the request if a response is not received in time, typically configured to 30 sec. Because of retries there is the possibility of duplicated requests, and these are detected and discarded automatically for non-idempotent actions. Allow the request to expire per the agent’s configured time-to-live, typically 1 minute.

Parameters:

  • type (String)

    of request as path specifying actor and action

  • payload (Hash, NilClass)

    for request

  • target (Hash, NilClass)

    for request, which may be a specific agent (using :agent_id), one chosen randomly from potentially multiple targets (using :tags, :scope), or nil to route solely using type:

      • :agent_id (String): serialized identity of specific target
      • :tags (Array): tags that must all be associated with a target for it to be selected
      • :scope (Hash): for restricting routing, which may contain:
          • :account (Integer): id that agents must be associated with to be included

  • token (String, NilClass) (defaults to: nil)

    uniquely identifying this request; defaults to randomly generated ID

Returns:

  • (Result, NilClass)

    response from request

Raises:



# File 'lib/right_agent/clients/router_client.rb', line 161

def request(type, payload, target, token = nil)
  params = {
    :type => type,
    :payload => payload,
    :target => target }
  make_request(:post, "/request", params, type.split("/")[2], token)
end
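
The retry behaviour described for #request is implemented inside make_request, which this page does not show. As a rough illustration of the idea only, the following sketch retries a block over a list of intervals until a retry timeout would be exceeded. Everything here is an assumption: `with_retries`, the :retry marker, the interval values, and returning nil on giving up are all hypothetical.

```ruby
# Hypothetical retry loop, not the library's make_request.
# Yields the attempt number; the block returns :retry to ask for another
# attempt or any other value to finish. `sleeper` is injectable so the
# loop can be exercised without actually waiting.
def with_retries(retry_intervals, retry_timeout, sleeper = Kernel.method(:sleep))
  elapsed = 0
  attempts = 0
  loop do
    attempts += 1
    result = yield(attempts)
    return result unless result == :retry
    # Reuse the last interval once the list is exhausted.
    interval = retry_intervals[attempts - 1] || retry_intervals.last
    # Give up when there is no interval or the timeout would be exceeded.
    return nil if interval.nil? || elapsed + interval > retry_timeout
    sleeper.call(interval)
    elapsed += interval
  end
end

waits = []
result = with_retries([1, 2, 4], 5, lambda { |s| waits << s }) do |attempt|
  attempt < 3 ? :retry : "ok"
end
puts result        # prints ok
puts waits.inspect # prints [1, 2]
```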

#stats(reset = false) ⇒ Hash

Current statistics for this client

Parameters:

  • reset (Boolean) (defaults to: false)

    whether to reset the statistics after getting the current ones

Returns:

  • (Hash)

    current statistics

      • "events" (Hash, NilClass): activity stats or nil if none
      • "reconnects" (Hash, NilClass): activity stats or nil if none
      • "request failures" (Hash, NilClass): activity stats or nil if none
      • "request sent" (Hash, NilClass): activity stats or nil if none
      • "response time" (Float, NilClass): average number of seconds to respond to a request or nil if none
      • "exceptions" (Hash, NilClass): exceptions stats or nil if none



# File 'lib/right_agent/clients/router_client.rb', line 270

def stats(reset = false)
  events = @stats["events"].all
  stats = super(reset)
  stats["events"] = events
  stats
end