Class: Sidekiq::Fetcher
- Inherits:
- Object
  - Object
    - Sidekiq::Fetcher
- Includes:
- Celluloid, Util
- Defined in:
- lib/sidekiq/fetch.rb
Overview
The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.
Constant Summary
- TIMEOUT = 1
  Timeout, in seconds, for Redis#blpop.
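Redis BLPOP takes one or more list keys followed by a timeout in seconds, and returns nothing once the timeout elapses. The sketch below shows one way the argument list could be assembled with TIMEOUT as its last element. The helper name `build_blpop_args` is hypothetical; the real `queues_cmd` used by #fetch is not shown on this page and may order or sample the queues differently.

```ruby
TIMEOUT = 1 # seconds, matching the constant documented above

# Hypothetical helper (not part of Sidekiq): BLPOP expects the keys first
# and the timeout as the final argument.
def build_blpop_args(keys, timeout = TIMEOUT)
  keys + [timeout]
end

args = build_blpop_args(%w[queue:critical queue:default])
```

A short timeout keeps each blocking call brief, so the fetcher regains control regularly instead of waiting on Redis indefinitely.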
Instance Method Summary
- #fetch ⇒ Object
  Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.
- #initialize(mgr, queues) ⇒ Fetcher (constructor)
  A new instance of Fetcher.
Methods included from Util
#constantize, logger, #logger, logger=, #process_id, #redis, #watchdog
Constructor Details
#initialize(mgr, queues) ⇒ Fetcher
Returns a new instance of Fetcher.
    # File 'lib/sidekiq/fetch.rb', line 16
    def initialize(mgr, queues)
      @mgr = mgr
      @queues = queues.map { |q| "queue:#{q}" }
      @unique_queues = @queues.uniq
    end
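As a rough illustration of what the constructor computes, the sketch below applies the same two expressions to a sample queue list that contains a repeated name. The helper `normalize_queues` and the queue names are made up for the example; how `@unique_queues` is used afterwards is not shown on this page.

```ruby
# Hypothetical helper mirroring the two assignments in Fetcher#initialize.
def normalize_queues(queues)
  prefixed = queues.map { |q| "queue:#{q}" } # add the Redis key prefix
  [prefixed, prefixed.uniq]                  # full list, then de-duplicated list
end

queues, unique_queues = normalize_queues(%w[critical critical default])
```

Here `queues` keeps the repeated entry while `unique_queues` holds each key once, so both the original list and the distinct set of keys remain available.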
Instance Method Details
#fetch ⇒ Object
Fetching is straightforward: the Manager makes a fetch request for each idle processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.
Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.
    # File 'lib/sidekiq/fetch.rb', line 30
    def fetch
      watchdog('Fetcher#fetch died') do
        queue = nil
        msg = nil
        Sidekiq.redis { |conn| queue, msg = conn.blpop(*queues_cmd) }
        if msg
          @mgr.assign!(msg, queue.gsub(/.*queue:/, ''))
        else
          after(0) { fetch }
        end
      end
    end
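The control flow of #fetch can be tried without Redis or Celluloid. The following is a minimal sketch under stated assumptions: `FakeConn`, `FakeManager`, and `fetch_once` are stand-ins invented for this example, the fake `blpop` never blocks, and the method returns `:retry` where the real code reschedules itself with `after(0) { fetch }`.

```ruby
# In-memory stand-in for a Redis connection (assumption: not the real client).
class FakeConn
  def initialize(lists)
    @lists = lists
  end

  # Mimics BLPOP's return shape: [key, value], or nil when every list is empty.
  # The trailing timeout argument is accepted but ignored; nothing blocks here.
  def blpop(*args)
    keys = args[0...-1]
    key = keys.find { |k| @lists[k] && !@lists[k].empty? }
    key ? [key, @lists[key].shift] : nil
  end
end

# Stand-in for the Manager: records what it was asked to assign.
class FakeManager
  attr_reader :assigned

  def initialize
    @assigned = []
  end

  def assign(msg, queue)
    @assigned << [msg, queue]
  end
end

# One fetch attempt. Returns :assigned when a message was handed over,
# or :retry when the caller should schedule another attempt.
def fetch_once(conn, mgr, keys, timeout)
  queue, msg = conn.blpop(*keys, timeout)
  if msg
    # Strip everything up to and including the "queue:" prefix.
    mgr.assign(msg, queue.gsub(/.*queue:/, ''))
    :assigned
  else
    :retry
  end
end

conn = FakeConn.new('queue:default' => ['{"class":"HardWorker"}'])
mgr = FakeManager.new
first = fetch_once(conn, mgr, ['queue:default'], 1)
second = fetch_once(conn, mgr, ['queue:default'], 1)
```

The first call finds a message and passes it on with the bare queue name; the second finds nothing and signals a retry. Returning after every attempt, rather than looping inside the method, is what leaves room for a clean shutdown between fetches.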