Class: Toiler::Actor::Fetcher
- Inherits: Concurrent::Actor::RestartingContext
  - Object
  - Concurrent::Actor::RestartingContext
  - Toiler::Actor::Fetcher
- Includes: Utils::ActorLogging
- Defined in: lib/toiler/actor/fetcher.rb
Overview
Actor that polls the queue for messages only when processors are ready; otherwise it stays idle.
Constant Summary
- FETCH_LIMIT = 10
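The overview and FETCH_LIMIT together suggest a simple gating rule: poll only when at least one processor is free, and never ask for more messages than the limit. The following is a minimal sketch under that assumption. `fetch_size` and `should_poll?` are hypothetical helpers written for illustration; they are not part of Toiler's documented API, and the actual polling code is not shown on this page.

```ruby
# Hypothetical helpers illustrating the gating rule; not Toiler's actual code.
FETCH_LIMIT = 10

# Number of messages to request in one poll: bounded by the free
# processors and by the per-request limit.
def fetch_size(free_processors, limit = FETCH_LIMIT)
  [free_processors, limit].min
end

# Poll only when at least one processor can take work.
def should_poll?(free_processors)
  free_processors.positive?
end

[0, 3, 25].each do |free|
  if should_poll?(free)
    puts "free=#{free}: request #{fetch_size(free)} message(s)"
  else
    puts "free=#{free}: idle"
  end
end
```

A limit of 10 matches the maximum number of messages a single Amazon SQS ReceiveMessage call can return, which is presumably why the constant has that value.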
Instance Attribute Summary
- #concurrency ⇒ Object
  Returns the value of attribute concurrency.
- #executing ⇒ Object
  Returns the value of attribute executing.
- #free_processors ⇒ Object
  Returns the value of attribute free_processors.
- #queue ⇒ Object
  Returns the value of attribute queue.
- #visibility_timeout ⇒ Object
  Returns the value of attribute visibility_timeout.
- #wait ⇒ Object
  Returns the value of attribute wait.
- #waiting_messages ⇒ Object
  Returns the value of attribute waiting_messages.
Instance Method Summary
- #default_executor ⇒ Object
- #executing? ⇒ Boolean
- #initialize(queue, client, count) ⇒ Fetcher (constructor)
  A new instance of Fetcher.
- #on_message(msg) ⇒ Object
Methods included from Utils::ActorLogging
#debug, #error, #fatal, #info, #warn
Constructor Details
#initialize(queue, client, count) ⇒ Fetcher
Returns a new instance of Fetcher.
    # File 'lib/toiler/actor/fetcher.rb', line 15
    def initialize(queue, client, count)
      debug "Initializing Fetcher for queue #{queue}..."
      @queue = Toiler::Aws::Queue.new queue, client
      @wait = Toiler.options[:wait] || 60
      @free_processors = count
      @batch = Toiler.worker_class_registry[queue].batch?
      @visibility_timeout = @queue.visibility_timeout
      @executing = false
      @waiting_messages = 0
      @concurrency = count
      debug "Finished initializing Fetcher for queue #{queue}"
      tell :poll_messages
    end
Instance Attribute Details
#concurrency ⇒ Object
Returns the value of attribute concurrency.
    # File 'lib/toiler/actor/fetcher.rb', line 12
    def concurrency
      @concurrency
    end
#executing ⇒ Object
Returns the value of attribute executing.
    # File 'lib/toiler/actor/fetcher.rb', line 12
    def executing
      @executing
    end
#free_processors ⇒ Object
Returns the value of attribute free_processors.
    # File 'lib/toiler/actor/fetcher.rb', line 12
    def free_processors
      @free_processors
    end
#queue ⇒ Object
Returns the value of attribute queue.
    # File 'lib/toiler/actor/fetcher.rb', line 12
    def queue
      @queue
    end
#visibility_timeout ⇒ Object
Returns the value of attribute visibility_timeout.
    # File 'lib/toiler/actor/fetcher.rb', line 12
    def visibility_timeout
      @visibility_timeout
    end
#wait ⇒ Object
Returns the value of attribute wait.
    # File 'lib/toiler/actor/fetcher.rb', line 12
    def wait
      @wait
    end
#waiting_messages ⇒ Object
Returns the value of attribute waiting_messages.
    # File 'lib/toiler/actor/fetcher.rb', line 12
    def waiting_messages
      @waiting_messages
    end
Instance Method Details
#default_executor ⇒ Object
    # File 'lib/toiler/actor/fetcher.rb', line 29
    def default_executor
      Concurrent.global_fast_executor
    end
#executing? ⇒ Boolean
    # File 'lib/toiler/actor/fetcher.rb', line 44
    def executing?
      @executing
    end
#on_message(msg) ⇒ Object
    # File 'lib/toiler/actor/fetcher.rb', line 33
    def on_message(msg)
      @executing = true
      method, *args = msg
      send(method, *args)
    rescue StandardError, SystemStackError => e
      # rescue SystemStackError, if we misbehave and cause a stack level too deep exception, we should be able to recover
      error "Fetcher #{queue.name} raised exception #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
    ensure
      @executing = false
    end
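The dispatch pattern in #on_message can be tried outside an actor. The sketch below is a plain Ruby stand-in, not the Fetcher itself: `DemoDispatcher`, `free_slots`, and `explode` are hypothetical names, and errors are appended to an array instead of going through the logging helpers. It shows that a bare symbol (as in `tell :poll_messages`) and an array of `[name, *args]` both destructure correctly, and that the executing flag is reset even when the handler raises.

```ruby
# Hypothetical stand-in for an actor; it only mirrors the dispatch pattern.
class DemoDispatcher
  attr_reader :log

  def initialize
    @executing = false
    @log = []
  end

  def executing?
    @executing
  end

  def on_message(msg)
    @executing = true
    # A bare symbol yields args == []; an array splits into name and arguments.
    method, *args = msg
    send(method, *args)
  rescue StandardError, SystemStackError => e
    # SystemStackError is not a StandardError, so it must be listed explicitly.
    @log << "error: #{e.class}: #{e.message}"
  ensure
    @executing = false
  end

  private

  def poll_messages
    @log << 'polled'
  end

  def free_slots(count)
    @log << "freed #{count}"
  end

  def explode
    raise ArgumentError, 'boom'
  end
end

dispatcher = DemoDispatcher.new
dispatcher.on_message(:poll_messages)
dispatcher.on_message([:free_slots, 2])
dispatcher.on_message(:explode)
puts dispatcher.log.inspect
puts dispatcher.executing?
```

Because `send` also reaches private methods, handlers can stay private while remaining addressable by message name.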