Class: RightScale::Sender
- Includes:
- OperationResultHelper
- Defined in:
- lib/right_agent/sender.rb
Overview
This class allows sending requests to agents via RightNet It is used by Actor.request which is used by actors that need to send requests to remote agents If requested, it will queue requests when there are is no RightNet connection
Defined Under Namespace
Classes: SendFailure, TemporarilyOffline
Constant Summary collapse
- RETRY_BACKOFF_FACTOR =
Factor used on each retry iteration to achieve exponential backoff
3
Instance Attribute Summary collapse
-
#agent ⇒ Object
readonly
(Agent) Associated agent.
-
#connectivity_checker ⇒ Object
readonly
(ConnectivityChecker) AMQP broker connection checker.
-
#identity ⇒ Object
readonly
(String) Identity of the associated agent.
-
#mode ⇒ Object
readonly
(Symbol) RightNet communication mode: :http or :amqp.
-
#offline_handler ⇒ Object
readonly
(OfflineHandler) Handler for requests when client disconnected.
-
#pending_requests ⇒ Object
readonly
(PendingRequests) Requests waiting for a response.
Class Method Summary collapse
-
.instance ⇒ Object
For direct access to current sender.
Instance Method Summary collapse
-
#build_and_send_packet(kind, type, payload, target, options = {}, &callback) ⇒ Object
Build and send packet.
-
#build_packet(kind, type, payload, target, options = {}, &callback) ⇒ Object
Build packet or queue it if offline.
-
#connected? ⇒ Boolean
Determine whether currently connected to RightNet via client.
-
#disable_offline_mode ⇒ Object
Switch back to sending requests after in memory queue gets flushed Idempotent.
-
#dump_requests ⇒ Object
Create displayable dump of unfinished non-push request information Truncate list if there are more than 50 requests.
-
#enable_offline_mode ⇒ Object
Switch to offline mode In this mode requests are queued in memory rather than sent Idempotent.
-
#handle_response(response) ⇒ Object
Handle response to a request.
-
#initialize(agent) ⇒ Sender
constructor
Initialize sender.
-
#initialize_offline_queue ⇒ Object
Initialize the offline queue All requests sent prior to running this initialization are queued if offline queueing is enabled and then are sent once this initialization has run All requests following this call and prior to calling start_offline_queue are prepended to the request queue.
-
#message_received(&callback) ⇒ Object
Update the time this agent last received a request or response message Also forward this message receipt notification to any callbacks that have registered.
-
#send_push(type, payload = nil, target = nil, options = {}, &callback) ⇒ Object
Send a request to a single target or multiple targets with no response expected other than routing failures Persist the request en route to reduce the chance of it being lost at the expense of some additional network overhead Enqueue the request if the target is not currently available Never automatically retry the request if there is the possibility of it being duplicated Set time-to-live to be forever.
-
#send_request(type, payload = nil, target = nil, options = {}, &callback) ⇒ Object
Send a request to a single target with a response expected Automatically retry the request if a response is not received in a reasonable amount of time or if there is a non-delivery response indicating the target is not currently available Timeout the request if a response is not received in time, typically configured to 2 minutes Because of retries there is the possibility of duplicated requests, and these are detected and discarded automatically for non-idempotent actions Allow the request to expire per the agent’s configured time-to-live, typically 1 minute Note that receiving a response does not guarantee that the request activity has actually completed since the request processing may involve other asynchronous requests.
-
#start_offline_queue ⇒ Object
Switch offline queueing to online mode and flush all buffered messages.
-
#stats(reset = false) ⇒ Object
Get sender statistics.
-
#terminate ⇒ Object
Take any actions necessary to quiesce client interaction in preparation for agent termination but allow message receipt to continue.
Methods included from OperationResultHelper
#cancel_result, #continue_result, #error_result, #non_delivery_result, #result_from, #retry_result, #success_result
Constructor Details
#initialize(agent) ⇒ Sender
Initialize sender
Parameters
- agent(Agent)
-
Agent using this sender; uses its identity, client, and following options:
- :offline_queueing(Boolean)
-
Whether to queue request if client currently disconnected,
also requires agent invocation of initialize_offline_queue and start_offline_queue methods below,
as well as enable_offline_mode and disable_offline_mode as client connection status changes
:ping_interval(Numeric):: Minimum number of seconds since last message receipt to ping RightNet
to check connectivity, defaults to 0 meaning do not ping
:restart_callback(Proc):: Callback that is activated on each restart vote with votes being initiated
by offline queue exceeding MAX_QUEUED_REQUESTS or by repeated failures to access RightNet when online
:retry_timeout(Numeric):: Maximum number of seconds to retry request before give up
:retry_interval(Numeric):: Number of seconds before initial request retry, increases exponentially
:time_to_live(Numeric):: Number of seconds before a request expires and is to be ignored;
non-positive value means never expire
:async_response(Boolean):: Whether to handle responses asynchronously or to handle them immediately
upon arrival (for use by applications that were written expecting asynchronous AMQP responses)
:secure(Boolean):: true indicates to use Security features of rabbitmq to restrict agents to themselves
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/right_agent/sender.rb', line 85 def initialize(agent) @agent = agent @identity = @agent.identity @options = @agent. || {} @mode = @agent.mode @request_queue = @agent.request_queue @secure = @options[:secure] @retry_timeout = RightSupport::Stats.nil_if_zero(@options[:retry_timeout]) @retry_interval = RightSupport::Stats.nil_if_zero(@options[:retry_interval]) @pending_requests = PendingRequests.new @terminating = nil reset_stats @offline_handler = OfflineHandler.new(@options[:restart_callback], @offline_stats) @connectivity_checker = if @mode == :amqp # Only need connectivity checker for AMQP broker since RightHttpClient does its own checking # via periodic session renewal ConnectivityChecker.new(self, @options[:ping_interval] || 0, @ping_stats) end @@instance = self end |
Instance Attribute Details
#agent ⇒ Object (readonly)
(Agent) Associated agent
54 55 56 |
# File 'lib/right_agent/sender.rb', line 54 def agent @agent end |
#connectivity_checker ⇒ Object (readonly)
(ConnectivityChecker) AMQP broker connection checker
48 49 50 |
# File 'lib/right_agent/sender.rb', line 48 def connectivity_checker @connectivity_checker end |
#identity ⇒ Object (readonly)
(String) Identity of the associated agent
51 52 53 |
# File 'lib/right_agent/sender.rb', line 51 def identity @identity end |
#mode ⇒ Object (readonly)
(Symbol) RightNet communication mode: :http or :amqp
57 58 59 |
# File 'lib/right_agent/sender.rb', line 57 def mode @mode end |
#offline_handler ⇒ Object (readonly)
(OfflineHandler) Handler for requests when client disconnected
45 46 47 |
# File 'lib/right_agent/sender.rb', line 45 def offline_handler @offline_handler end |
#pending_requests ⇒ Object (readonly)
(PendingRequests) Requests waiting for a response
42 43 44 |
# File 'lib/right_agent/sender.rb', line 42 def pending_requests @pending_requests end |
Class Method Details
.instance ⇒ Object
For direct access to current sender
Return
- (Sender)
-
This sender instance if defined, otherwise nil
63 64 65 |
# File 'lib/right_agent/sender.rb', line 63 def self.instance @@instance if defined?(@@instance) end |
Instance Method Details
#build_and_send_packet(kind, type, payload, target, options = {}, &callback) ⇒ Object
Build and send packet
Parameters
- kind(Symbol)
-
Kind of request: :send_push or :send_request
- type(String)
-
Dispatch route for the request; typically identifies actor and action
- payload(Object)
-
Data to be sent with marshalling en route
- target(Hash|NilClass)
-
Target for request
- :agent_id(String)
-
Identity of specific target
- :tags(Array)
-
Tags that must all be associated with a target for it to be selected
- :scope(Hash)
-
Scoping to be used to restrict routing
- :account(Integer)
-
Restrict to agents with this account id
- :shard(Integer)
-
Restrict to agents with this shard id, or if value is Packet::GLOBAL,
ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected: :any or :all
- options(Hash)
-
Request options
- :token(String)
-
Universally unique ID for request; defaults to random generated
- :time_to_live(Numeric)
-
Number of seconds before a request expires and is to be ignored;
non-positive value or nil means never expire for :send_push and means use configured
time-to-live for :send_request
Block
Optional block used to process response asynchronously with the following parameter:
result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR
Return
- true
-
Always return true
Raise
- ArgumentError
-
If target invalid
282 283 284 285 286 287 288 289 290 |
# File 'lib/right_agent/sender.rb', line 282 def build_and_send_packet(kind, type, payload, target, = {}, &callback) if (packet = build_packet(kind, type, payload, target, , &callback)) action = type.split('/').last received_at = @request_stats.update(action, packet.token) @request_kind_stats.update((packet.selector == :all ? "fanout" : kind.to_s)[5..-1]) send("#{@mode}_send", kind, target, packet, received_at, &callback) end true end |
#build_packet(kind, type, payload, target, options = {}, &callback) ⇒ Object
Build packet or queue it if offline
Parameters
- kind(Symbol)
-
Kind of request: :send_push or :send_request
- type(String)
-
Dispatch route for the request; typically identifies actor and action
- payload(Object)
-
Data to be sent with marshalling en route
- target(Hash|NilClass)
-
Target for request
- :agent_id(String)
-
Identity of specific target
- :tags(Array)
-
Tags that must all be associated with a target for it to be selected
- :scope(Hash)
-
Scoping to be used to restrict routing
- :account(Integer)
-
Restrict to agents with this account id
- :shard(Integer)
-
Restrict to agents with this shard id, or if value is Packet::GLOBAL,
ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected: :any or :all
- options(Hash)
-
Request options
- :token(String)
-
Universally unique ID for request; defaults to random generated
- :time_to_live(Numeric)
-
Number of seconds before a request expires and is to be ignored;
non-positive value or nil means never expire for :send_push and means use configured
time-to-live for :send_request
Block
Optional block used to process response asynchronously with the following parameter:
result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR
Return
- (Push|Request|NilClass)
-
Packet created, or nil if queued instead
Raise
- ArgumentError
-
If target is invalid
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/right_agent/sender.rb', line 321 def build_packet(kind, type, payload, target, = {}, &callback) validate_target(target, kind == :send_push) if kind == :send_push packet = Push.new(type, payload) packet.selector = target[:selector] || :any if target.is_a?(Hash) packet.persistent = true packet.confirm = true if callback time_to_live = [:time_to_live] || 0 else packet = Request.new(type, payload) packet.selector = :any time_to_live = [:time_to_live] || @options[:time_to_live] end packet.from = @identity packet.token = [:token] || RightSupport::Data::UUID.generate packet.expires_at = Time.now.to_i + time_to_live if time_to_live && time_to_live > 0 if target.is_a?(Hash) if (agent_id = target[:agent_id]) packet.target = agent_id else packet. = target[:tags] || [] packet.scope = target[:scope] end else packet.target = target end if queueing? @offline_handler.queue_request(kind, type, payload, target, packet.token, packet.expires_at, packet.skewed_by, &callback) nil else packet end end |
#connected? ⇒ Boolean
Determine whether currently connected to RightNet via client
Return
- (Boolean)
-
true if offline or if client disconnected, otherwise false
149 150 151 |
# File 'lib/right_agent/sender.rb', line 149 def connected? @mode == :http ? @agent.client.connected? : @agent.client.connected.size == 0 end |
#disable_offline_mode ⇒ Object
Switch back to sending requests after in memory queue gets flushed Idempotent
Return
- true
-
Always return true
141 142 143 |
# File 'lib/right_agent/sender.rb', line 141 def disable_offline_mode @offline_handler.disable if @options[:offline_queueing] end |
#dump_requests ⇒ Object
Create displayable dump of unfinished non-push request information Truncate list if there are more than 50 requests
Return
- info(Array(String))
-
Receive time and token for each request in descending time order
428 429 430 431 432 433 434 435 436 437 438 |
# File 'lib/right_agent/sender.rb', line 428 def dump_requests info = [] if @pending_requests @pending_requests.kind(:send_request).each do |token, request| info << "#{request.receive_time.localtime} <#{token}>" end info.sort!.reverse! info = info[0..49] + ["..."] if info.size > 50 end info end |
#enable_offline_mode ⇒ Object
Switch to offline mode In this mode requests are queued in memory rather than sent Idempotent
Return
- true
-
Always return true
132 133 134 |
# File 'lib/right_agent/sender.rb', line 132 def enable_offline_mode @offline_handler.enable if @options[:offline_queueing] end |
#handle_response(response) ⇒ Object
Handle response to a request
Parameters
- response(Result)
-
Packet received as result of request
Return
- true
-
Always return true
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 |
# File 'lib/right_agent/sender.rb', line 363 def handle_response(response) if response.is_a?(Result) token = response.token if (result = OperationResult.from_results(response)) if result.non_delivery? @non_delivery_stats.update(result.content.nil? ? "nil" : result.content) elsif result.error? @result_error_stats.update(result.content.nil? ? "nil" : result.content) end @result_stats.update(result.status) else @result_stats.update(response.results.nil? ? "nil" : response.results) end if (pending_request = @pending_requests[token]) if result && result.non_delivery? && pending_request.kind == :send_request if result.content == OperationResult::TARGET_NOT_CONNECTED # Log and temporarily ignore so that timeout retry mechanism continues, but save reason for use below if timeout # Leave purging of associated request until final response, i.e., success response or retry timeout if (parent_token = pending_request.retry_parent_token) @pending_requests[parent_token].non_delivery = result.content else pending_request.non_delivery = result.content end Log.info("Non-delivery of <#{token}> because #{result.content}") elsif result.content == OperationResult::RETRY_TIMEOUT && pending_request.non_delivery # Request timed out but due to another non-delivery reason, so use that reason since more germane response.results = OperationResult.non_delivery(pending_request.non_delivery) deliver_response(response, pending_request) else deliver_response(response, pending_request) end else deliver_response(response, pending_request) end elsif result && result.non_delivery? Log.info("Non-delivery of <#{token}> because #{result.content}") else Log.debug("No pending request for response #{response.to_s([])}") end end true end |
#initialize_offline_queue ⇒ Object
Initialize the offline queue All requests sent prior to running this initialization are queued if offline queueing is enabled and then are sent once this initialization has run All requests following this call and prior to calling start_offline_queue are prepended to the request queue
Return
- true
-
Always return true
114 115 116 |
# File 'lib/right_agent/sender.rb', line 114 def initialize_offline_queue @offline_handler.init if @options[:offline_queueing] end |
#message_received(&callback) ⇒ Object
Update the time this agent last received a request or response message Also forward this message receipt notification to any callbacks that have registered
Block
Optional block without parameters that is activated when a message is received
Return
- true
-
Always return true
161 162 163 |
# File 'lib/right_agent/sender.rb', line 161 def (&callback) @connectivity_checker.(&callback) if @connectivity_checker end |
#send_push(type, payload = nil, target = nil, options = {}, &callback) ⇒ Object
Send a request to a single target or multiple targets with no response expected other than routing failures Persist the request en route to reduce the chance of it being lost at the expense of some additional network overhead Enqueue the request if the target is not currently available Never automatically retry the request if there is the possibility of it being duplicated Set time-to-live to be forever
Parameters
- type(String)
-
Dispatch route for the request; typically identifies actor and action
- payload(Object)
-
Data to be sent with marshalling en route
target(Hash|NilClass) Target for request, which may be a specific agent (using :agent_id),
potentially multiple targets (using :tags, :scope, :selector), or nil to route solely
using type:
:agent_id(String):: serialized identity of specific target
:tags(Array):: Tags that must all be associated with a target for it to be selected
:scope(Hash):: Scoping to be used to restrict routing
:account(Integer):: Restrict to agents with this account id
:shard(Integer):: Restrict to agents with this shard id, or if value is Packet::GLOBAL,
ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected, either :any or :all,
defaults to :any
- options(Hash)
-
Request options
- :token(String)
-
Universally unique ID for request; defaults to random generated
- :time_to_live(Numeric)
-
Number of seconds before a request expires and is to be ignored;
non-positive value or nil means never expire; defaults to 0
Block
Optional block used to process routing responses asynchronously with the following parameter:
result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR,
with an initial SUCCESS response containing the targets to which the request was sent
and any additional responses indicating any failures to actually route the request
to those targets, use RightScale::OperationResult.from_results to decode
Return
- true
-
Always return true
Raise
- ArgumentError
-
If target invalid
- SendFailure
-
If sending of request failed unexpectedly
- TemporarilyOffline
-
If cannot send request because RightNet client currently disconnected
and offline queueing is disabled
207 208 209 |
# File 'lib/right_agent/sender.rb', line 207 def send_push(type, payload = nil, target = nil, = {}, &callback) build_and_send_packet(:send_push, type, payload, target, , &callback) end |
#send_request(type, payload = nil, target = nil, options = {}, &callback) ⇒ Object
Send a request to a single target with a response expected Automatically retry the request if a response is not received in a reasonable amount of time or if there is a non-delivery response indicating the target is not currently available Timeout the request if a response is not received in time, typically configured to 2 minutes Because of retries there is the possibility of duplicated requests, and these are detected and discarded automatically for non-idempotent actions Allow the request to expire per the agent’s configured time-to-live, typically 1 minute Note that receiving a response does not guarantee that the request activity has actually completed since the request processing may involve other asynchronous requests
Parameters
- type(String)
-
Dispatch route for the request; typically identifies actor and action
- payload(Object)
-
Data to be sent with marshalling en route
target(Hash|NilClass) Target for request, which may be a specific agent (using :agent_id),
one chosen randomly from potentially multiple targets (using :tags, :scope), or nil to
route solely using type:
:agent_id(String):: serialized identity of specific target
:tags(Array):: Tags that must all be associated with a target for it to be selected
:scope(Hash):: Scoping to be used to restrict routing
:account(Integer):: Restrict to agents with this account id
:shard(Integer):: Restrict to agents with this shard id, or if value is Packet::GLOBAL,
ones with no shard id
- options(Hash)
-
Request options
- :token(String)
-
Universally unique ID for request; defaults to random generated
- :time_to_live(Numeric)
-
Number of seconds before a request expires and is to be ignored;
non-positive value or nil means never expire; defaults to configured :time_to_live
Block
Required block used to process response asynchronously with the following parameter:
result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR,
use RightScale::OperationResult.from_results to decode
Return
- true
-
Always return true
Raise
- ArgumentError
-
If target invalid or block missing
248 249 250 251 |
# File 'lib/right_agent/sender.rb', line 248 def send_request(type, payload = nil, target = nil, = {}, &callback) raise ArgumentError, "Missing block for response callback" unless callback build_and_send_packet(:send_request, type, payload, target, , &callback) end |
#start_offline_queue ⇒ Object
Switch offline queueing to online mode and flush all buffered messages
Return
- true
-
Always return true
122 123 124 |
# File 'lib/right_agent/sender.rb', line 122 def start_offline_queue @offline_handler.start if @options[:offline_queueing] end |
#stats(reset = false) ⇒ Object
Get sender statistics
Parameters
- reset(Boolean)
-
Whether to reset the statistics after getting the current ones
Return
- stats(Hash)
-
Current statistics:
- “non-deliveries”(Hash|nil)
-
Non-delivery activity stats with keys “total”, “percent”, “last”,
and 'rate' with percentage breakdown per reason, or nil if none
"offlines"(Hash|nil):: Offline activity stats with keys "total", "last", and "duration",
or nil if none
"pings"(Hash|nil):: Request activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown for "success" vs. "timeout", or nil if none
"request kinds"(Hash|nil):: Request kind activity stats with keys "total", "percent", and "last"
with percentage breakdown per kind, or nil if none
"requests"(Hash|nil):: Request activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per request type, or nil if none
"requests pending"(Hash|nil):: Number of requests waiting for response and age of oldest,
or nil if none
"result errors"(Hash|nil):: Error result activity stats with keys "total", "percent", "last",
and 'rate' with percentage breakdown per error, or nil if none
"results"(Hash|nil):: Results activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per operation result type, or nil if none
"retries"(Hash|nil):: Retry activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per request type, or nil if none
"send failure"(Hash|nil):: Send failure activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per failure type, or nil if none
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 |
# File 'lib/right_agent/sender.rb', line 467 def stats(reset = false) stats = {} if @agent offlines = @offline_stats.all offlines.merge!("duration" => @offline_stats.avg_duration) if offlines if @pending_requests.size > 0 pending = {} pending["pushes"] = @pending_requests.kind(:send_push).size requests = @pending_requests.kind(:send_request) if (pending["requests"] = requests.size) > 0 pending["oldest age"] = PendingRequests.oldest_age(requests) end end stats = { "non-deliveries" => @non_delivery_stats.all, "offlines" => offlines, "pings" => @ping_stats.all, "request kinds" => @request_kind_stats.all, "requests" => @request_stats.all, "requests pending" => pending, "result errors" => @result_error_stats.all, "results" => @result_stats.all, "retries" => @retry_stats.all, "send failures" => @send_failure_stats.all } reset_stats if reset end stats end |
#terminate ⇒ Object
Take any actions necessary to quiesce client interaction in preparation for agent termination but allow message receipt to continue
Return
- (Array)
-
Number of pending non-push requests and age of youngest request
412 413 414 415 416 417 418 419 420 421 |
# File 'lib/right_agent/sender.rb', line 412 def terminate if @offline_handler @offline_handler.terminate @connectivity_checker.terminate if @connectivity_checker pending = @pending_requests.kind(:send_request) [pending.size, PendingRequests.youngest_age(pending)] else [0, nil] end end |