Class: Sidekiq::Fetcher

Inherits:
Object
Includes:
Celluloid, Util
Defined in:
lib/sidekiq/fetch.rb

Overview

The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.

Constant Summary

TIMEOUT = 1

Timeout for Redis#blpop, in seconds.

Instance Method Summary

Methods included from Util

#constantize, logger, #logger, logger=, #process_id, #redis, #watchdog

Constructor Details

#initialize(mgr, queues) ⇒ Fetcher

Returns a new instance of Fetcher.



# File 'lib/sidekiq/fetch.rb', line 16

def initialize(mgr, queues)
  @mgr = mgr
  @queues = queues.map { |q| "queue:#{q}" }
  @unique_queues = @queues.uniq
end
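The constructor's queue handling can be tried in isolation. The following is a minimal sketch, not part of the library: the queue names are hypothetical, and it only reproduces the `map` and `uniq` calls shown above, plus the prefix-stripping `gsub` that #fetch applies to the queue key before handing the name to the Manager.

```ruby
# Hypothetical queue list; a repeated name stays repeated in the full list.
queues = %w[critical critical default]

# Same transformation as in #initialize: prefix each name with "queue:".
prefixed = queues.map { |q| "queue:#{q}" }

# Deduplicated list, as stored in @unique_queues.
unique = prefixed.uniq

# Reverse step, as used in #fetch: strip the prefix to recover the plain name.
plain = unique.map { |key| key.gsub(/.*queue:/, '') }

p prefixed # => ["queue:critical", "queue:critical", "queue:default"]
p unique   # => ["queue:critical", "queue:default"]
p plain    # => ["critical", "default"]
```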

Instance Method Details

#fetch ⇒ Object

Fetching is straightforward: the Manager makes a fetch request for each idle Processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.

Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead, each fetch blocks on Redis only until the blpop timeout expires, and we reschedule a new fetch if the current fetch turned up nothing.



# File 'lib/sidekiq/fetch.rb', line 30

def fetch
  watchdog('Fetcher#fetch died') do
    queue = nil
    msg = nil
    Sidekiq.redis { |conn| queue, msg = conn.blpop(*queues_cmd) }

    if msg
      @mgr.assign!(msg, queue.gsub(/.*queue:/, ''))
    else
      after(0) { fetch }
    end
  end
end
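
The assign-or-reschedule decision in #fetch can be exercised without Redis or Celluloid. The sketch below is illustrative only: `FakeConn`, `FakeManager`, and `fetch_once` are hypothetical names that do not exist in Sidekiq, and the argument list passed to `blpop` (queue keys followed by a timeout) is an assumption about what `queues_cmd` returns. It relies on one known behavior: a blocking pop yields a `[queue, message]` pair, or `nil` when the timeout expires.

```ruby
# Hypothetical stand-in for a Redis connection. Each call to blpop returns
# the next canned response: [queue_key, message], or nil for a timeout.
class FakeConn
  def initialize(responses)
    @responses = responses
  end

  def blpop(*_args)
    @responses.shift
  end
end

# Hypothetical stand-in for the Manager; it only records assignments.
class FakeManager
  attr_reader :assigned

  def initialize
    @assigned = []
  end

  def assign!(msg, queue)
    @assigned << [msg, queue]
  end
end

# One pass of the fetch logic. Returns :assigned when a message was handed
# to the manager, or :retry when the caller should schedule another fetch.
def fetch_once(conn, mgr, queues_cmd)
  queue, msg = conn.blpop(*queues_cmd)

  if msg
    mgr.assign!(msg, queue.gsub(/.*queue:/, ''))
    :assigned
  else
    :retry
  end
end

# First call times out (nil), second call delivers a message.
conn = FakeConn.new([nil, ['queue:default', 'job-1']])
mgr = FakeManager.new
cmd = ['queue:default', 1] # assumed shape: queue keys, then the timeout

first = fetch_once(conn, mgr, cmd)
second = fetch_once(conn, mgr, cmd)

p first        # => :retry
p second       # => :assigned
p mgr.assigned # => [["job-1", "default"]]
```

In the real method the `:retry` branch corresponds to `after(0) { fetch }`, which schedules a fresh fetch instead of looping in place, so a shutdown request can be handled between attempts.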