Class: Toiler::Actor::Fetcher
- Inherits:
-
Concurrent::Actor::RestartingContext
- Object
- Concurrent::Actor::RestartingContext
- Toiler::Actor::Fetcher
- Includes:
- Utils::ActorLogging
- Defined in:
- lib/toiler/actor/fetcher.rb
Overview
Actor pulling messages only when processors are ready, otherwise idle
Instance Attribute Summary collapse
-
#ack_deadline ⇒ Object
readonly
Returns the value of attribute ack_deadline.
-
#concurrency ⇒ Object
readonly
Returns the value of attribute concurrency.
-
#executing ⇒ Object
readonly
Returns the value of attribute executing.
-
#free_processors ⇒ Object
readonly
Returns the value of attribute free_processors.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#scheduled_task ⇒ Object
readonly
Returns the value of attribute scheduled_task.
-
#wait ⇒ Object
readonly
Returns the value of attribute wait.
-
#waiting_messages ⇒ Object
readonly
Returns the value of attribute waiting_messages.
Instance Method Summary collapse
- #default_executor ⇒ Object
- #executing? ⇒ Boolean
-
#initialize(queue_name, count, provider) ⇒ Fetcher
constructor
A new instance of Fetcher.
- #on_message(msg) ⇒ Object
Methods included from Utils::ActorLogging
#debug, #error, #fatal, #info, #warn
Constructor Details
#initialize(queue_name, count, provider) ⇒ Fetcher
Returns a new instance of Fetcher.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/toiler/actor/fetcher.rb', line 17 def initialize(queue_name, count, provider) super() debug "Initializing Fetcher for queue #{queue_name} and provider #{provider}..." @wait = Toiler.[:wait] || 60 @free_processors = count @executing = false @waiting_messages = 0 @concurrency = count @scheduled_task = nil init_queue(queue_name, provider) debug "Finished initializing Fetcher for queue #{queue_name} and provider #{provider}..." tell :pull_messages end |
Instance Attribute Details
#ack_deadline ⇒ Object (readonly)
Returns the value of attribute ack_deadline.
13 14 15 |
# File 'lib/toiler/actor/fetcher.rb', line 13 def ack_deadline @ack_deadline end |
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
13 14 15 |
# File 'lib/toiler/actor/fetcher.rb', line 13 def concurrency @concurrency end |
#executing ⇒ Object (readonly)
Returns the value of attribute executing.
13 14 15 |
# File 'lib/toiler/actor/fetcher.rb', line 13 def executing @executing end |
#free_processors ⇒ Object (readonly)
Returns the value of attribute free_processors.
13 14 15 |
# File 'lib/toiler/actor/fetcher.rb', line 13 def free_processors @free_processors end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
13 14 15 |
# File 'lib/toiler/actor/fetcher.rb', line 13 def queue @queue end |
#scheduled_task ⇒ Object (readonly)
Returns the value of attribute scheduled_task.
13 14 15 |
# File 'lib/toiler/actor/fetcher.rb', line 13 def scheduled_task @scheduled_task end |
#wait ⇒ Object (readonly)
Returns the value of attribute wait.
13 14 15 |
# File 'lib/toiler/actor/fetcher.rb', line 13 def wait @wait end |
#waiting_messages ⇒ Object (readonly)
Returns the value of attribute waiting_messages.
13 14 15 |
# File 'lib/toiler/actor/fetcher.rb', line 13 def @waiting_messages end |
Instance Method Details
#default_executor ⇒ Object
32 33 34 |
# File 'lib/toiler/actor/fetcher.rb', line 32 def default_executor Concurrent.global_fast_executor end |
#executing? ⇒ Boolean
47 48 49 |
# File 'lib/toiler/actor/fetcher.rb', line 47 def executing? @executing end |
#on_message(msg) ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/toiler/actor/fetcher.rb', line 36 def (msg) @executing = true method, *args = msg send(method, *args) rescue StandardError, SystemStackError => e # if we misbehave and cause a stack level too deep exception, we should be able to recover error "Fetcher #{@queue.name} raised exception #{e.class}: #{e.}\n#{e.backtrace.join("\n")}" ensure @executing = false end |