Class: RightScale::Sender

Inherits:
Object show all
Includes:
OperationResultHelper
Defined in:
lib/right_agent/sender.rb

Overview

This class allows sending requests to agents via RightNet It is used by Actor.request which is used by actors that need to send requests to remote agents If requested, it will queue requests when there are is no RightNet connection

Defined Under Namespace

Classes: SendFailure, TemporarilyOffline

Constant Summary collapse

RETRY_BACKOFF_FACTOR =

Factor used on each retry iteration to achieve exponential backoff

3

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from OperationResultHelper

#cancel_result, #continue_result, #error_result, #non_delivery_result, #result_from, #retry_result, #success_result

Constructor Details

#initialize(agent) ⇒ Sender

Initialize sender

Parameters

agent(Agent)

Agent using this sender; uses its identity, client, and following options:

:offline_queueing(Boolean)

Whether to queue request if client currently disconnected,

  also requires agent invocation of initialize_offline_queue and start_offline_queue methods below,
  as well as enable_offline_mode and disable_offline_mode as client connection status changes
:ping_interval(Numeric):: Minimum number of seconds since last message receipt to ping RightNet
  to check connectivity, defaults to 0 meaning do not ping
:restart_callback(Proc):: Callback that is activated on each restart vote with votes being initiated
  by offline queue exceeding MAX_QUEUED_REQUESTS or by repeated failures to access RightNet when online
:retry_timeout(Numeric):: Maximum number of seconds to retry request before give up
:retry_interval(Numeric):: Number of seconds before initial request retry, increases exponentially
:time_to_live(Numeric):: Number of seconds before a request expires and is to be ignored;
  non-positive value means never expire
:async_response(Boolean):: Whether to handle responses asynchronously or to handle them immediately
  upon arrival (for use by applications that were written expecting asynchronous AMQP responses)
:secure(Boolean):: true indicates to use Security features of rabbitmq to restrict agents to themselves


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/right_agent/sender.rb', line 85

def initialize(agent)
  @agent = agent
  @identity = @agent.identity
  @options = @agent.options || {}
  @mode = @agent.mode
  @request_queue = @agent.request_queue
  @secure = @options[:secure]
  @retry_timeout = RightSupport::Stats.nil_if_zero(@options[:retry_timeout])
  @retry_interval = RightSupport::Stats.nil_if_zero(@options[:retry_interval])
  @pending_requests = PendingRequests.new
  @terminating = nil
  reset_stats
  @offline_handler = OfflineHandler.new(@options[:restart_callback], @offline_stats)
  @connectivity_checker = if @mode == :amqp
    # Only need connectivity checker for AMQP broker since RightHttpClient does its own checking
    # via periodic session renewal
    ConnectivityChecker.new(self, @options[:ping_interval] || 0, @ping_stats)
  end
  @@instance = self
end

Instance Attribute Details

#agentObject (readonly)

(Agent) Associated agent



54
55
56
# File 'lib/right_agent/sender.rb', line 54

def agent
  @agent
end

#connectivity_checkerObject (readonly)

(ConnectivityChecker) AMQP broker connection checker



48
49
50
# File 'lib/right_agent/sender.rb', line 48

def connectivity_checker
  @connectivity_checker
end

#identityObject (readonly)

(String) Identity of the associated agent



51
52
53
# File 'lib/right_agent/sender.rb', line 51

def identity
  @identity
end

#modeObject (readonly)

(Symbol) RightNet communication mode: :http or :amqp



57
58
59
# File 'lib/right_agent/sender.rb', line 57

def mode
  @mode
end

#offline_handlerObject (readonly)

(OfflineHandler) Handler for requests when client disconnected



45
46
47
# File 'lib/right_agent/sender.rb', line 45

def offline_handler
  @offline_handler
end

#pending_requestsObject (readonly)

(PendingRequests) Requests waiting for a response



42
43
44
# File 'lib/right_agent/sender.rb', line 42

def pending_requests
  @pending_requests
end

Class Method Details

.instanceObject

For direct access to current sender

Return

(Sender)

This sender instance if defined, otherwise nil



63
64
65
# File 'lib/right_agent/sender.rb', line 63

def self.instance
  @@instance if defined?(@@instance)
end

Instance Method Details

#build_and_send_packet(kind, type, payload, target, options = {}, &callback) ⇒ Object

Build and send packet

Parameters

kind(Symbol)

Kind of request: :send_push or :send_request

type(String)

Dispatch route for the request; typically identifies actor and action

payload(Object)

Data to be sent with marshalling en route

target(Hash|NilClass)

Target for request

:agent_id(String)

Identity of specific target

:tags(Array)

Tags that must all be associated with a target for it to be selected

:scope(Hash)

Scoping to be used to restrict routing

:account(Integer)

Restrict to agents with this account id

:shard(Integer)

Restrict to agents with this shard id, or if value is Packet::GLOBAL,

    ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected: :any or :all
options(Hash)

Request options

:token(String)

Universally unique ID for request; defaults to random generated

:time_to_live(Numeric)

Number of seconds before a request expires and is to be ignored;

non-positive value or nil means never expire for :send_push and means use configured
time-to-live for :send_request

Block

Optional block used to process response asynchronously with the following parameter:

result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR

Return

true

Always return true

Raise

ArgumentError

If target invalid



282
283
284
285
286
287
288
289
290
# File 'lib/right_agent/sender.rb', line 282

def build_and_send_packet(kind, type, payload, target, options = {}, &callback)
  if (packet = build_packet(kind, type, payload, target, options, &callback))
    action = type.split('/').last
    received_at = @request_stats.update(action, packet.token)
    @request_kind_stats.update((packet.selector == :all ? "fanout" : kind.to_s)[5..-1])
    send("#{@mode}_send", kind, target, packet, received_at, &callback)
  end
  true
end

#build_packet(kind, type, payload, target, options = {}, &callback) ⇒ Object

Build packet or queue it if offline

Parameters

kind(Symbol)

Kind of request: :send_push or :send_request

type(String)

Dispatch route for the request; typically identifies actor and action

payload(Object)

Data to be sent with marshalling en route

target(Hash|NilClass)

Target for request

:agent_id(String)

Identity of specific target

:tags(Array)

Tags that must all be associated with a target for it to be selected

:scope(Hash)

Scoping to be used to restrict routing

:account(Integer)

Restrict to agents with this account id

:shard(Integer)

Restrict to agents with this shard id, or if value is Packet::GLOBAL,

    ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected: :any or :all
options(Hash)

Request options

:token(String)

Universally unique ID for request; defaults to random generated

:time_to_live(Numeric)

Number of seconds before a request expires and is to be ignored;

non-positive value or nil means never expire for :send_push and means use configured
time-to-live for :send_request

Block

Optional block used to process response asynchronously with the following parameter:

result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR

Return

(Push|Request|NilClass)

Packet created, or nil if queued instead

Raise

ArgumentError

If target is invalid



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/right_agent/sender.rb', line 321

def build_packet(kind, type, payload, target, options = {}, &callback)
  validate_target(target, kind == :send_push)
  if kind == :send_push
    packet = Push.new(type, payload)
    packet.selector = target[:selector] || :any if target.is_a?(Hash)
    packet.persistent = true
    packet.confirm = true if callback
    time_to_live = options[:time_to_live] || 0
  else
    packet = Request.new(type, payload)
    packet.selector = :any
    time_to_live = options[:time_to_live] || @options[:time_to_live]
  end
  packet.from = @identity
  packet.token = options[:token] || RightSupport::Data::UUID.generate
  packet.expires_at = Time.now.to_i + time_to_live if time_to_live && time_to_live > 0
  if target.is_a?(Hash)
    if (agent_id = target[:agent_id])
      packet.target = agent_id
    else
      packet.tags = target[:tags] || []
      packet.scope = target[:scope]
    end
  else
    packet.target = target
  end

  if queueing?
    @offline_handler.queue_request(kind, type, payload, target, packet.token, packet.expires_at, packet.skewed_by, &callback)
    nil
  else
    packet
  end
end

#connected?Boolean

Determine whether currently connected to RightNet via client

Return

(Boolean)

true if offline or if client disconnected, otherwise false

Returns:

  • (Boolean)


149
150
151
# File 'lib/right_agent/sender.rb', line 149

def connected?
  @mode == :http ? @agent.client.connected? : @agent.client.connected.size == 0
end

#disable_offline_modeObject

Switch back to sending requests after in memory queue gets flushed Idempotent

Return

true

Always return true



141
142
143
# File 'lib/right_agent/sender.rb', line 141

def disable_offline_mode
  @offline_handler.disable if @options[:offline_queueing]
end

#dump_requestsObject

Create displayable dump of unfinished non-push request information Truncate list if there are more than 50 requests

Return

info(Array(String))

Receive time and token for each request in descending time order



428
429
430
431
432
433
434
435
436
437
438
# File 'lib/right_agent/sender.rb', line 428

def dump_requests
  info = []
  if @pending_requests
    @pending_requests.kind(:send_request).each do |token, request|
      info << "#{request.receive_time.localtime} <#{token}>"
    end
    info.sort!.reverse!
    info = info[0..49] + ["..."] if info.size > 50
  end
  info
end

#enable_offline_modeObject

Switch to offline mode In this mode requests are queued in memory rather than sent Idempotent

Return

true

Always return true



132
133
134
# File 'lib/right_agent/sender.rb', line 132

def enable_offline_mode
  @offline_handler.enable if @options[:offline_queueing]
end

#handle_response(response) ⇒ Object

Handle response to a request

Parameters

response(Result)

Packet received as result of request

Return

true

Always return true



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
# File 'lib/right_agent/sender.rb', line 363

def handle_response(response)
  if response.is_a?(Result)
    token = response.token
    if (result = OperationResult.from_results(response))
      if result.non_delivery?
        @non_delivery_stats.update(result.content.nil? ? "nil" : result.content)
      elsif result.error?
        @result_error_stats.update(result.content.nil? ? "nil" : result.content)
      end
      @result_stats.update(result.status)
    else
      @result_stats.update(response.results.nil? ? "nil" : response.results)
    end

    if (pending_request = @pending_requests[token])
      if result && result.non_delivery? && pending_request.kind == :send_request
        if result.content == OperationResult::TARGET_NOT_CONNECTED
          # Log and temporarily ignore so that timeout retry mechanism continues, but save reason for use below if timeout
          # Leave purging of associated request until final response, i.e., success response or retry timeout
          if (parent_token = pending_request.retry_parent_token)
            @pending_requests[parent_token].non_delivery = result.content
          else
            pending_request.non_delivery = result.content
          end
          Log.info("Non-delivery of <#{token}> because #{result.content}")
        elsif result.content == OperationResult::RETRY_TIMEOUT && pending_request.non_delivery
          # Request timed out but due to another non-delivery reason, so use that reason since more germane
          response.results = OperationResult.non_delivery(pending_request.non_delivery)
          deliver_response(response, pending_request)
        else
          deliver_response(response, pending_request)
        end
      else
        deliver_response(response, pending_request)
      end
    elsif result && result.non_delivery?
      Log.info("Non-delivery of <#{token}> because #{result.content}")
    else
      Log.debug("No pending request for response #{response.to_s([])}")
    end
  end
  true
end

#initialize_offline_queueObject

Initialize the offline queue All requests sent prior to running this initialization are queued if offline queueing is enabled and then are sent once this initialization has run All requests following this call and prior to calling start_offline_queue are prepended to the request queue

Return

true

Always return true



114
115
116
# File 'lib/right_agent/sender.rb', line 114

def initialize_offline_queue
  @offline_handler.init if @options[:offline_queueing]
end

#message_received(&callback) ⇒ Object

Update the time this agent last received a request or response message Also forward this message receipt notification to any callbacks that have registered

Block

Optional block without parameters that is activated when a message is received

Return

true

Always return true



161
162
163
# File 'lib/right_agent/sender.rb', line 161

def message_received(&callback)
  @connectivity_checker.message_received(&callback) if @connectivity_checker
end

#send_push(type, payload = nil, target = nil, options = {}, &callback) ⇒ Object

Send a request to a single target or multiple targets with no response expected other than routing failures Persist the request en route to reduce the chance of it being lost at the expense of some additional network overhead Enqueue the request if the target is not currently available Never automatically retry the request if there is the possibility of it being duplicated Set time-to-live to be forever

Parameters

type(String)

Dispatch route for the request; typically identifies actor and action

payload(Object)

Data to be sent with marshalling en route

target(Hash|NilClass) Target for request, which may be a specific agent (using :agent_id),

potentially multiple targets (using :tags, :scope, :selector), or nil to route solely
using type:
:agent_id(String):: serialized identity of specific target
:tags(Array):: Tags that must all be associated with a target for it to be selected
:scope(Hash):: Scoping to be used to restrict routing
  :account(Integer):: Restrict to agents with this account id
  :shard(Integer):: Restrict to agents with this shard id, or if value is Packet::GLOBAL,
    ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected, either :any or :all,
  defaults to :any
options(Hash)

Request options

:token(String)

Universally unique ID for request; defaults to random generated

:time_to_live(Numeric)

Number of seconds before a request expires and is to be ignored;

non-positive value or nil means never expire; defaults to 0

Block

Optional block used to process routing responses asynchronously with the following parameter:

result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR,
  with an initial SUCCESS response containing the targets to which the request was sent
  and any additional responses indicating any failures to actually route the request
  to those targets, use RightScale::OperationResult.from_results to decode

Return

true

Always return true

Raise

ArgumentError

If target invalid

SendFailure

If sending of request failed unexpectedly

TemporarilyOffline

If cannot send request because RightNet client currently disconnected

and offline queueing is disabled


207
208
209
# File 'lib/right_agent/sender.rb', line 207

def send_push(type, payload = nil, target = nil, options = {}, &callback)
  build_and_send_packet(:send_push, type, payload, target, options, &callback)
end

#send_request(type, payload = nil, target = nil, options = {}, &callback) ⇒ Object

Send a request to a single target with a response expected Automatically retry the request if a response is not received in a reasonable amount of time or if there is a non-delivery response indicating the target is not currently available Timeout the request if a response is not received in time, typically configured to 2 minutes Because of retries there is the possibility of duplicated requests, and these are detected and discarded automatically for non-idempotent actions Allow the request to expire per the agent’s configured time-to-live, typically 1 minute Note that receiving a response does not guarantee that the request activity has actually completed since the request processing may involve other asynchronous requests

Parameters

type(String)

Dispatch route for the request; typically identifies actor and action

payload(Object)

Data to be sent with marshalling en route

target(Hash|NilClass) Target for request, which may be a specific agent (using :agent_id),

one chosen randomly from potentially multiple targets (using :tags, :scope), or nil to
route solely using type:
:agent_id(String):: serialized identity of specific target
:tags(Array):: Tags that must all be associated with a target for it to be selected
:scope(Hash):: Scoping to be used to restrict routing
  :account(Integer):: Restrict to agents with this account id
  :shard(Integer):: Restrict to agents with this shard id, or if value is Packet::GLOBAL,
    ones with no shard id
options(Hash)

Request options

:token(String)

Universally unique ID for request; defaults to random generated

:time_to_live(Numeric)

Number of seconds before a request expires and is to be ignored;

non-positive value or nil means never expire; defaults to configured :time_to_live

Block

Required block used to process response asynchronously with the following parameter:

result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR,
  use RightScale::OperationResult.from_results to decode

Return

true

Always return true

Raise

ArgumentError

If target invalid or block missing

Raises:

  • (ArgumentError)


248
249
250
251
# File 'lib/right_agent/sender.rb', line 248

def send_request(type, payload = nil, target = nil, options = {}, &callback)
  raise ArgumentError, "Missing block for response callback" unless callback
  build_and_send_packet(:send_request, type, payload, target, options, &callback)
end

#start_offline_queueObject

Switch offline queueing to online mode and flush all buffered messages

Return

true

Always return true



122
123
124
# File 'lib/right_agent/sender.rb', line 122

def start_offline_queue
  @offline_handler.start if @options[:offline_queueing]
end

#stats(reset = false) ⇒ Object

Get sender statistics

Parameters

reset(Boolean)

Whether to reset the statistics after getting the current ones

Return

stats(Hash)

Current statistics:

“non-deliveries”(Hash|nil)

Non-delivery activity stats with keys “total”, “percent”, “last”,

  and 'rate' with percentage breakdown per reason, or nil if none
"offlines"(Hash|nil):: Offline activity stats with keys "total", "last", and "duration",
  or nil if none
"pings"(Hash|nil):: Request activity stats with keys "total", "percent", "last", and "rate"
  with percentage breakdown for "success" vs. "timeout", or nil if none
"request kinds"(Hash|nil):: Request kind activity stats with keys "total", "percent", and "last"
  with percentage breakdown per kind, or nil if none
"requests"(Hash|nil):: Request activity stats with keys "total", "percent", "last", and "rate"
  with percentage breakdown per request type, or nil if none
"requests pending"(Hash|nil):: Number of requests waiting for response and age of oldest,
  or nil if none
"result errors"(Hash|nil):: Error result activity stats with keys "total", "percent", "last",
  and 'rate' with percentage breakdown per error, or nil if none
"results"(Hash|nil):: Results activity stats with keys "total", "percent", "last", and "rate"
  with percentage breakdown per operation result type, or nil if none
"retries"(Hash|nil):: Retry activity stats with keys "total", "percent", "last", and "rate"
  with percentage breakdown per request type, or nil if none
"send failure"(Hash|nil):: Send failure activity stats with keys "total", "percent", "last", and "rate"
  with percentage breakdown per failure type, or nil if none


467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# File 'lib/right_agent/sender.rb', line 467

def stats(reset = false)
  stats = {}
  if @agent
    offlines = @offline_stats.all
    offlines.merge!("duration" => @offline_stats.avg_duration) if offlines
    if @pending_requests.size > 0
      pending = {}
      pending["pushes"] = @pending_requests.kind(:send_push).size
      requests = @pending_requests.kind(:send_request)
      if (pending["requests"] = requests.size) > 0
        pending["oldest age"] = PendingRequests.oldest_age(requests)
      end
    end
    stats = {
      "non-deliveries"   => @non_delivery_stats.all,
      "offlines"         => offlines,
      "pings"            => @ping_stats.all,
      "request kinds"    => @request_kind_stats.all,
      "requests"         => @request_stats.all,
      "requests pending" => pending,
      "result errors"    => @result_error_stats.all,
      "results"          => @result_stats.all,
      "retries"          => @retry_stats.all,
      "send failures"    => @send_failure_stats.all
    }
    reset_stats if reset
  end
  stats
end

#terminateObject

Take any actions necessary to quiesce client interaction in preparation for agent termination but allow message receipt to continue

Return

(Array)

Number of pending non-push requests and age of youngest request



412
413
414
415
416
417
418
419
420
421
# File 'lib/right_agent/sender.rb', line 412

def terminate
  if @offline_handler
    @offline_handler.terminate
    @connectivity_checker.terminate if @connectivity_checker
    pending = @pending_requests.kind(:send_request)
    [pending.size, PendingRequests.youngest_age(pending)]
  else
    [0, nil]
  end
end