Class: RightScale::Sender
- Includes:
- OperationResultHelper
- Defined in:
- lib/right_agent/sender.rb
Overview
This class allows sending requests to agents via RightNet It is used by Actor.request which is used by actors that need to send requests to remote agents If requested, it will queue requests when there are is no RightNet connection
Defined Under Namespace
Classes: SendFailure, TemporarilyOffline
Constant Summary collapse
- RETRY_BACKOFF_FACTOR =
Factor used on each retry iteration to achieve exponential backoff
3
Instance Attribute Summary collapse
-
#agent ⇒ Object
readonly
(Agent) Associated agent.
-
#connectivity_checker ⇒ Object
readonly
(ConnectivityChecker) AMQP broker connection checker.
-
#identity ⇒ Object
readonly
(String) Identity of the associated agent.
-
#mode ⇒ Object
readonly
(Symbol) RightNet communication mode: :http or :amqp.
-
#offline_handler ⇒ Object
readonly
(OfflineHandler) Handler for requests when client disconnected.
-
#pending_requests ⇒ Object
readonly
(PendingRequests) Requests waiting for a response.
Class Method Summary collapse
-
.instance ⇒ Object
For direct access to current sender.
Instance Method Summary collapse
-
#build_and_send_packet(kind, type, payload, target, token, callback) ⇒ Object
Build and send packet.
-
#build_packet(kind, type, payload, target, token, callback = false) ⇒ Object
Build packet or queue it if offline.
-
#connected? ⇒ Boolean
Determine whether currently connected to RightNet via client.
-
#disable_offline_mode ⇒ Object
Switch back to sending requests after in memory queue gets flushed Idempotent.
-
#dump_requests ⇒ Object
Create displayable dump of unfinished non-push request information Truncate list if there are more than 50 requests.
-
#enable_offline_mode ⇒ Object
Switch to offline mode In this mode requests are queued in memory rather than sent Idempotent.
-
#handle_response(response) ⇒ Object
Handle response to a request.
-
#initialize(agent) ⇒ Sender
constructor
Initialize sender.
-
#initialize_offline_queue ⇒ Object
Initialize the offline queue All requests sent prior to running this initialization are queued if offline queueing is enabled and then are sent once this initialization has run All requests following this call and prior to calling start_offline_queue are prepended to the request queue.
-
#message_received(&callback) ⇒ Object
Update the time this agent last received a request or response message Also forward this message receipt notification to any callbacks that have registered.
-
#send_push(type, payload = nil, target = nil, token = nil, &callback) ⇒ Object
Send a request to a single target or multiple targets with no response expected other than routing failures Persist the request en route to reduce the chance of it being lost at the expense of some additional network overhead Enqueue the request if the target is not currently available Never automatically retry the request if there is the possibility of it being duplicated Set time-to-live to be forever.
-
#send_request(type, payload = nil, target = nil, token = nil, &callback) ⇒ Object
Send a request to a single target with a response expected Automatically retry the request if a response is not received in a reasonable amount of time or if there is a non-delivery response indicating the target is not currently available Timeout the request if a response is not received in time, typically configured to 2 minutes Because of retries there is the possibility of duplicated requests, and these are detected and discarded automatically for non-idempotent actions Allow the request to expire per the agent’s configured time-to-live, typically 1 minute Note that receiving a response does not guarantee that the request activity has actually completed since the request processing may involve other asynchronous requests.
-
#start_offline_queue ⇒ Object
Switch offline queueing to online mode and flush all buffered messages.
-
#stats(reset = false) ⇒ Object
Get sender statistics.
-
#terminate ⇒ Object
Take any actions necessary to quiesce client interaction in preparation for agent termination but allow message receipt to continue.
Methods included from OperationResultHelper
#cancel_result, #continue_result, #error_result, #non_delivery_result, #result_from, #retry_result, #success_result
Constructor Details
#initialize(agent) ⇒ Sender
Initialize sender
Parameters
- agent(Agent)
-
Agent using this sender; uses its identity, client, and following options:
- :exception_callback(Proc)
-
Callback with following parameters that is activated on exception events:
- exception(Exception)
-
Exception
- message(Packet)
-
Message being processed
- agent(Agent)
-
Reference to agent
- :offline_queueing(Boolean)
-
Whether to queue request if client currently disconnected,
also requires agent invocation of initialize_offline_queue and start_offline_queue methods below,
as well as enable_offline_mode and disable_offline_mode as client connection status changes
:ping_interval(Integer):: Minimum number of seconds since last receipt to ping RightNet
to check connectivity, defaults to 0 meaning do not ping
:restart_callback(Proc):: Callback that is activated on each restart vote with votes being initiated
by offline queue exceeding MAX_QUEUED_REQUESTS or by repeated failures to access RightNet when online
:retry_timeout(Numeric):: Maximum number of seconds to retry request before give up
:retry_interval(Numeric):: Number of seconds before initial request retry, increases exponentially
:time_to_live(Integer):: Number of seconds before a request expires and is to be ignored
by the receiver, 0 means never expire
:async_response(Boolean):: Whether to handle responses asynchronously or to handle them immediately
upon arrival (for use by applications that were written expecting asynchronous AMQP responses)
:secure(Boolean):: true indicates to use Security features of rabbitmq to restrict agents to themselves
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/right_agent/sender.rb', line 89 def initialize(agent) @agent = agent @identity = @agent.identity = @agent. || {} @mode = @agent.mode @request_queue = @agent.request_queue @secure = [:secure] @retry_timeout = RightSupport::Stats.nil_if_zero([:retry_timeout]) @retry_interval = RightSupport::Stats.nil_if_zero([:retry_interval]) @pending_requests = PendingRequests.new @terminating = nil reset_stats @offline_handler = OfflineHandler.new([:restart_callback], @offline_stats) @connectivity_checker = if @mode == :amqp # Only need connectivity checker for AMQP broker since RightHttpClient does its own checking # via periodic session renewal ConnectivityChecker.new(self, [:ping_interval] || 0, @ping_stats, @exception_stats) end @@instance = self end |
Instance Attribute Details
#agent ⇒ Object (readonly)
(Agent) Associated agent
54 55 56 |
# File 'lib/right_agent/sender.rb', line 54 def agent @agent end |
#connectivity_checker ⇒ Object (readonly)
(ConnectivityChecker) AMQP broker connection checker
48 49 50 |
# File 'lib/right_agent/sender.rb', line 48 def connectivity_checker @connectivity_checker end |
#identity ⇒ Object (readonly)
(String) Identity of the associated agent
51 52 53 |
# File 'lib/right_agent/sender.rb', line 51 def identity @identity end |
#mode ⇒ Object (readonly)
(Symbol) RightNet communication mode: :http or :amqp
57 58 59 |
# File 'lib/right_agent/sender.rb', line 57 def mode @mode end |
#offline_handler ⇒ Object (readonly)
(OfflineHandler) Handler for requests when client disconnected
45 46 47 |
# File 'lib/right_agent/sender.rb', line 45 def offline_handler @offline_handler end |
#pending_requests ⇒ Object (readonly)
(PendingRequests) Requests waiting for a response
42 43 44 |
# File 'lib/right_agent/sender.rb', line 42 def pending_requests @pending_requests end |
Class Method Details
.instance ⇒ Object
For direct access to current sender
Return
- (Sender)
-
This sender instance if defined, otherwise nil
63 64 65 |
# File 'lib/right_agent/sender.rb', line 63 def self.instance @@instance if defined?(@@instance) end |
Instance Method Details
#build_and_send_packet(kind, type, payload, target, token, callback) ⇒ Object
Build and send packet
Parameters
- kind(Symbol)
-
Kind of request: :send_push or :send_request
- type(String)
-
Dispatch route for the request; typically identifies actor and action
- payload(Object)
-
Data to be sent with marshalling en route
- target(Hash|NilClass)
-
Identity of specific target as string, or hash for selecting targets
- :agent_id(String)
-
Identity of specific target
- :tags(Array)
-
Tags that must all be associated with a target for it to be selected
- :scope(Hash)
-
Scoping to be used to restrict routing
- :account(Integer)
-
Restrict to agents with this account id
- :shard(Integer)
-
Restrict to agents with this shard id, or if value is Packet::GLOBAL,
ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected: :any or :all
- token(String|NilClass)
-
Token uniquely identifying request; defaults to random generated
- callback(Proc|nil)
-
Block used to process routing response
Return
- true
-
Always return true
Raise
- ArgumentError
-
If target invalid
273 274 275 276 277 278 279 280 281 |
# File 'lib/right_agent/sender.rb', line 273 def build_and_send_packet(kind, type, payload, target, token, callback) if (packet = build_packet(kind, type, payload, target, token, callback)) action = type.split('/').last received_at = @request_stats.update(action, packet.token) @request_kind_stats.update((packet.selector == :all ? "fanout" : kind.to_s)[5..-1]) send("#{@mode}_send", kind, target, packet, received_at, callback) end true end |
#build_packet(kind, type, payload, target, token, callback = false) ⇒ Object
Build packet or queue it if offline
Parameters
- kind(Symbol)
-
Kind of request: :send_push or :send_request
- type(String)
-
Dispatch route for the request; typically identifies actor and action
- payload(Object)
-
Data to be sent with marshalling en route
- target(Hash|NilClass)
-
Identity of specific target as string, or hash for selecting targets
- :agent_id(String)
-
Identity of specific target
- :tags(Array)
-
Tags that must all be associated with a target for it to be selected
- :scope(Hash)
-
Scoping to be used to restrict routing
- :account(Integer)
-
Restrict to agents with this account id
- :shard(Integer)
-
Restrict to agents with this shard id, or if value is Packet::GLOBAL,
ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected: :any or :all
- token(String|NilClass)
-
Token uniquely identifying request; defaults to random generated
- callback(Boolean)
-
Whether this request has an associated response callback
Return
- (Push|Request|NilClass)
-
Packet created, or nil if queued instead
Raise
- ArgumentError
-
If target is invalid
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/right_agent/sender.rb', line 305 def build_packet(kind, type, payload, target, token, callback = false) validate_target(target, kind == :send_push) if queueing? @offline_handler.queue_request(kind, type, payload, target, callback) nil else if kind == :send_push packet = Push.new(type, payload) packet.selector = target[:selector] || :any if target.is_a?(Hash) packet.persistent = true packet.confirm = true if callback else packet = Request.new(type, payload) ttl = [:time_to_live] packet.expires_at = Time.now.to_i + ttl if ttl && ttl != 0 packet.selector = :any end packet.from = @identity packet.token = token || RightSupport::Data::UUID.generate if target.is_a?(Hash) if (agent_id = target[:agent_id]) packet.target = agent_id else packet. = target[:tags] || [] packet.scope = target[:scope] end else packet.target = target end packet end end |
#connected? ⇒ Boolean
Determine whether currently connected to RightNet via client
Return
- (Boolean)
-
true if offline or if client disconnected, otherwise false
153 154 155 |
# File 'lib/right_agent/sender.rb', line 153 def connected? @mode == :http ? @agent.client.connected? : @agent.client.connected.size == 0 end |
#disable_offline_mode ⇒ Object
Switch back to sending requests after in memory queue gets flushed Idempotent
Return
- true
-
Always return true
145 146 147 |
# File 'lib/right_agent/sender.rb', line 145 def disable_offline_mode @offline_handler.disable if [:offline_queueing] end |
#dump_requests ⇒ Object
Create displayable dump of unfinished non-push request information Truncate list if there are more than 50 requests
Return
- info(Array(String))
-
Receive time and token for each request in descending time order
410 411 412 413 414 415 416 417 418 419 420 |
# File 'lib/right_agent/sender.rb', line 410 def dump_requests info = [] if @pending_requests @pending_requests.kind(:send_request).each do |token, request| info << "#{request.receive_time.localtime} <#{token}>" end info.sort!.reverse! info = info[0..49] + ["..."] if info.size > 50 end info end |
#enable_offline_mode ⇒ Object
Switch to offline mode In this mode requests are queued in memory rather than sent Idempotent
Return
- true
-
Always return true
136 137 138 |
# File 'lib/right_agent/sender.rb', line 136 def enable_offline_mode @offline_handler.enable if [:offline_queueing] end |
#handle_response(response) ⇒ Object
Handle response to a request
Parameters
- response(Result)
-
Packet received as result of request
Return
- true
-
Always return true
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
# File 'lib/right_agent/sender.rb', line 345 def handle_response(response) if response.is_a?(Result) token = response.token if (result = OperationResult.from_results(response)) if result.non_delivery? @non_delivery_stats.update(result.content.nil? ? "nil" : result.content.inspect) elsif result.error? @result_error_stats.update(result.content.nil? ? "nil" : result.content.inspect) end @result_stats.update(result.status) else @result_stats.update(response.results.nil? ? "nil" : response.results) end if (pending_request = @pending_requests[token]) if result && result.non_delivery? && pending_request.kind == :send_request if result.content == OperationResult::TARGET_NOT_CONNECTED # Log and temporarily ignore so that timeout retry mechanism continues, but save reason for use below if timeout # Leave purging of associated request until final response, i.e., success response or retry timeout if (parent_token = pending_request.retry_parent_token) @pending_requests[parent_token].non_delivery = result.content else pending_request.non_delivery = result.content end Log.info("Non-delivery of <#{token}> because #{result.content}") elsif result.content == OperationResult::RETRY_TIMEOUT && pending_request.non_delivery # Request timed out but due to another non-delivery reason, so use that reason since more germane response.results = OperationResult.non_delivery(pending_request.non_delivery) deliver_response(response, pending_request) else deliver_response(response, pending_request) end else deliver_response(response, pending_request) end elsif result && result.non_delivery? Log.info("Non-delivery of <#{token}> because #{result.content}") else Log.debug("No pending request for response #{response.to_s([])}") end end true end |
#initialize_offline_queue ⇒ Object
Initialize the offline queue All requests sent prior to running this initialization are queued if offline queueing is enabled and then are sent once this initialization has run All requests following this call and prior to calling start_offline_queue are prepended to the request queue
Return
- true
-
Always return true
118 119 120 |
# File 'lib/right_agent/sender.rb', line 118 def initialize_offline_queue @offline_handler.init if [:offline_queueing] end |
#message_received(&callback) ⇒ Object
Update the time this agent last received a request or response message Also forward this message receipt notification to any callbacks that have registered
Block
Optional block without parameters that is activated when a message is received
Return
- true
-
Always return true
165 166 167 |
# File 'lib/right_agent/sender.rb', line 165 def (&callback) @connectivity_checker.(&callback) if @connectivity_checker end |
#send_push(type, payload = nil, target = nil, token = nil, &callback) ⇒ Object
Send a request to a single target or multiple targets with no response expected other than routing failures Persist the request en route to reduce the chance of it being lost at the expense of some additional network overhead Enqueue the request if the target is not currently available Never automatically retry the request if there is the possibility of it being duplicated Set time-to-live to be forever
Parameters
- type(String)
-
Dispatch route for the request; typically identifies actor and action
- payload(Object)
-
Data to be sent with marshalling en route
target(Hash|NilClass) Target for request, which may be a specific agent (using :agent_id),
potentially multiple targets (using :tags, :scope, :selector), or nil to route solely
using type:
:agent_id(String):: serialized identity of specific target
:tags(Array):: Tags that must all be associated with a target for it to be selected
:scope(Hash):: Scoping to be used to restrict routing
:account(Integer):: Restrict to agents with this account id
:shard(Integer):: Restrict to agents with this shard id, or if value is Packet::GLOBAL,
ones with no shard id
:selector(Symbol):: Which of the matched targets to be selected, either :any or :all,
defaults to :any
- token(String|NilClass)
-
Token uniquely identifying request; defaults to random generated
Block
Optional block used to process routing responses asynchronously with the following parameter:
result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR,
with an initial SUCCESS response containing the targets to which the request was sent
and any additional responses indicating any failures to actually route the request
to those targets, use RightScale::OperationResult.from_results to decode
Return
- true
-
Always return true
Raise
- ArgumentError
-
If target invalid
- SendFailure
-
If sending of request failed unexpectedly
- TemporarilyOffline
-
If cannot send request because RightNet client currently disconnected
and offline queueing is disabled
208 209 210 |
# File 'lib/right_agent/sender.rb', line 208 def send_push(type, payload = nil, target = nil, token = nil, &callback) build_and_send_packet(:send_push, type, payload, target, token, callback) end |
#send_request(type, payload = nil, target = nil, token = nil, &callback) ⇒ Object
Send a request to a single target with a response expected Automatically retry the request if a response is not received in a reasonable amount of time or if there is a non-delivery response indicating the target is not currently available Timeout the request if a response is not received in time, typically configured to 2 minutes Because of retries there is the possibility of duplicated requests, and these are detected and discarded automatically for non-idempotent actions Allow the request to expire per the agent’s configured time-to-live, typically 1 minute Note that receiving a response does not guarantee that the request activity has actually completed since the request processing may involve other asynchronous requests
Parameters
- type(String)
-
Dispatch route for the request; typically identifies actor and action
- payload(Object)
-
Data to be sent with marshalling en route
target(Hash|NilClass) Target for request, which may be a specific agent (using :agent_id),
one chosen randomly from potentially multiple targets (using :tags, :scope), or nil to
route solely using type:
:agent_id(String):: serialized identity of specific target
:tags(Array):: Tags that must all be associated with a target for it to be selected
:scope(Hash):: Scoping to be used to restrict routing
:account(Integer):: Restrict to agents with this account id
:shard(Integer):: Restrict to agents with this shard id, or if value is Packet::GLOBAL,
ones with no shard id
- token(String|NilClass)
-
Token uniquely identifying request; defaults to random generated
Block
Required block used to process response asynchronously with the following parameter:
result(Result):: Response with an OperationResult of SUCCESS, RETRY, NON_DELIVERY, or ERROR,
use RightScale::OperationResult.from_results to decode
Return
- true
-
Always return true
Raise
- ArgumentError
-
If target invalid or block missing
246 247 248 249 |
# File 'lib/right_agent/sender.rb', line 246 def send_request(type, payload = nil, target = nil, token = nil, &callback) raise ArgumentError, "Missing block for response callback" unless callback build_and_send_packet(:send_request, type, payload, target, token, callback) end |
#start_offline_queue ⇒ Object
Switch offline queueing to online mode and flush all buffered messages
Return
- true
-
Always return true
126 127 128 |
# File 'lib/right_agent/sender.rb', line 126 def start_offline_queue @offline_handler.start if [:offline_queueing] end |
#stats(reset = false) ⇒ Object
Get sender statistics
Parameters
- reset(Boolean)
-
Whether to reset the statistics after getting the current ones
Return
- stats(Hash)
-
Current statistics:
- “exceptions”(Hash|nil)
-
Exceptions raised per category, or nil if none
- “total”(Integer)
-
Total exceptions for this category
- “recent”(Array)
-
Most recent as a hash of “count”, “type”, “message”, “when”, and “where”
- “non-deliveries”(Hash|nil)
-
Non-delivery activity stats with keys “total”, “percent”, “last”,
and 'rate' with percentage breakdown per reason, or nil if none
"offlines"(Hash|nil):: Offline activity stats with keys "total", "last", and "duration",
or nil if none
"pings"(Hash|nil):: Request activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown for "success" vs. "timeout", or nil if none
"request kinds"(Hash|nil):: Request kind activity stats with keys "total", "percent", and "last"
with percentage breakdown per kind, or nil if none
"requests"(Hash|nil):: Request activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per request type, or nil if none
"requests pending"(Hash|nil):: Number of requests waiting for response and age of oldest,
or nil if none
"response time"(Float):: Average number of seconds to respond to a request recently
"result errors"(Hash|nil):: Error result activity stats with keys "total", "percent", "last",
and 'rate' with percentage breakdown per error, or nil if none
"results"(Hash|nil):: Results activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per operation result type, or nil if none
"retries"(Hash|nil):: Retry activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per request type, or nil if none
"send failure"(Hash|nil):: Send failure activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per failure type, or nil if none
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 |
# File 'lib/right_agent/sender.rb', line 453 def stats(reset = false) stats = {} if @agent offlines = @offline_stats.all offlines.merge!("duration" => @offline_stats.avg_duration) if offlines if @pending_requests.size > 0 pending = {} pending["pushes"] = @pending_requests.kind(:send_push).size requests = @pending_requests.kind(:send_request) if (pending["requests"] = requests.size) > 0 pending["oldest age"] = requests.oldest_age end end stats = { "exceptions" => @exception_stats.stats, "non-deliveries" => @non_delivery_stats.all, "offlines" => offlines, "pings" => @ping_stats.all, "request kinds" => @request_kind_stats.all, "requests" => @request_stats.all, "requests pending" => pending, "response time" => @request_stats.avg_duration, "result errors" => @result_error_stats.all, "results" => @result_stats.all, "retries" => @retry_stats.all, "send failures" => @send_failure_stats.all } reset_stats if reset end stats end |
#terminate ⇒ Object
Take any actions necessary to quiesce client interaction in preparation for agent termination but allow message receipt to continue
Return
- (Array)
-
Number of pending non-push requests and age of youngest request
394 395 396 397 398 399 400 401 402 403 |
# File 'lib/right_agent/sender.rb', line 394 def terminate if @offline_handler @offline_handler.terminate @connectivity_checker.terminate if @connectivity_checker pending = @pending_requests.kind(:send_request) [pending.size, pending.youngest_age] else [0, nil] end end |