Class: RightScale::Sender::OfflineHandler
- Defined in:
- lib/right_agent/sender.rb
Overview
Queue for storing requests while disconnected from broker and then sending them when successfully reconnect
Constant Summary collapse
- MAX_QUEUE_FLUSH_DELAY =
Maximum seconds to wait before starting flushing offline queue when disabling offline mode
2 * 60
- MAX_QUEUED_REQUESTS =
Maximum number of offline queued requests before triggering restart vote
1000
- RESTART_VOTE_DELAY =
Number of seconds that should be spent in offline mode before triggering a restart vote
15 * 60
Instance Attribute Summary collapse
-
#mode ⇒ Object
readonly
(Symbol) Current offline handling mode with possible values: Value Description :initializing Agent still initializing :online Agent connected to broker :offline Agent disconnected from broker.
-
#queue ⇒ Object
(Array) Offline queue.
-
#state ⇒ Object
readonly
(Symbol) Current queue state with possible values: Value Description Action Next state :created Queue created init :initializing :initializing Agent still initializing start :running :running Queue has been started disable when offline :flushing :flushing Sending queued requests enable :running :terminating Agent terminating.
Instance Method Summary collapse
-
#disable ⇒ Object
Switch back to sending requests to mapper after in-memory queue gets flushed Idempotent.
-
#enable ⇒ Object
Switch to offline mode In this mode requests are queued in memory rather than sent to the mapper Idempotent.
-
#init ⇒ Object
Initialize the offline queue All requests sent prior to running this initialization are queued and then are sent once this initialization has run All requests following this call and prior to calling start are prepended to the request queue.
-
#initialize(restart_callback, offline_stats) ⇒ OfflineHandler
constructor
Create offline queue.
-
#offline? ⇒ Boolean
Is agent currently offline?.
-
#queue_request(kind, type, payload, target, callback) ⇒ Object
Queue given request in memory.
-
#queueing? ⇒ Boolean
In request queueing mode?.
-
#start ⇒ Object
Switch to online mode and send all buffered messages.
-
#terminate ⇒ Object
Prepare for agent termination.
Constructor Details
#initialize(restart_callback, offline_stats) ⇒ OfflineHandler
Create offline queue
Parameters
- restart_callback(Proc)
-
Callback that is activated on each restart vote with votes being initiated
by offline queue exceeding MAX_QUEUED_REQUESTS
- offline_stats(RightSupport::Stats::Activity)
-
Offline queue tracking statistics
174 175 176 177 178 179 180 181 182 |
# File 'lib/right_agent/sender.rb', line 174 def initialize(restart_callback, offline_stats) @restart_vote = restart_callback @restart_vote_timer = nil @restart_vote_count = 0 @offline_stats = offline_stats @state = :created @mode = :initializing @queue = [] end |
Instance Attribute Details
#mode ⇒ Object (readonly)
(Symbol) Current offline handling mode with possible values:
Value Description
:initializing Agent still initializing
:online Agent connected to broker
:offline Agent disconnected from broker
163 164 165 |
# File 'lib/right_agent/sender.rb', line 163 def mode @mode end |
#queue ⇒ Object
(Array) Offline queue
166 167 168 |
# File 'lib/right_agent/sender.rb', line 166 def queue @queue end |
#state ⇒ Object (readonly)
(Symbol) Current queue state with possible values:
Value Description Action Next state
:created Queue created init :initializing
:initializing Agent still initializing start :running
:running Queue has been started disable when offline :flushing
:flushing Sending queued requests enable :running
:terminating Agent terminating
156 157 158 |
# File 'lib/right_agent/sender.rb', line 156 def state @state end |
Instance Method Details
#disable ⇒ Object
Switch back to sending requests to mapper after in-memory queue gets flushed Idempotent
Return
- true
-
Always return true
260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/right_agent/sender.rb', line 260 def disable if offline? && @state != :created Log.info("[offline] Connection to broker re-established") @offline_stats.finish cancel_timer @state = :flushing # Wait a bit to avoid flooding the mapper EM.add_timer(rand(MAX_QUEUE_FLUSH_DELAY)) { flush } end true end |
#enable ⇒ Object
Switch to offline mode In this mode requests are queued in memory rather than sent to the mapper Idempotent
Return
- true
-
Always return true
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/right_agent/sender.rb', line 236 def enable if offline? if @state == :flushing # If we were in offline mode then switched back to online but are still in the # process of flushing the in-memory queue and are now switching to offline mode # again then stop the flushing @state = :running end else Log.info("[offline] Disconnect from broker detected, entering offline mode") Log.info("[offline] Messages will be queued in memory until connection to broker is re-established") @offline_stats.update @queue ||= [] # Ensure queue is valid without losing any messages when going offline @mode = :offline start_timer end true end |
#init ⇒ Object
Initialize the offline queue All requests sent prior to running this initialization are queued and then are sent once this initialization has run All requests following this call and prior to calling start are prepended to the request queue
Return
- true
-
Always return true
192 193 194 195 |
# File 'lib/right_agent/sender.rb', line 192 def init @state = :initializing if @state == :created true end |
#offline? ⇒ Boolean
Is agent currently offline?
Return
- (Boolean)
-
true if agent offline, otherwise false
218 219 220 |
# File 'lib/right_agent/sender.rb', line 218 def offline? @mode == :offline || @state == :created end |
#queue_request(kind, type, payload, target, callback) ⇒ Object
Queue given request in memory
Parameters
- request(Hash)
-
Request to be stored
Return
- true
-
Always return true
279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/right_agent/sender.rb', line 279 def queue_request(kind, type, payload, target, callback) request = {:kind => kind, :type => type, :payload => payload, :target => target, :callback => callback} Log.info("[offline] Queuing request: #{request.inspect}") vote_to_restart if (@restart_vote_count += 1) >= MAX_QUEUED_REQUESTS if @state == :initializing # We are in the initialization callback, requests should be put at the head of the queue @queue.unshift(request) else @queue << request end true end |
#queueing? ⇒ Boolean
In request queueing mode?
Return
- (Boolean)
-
true if should queue request, otherwise false
226 227 228 |
# File 'lib/right_agent/sender.rb', line 226 def queueing? offline? && @state != :flushing end |
#start ⇒ Object
Switch to online mode and send all buffered messages
Return
- true
-
Always return true
201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/right_agent/sender.rb', line 201 def start if @state == :initializing if @mode == :offline @state = :running else @state = :flushing flush end @mode = :online if @mode == :initializing end true end |
#terminate ⇒ Object
Prepare for agent termination
Return
- true
-
Always return true
296 297 298 299 300 |
# File 'lib/right_agent/sender.rb', line 296 def terminate @state = :terminating cancel_timer true end |