Class: Sidekiq::Fetcher

Inherits:
Object
Includes:
Celluloid, Util
Defined in:
lib/sidekiq/fetch.rb

Overview

The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.

Constant Summary

TIMEOUT = 1

Constants included from Util

Util::EXPIRY

Class Method Summary

  • .done! ⇒ Object
  • .done? ⇒ Boolean

Instance Method Summary

  • #fetch ⇒ Object
  • #initialize(mgr, queues) ⇒ Fetcher (constructor)

Methods included from Util

#constantize, #logger, #process_id, #redis, #watchdog

Constructor Details

#initialize(mgr, queues) ⇒ Fetcher

Returns a new instance of Fetcher.



# File 'lib/sidekiq/fetch.rb', line 15

def initialize(mgr, queues)
  @mgr = mgr
  @queues = queues.map { |q| "queue:#{q}" }
  @unique_queues = @queues.uniq
end
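
The constructor's two assignments can be tried in isolation. The following is a minimal sketch, not part of Sidekiq: `prefix_queues`, `strip_prefix`, and the queue names are hypothetical, and the idea that a name may be repeated in the list (for example to weight it) is an assumption suggested only by the separate `@unique_queues` variable.

```ruby
# Mirrors the mapping in the constructor: prefix each name with "queue:".
def prefix_queues(queues)
  queues.map { |q| "queue:#{q}" }
end

# Mirrors the gsub used in #fetch to recover the bare queue name.
def strip_prefix(key)
  key.gsub(/.*queue:/, '')
end

# Hypothetical input in which one name appears twice.
names = %w[critical critical default]

prefixed = prefix_queues(names)
unique = prefixed.uniq # same call the constructor uses for @unique_queues

p prefixed                     # ["queue:critical", "queue:critical", "queue:default"]
p unique                       # ["queue:critical", "queue:default"]
p strip_prefix(unique.first)   # "critical"
```

The prefix added here is the same one that #fetch strips again before handing a message to the Manager.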

Class Method Details

.done! ⇒ Object

Ugh. Say hello to a bloody hack. Can’t find a clean way to get the fetcher to just stop processing its mailbox when shutdown starts, so this sets a class-level flag that #fetch checks before doing any work.



# File 'lib/sidekiq/fetch.rb', line 55

def self.done!
  @done = true
end

.done? ⇒ Boolean

Returns:

  • (Boolean) true once .done! has been called, otherwise nil


# File 'lib/sidekiq/fetch.rb', line 59

def self.done?
  @done
end
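
Because `@done` is assigned inside class methods, it lives on the class object rather than on any one instance, so a single call affects every fetcher. A minimal sketch of that pattern, using a hypothetical `FlagFetcher` class that stands in for the real one and returns symbols purely for illustration:

```ruby
# Hypothetical stand-in; only the flag handling mirrors Sidekiq::Fetcher.
class FlagFetcher
  # Sets an instance variable on the class object itself.
  def self.done!
    @done = true
  end

  # nil (falsy) until done! has been called, then true.
  def self.done?
    @done
  end

  # Illustrative fetch: reports whether the flag short-circuited it.
  def fetch
    return :skipped if self.class.done?
    :fetched
  end
end
```

Calling `FlagFetcher.new.fetch` before `FlagFetcher.done!` returns `:fetched`; afterwards every instance, old or new, returns `:skipped`. Nothing in the sketch resets the flag, which matches the two methods shown above.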

Instance Method Details

#fetch ⇒ Object

Fetching is straightforward: the Manager makes a fetch request for each idle Processor when Sidekiq starts and then issues a new fetch request every time a Processor finishes a message.

Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.



# File 'lib/sidekiq/fetch.rb', line 29

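def fetch
  watchdog('Fetcher#fetch died') do
    # Do nothing once shutdown has been signalled via Fetcher.done!
    return if Sidekiq::Fetcher.done?

    begin
      queue = nil
      msg = nil
      # queues_cmd is a helper that does not appear on this page.
      Sidekiq.redis { |conn| queue, msg = conn.blpop(*queues_cmd) }

      if msg
        # Strip everything up to and including the "queue:" prefix.
        @mgr.assign!(msg, queue.gsub(/.*queue:/, ''))
      else
        # Nothing arrived: reschedule rather than block or loop.
        after(0) { fetch }
      end
    rescue => ex
      logger.error("Error fetching message: #{ex}")
      logger.error(ex.backtrace.first)
      # Pause before retrying so a failing fetch does not spin.
      sleep(TIMEOUT)
      after(0) { fetch }
    end
  end
end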
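
The reschedule-instead-of-loop idea can be sketched without Redis or Celluloid. Everything below is hypothetical: `TinyScheduler` is an in-memory stand-in for the actor's timer and mailbox, `ReschedulingFetcher` takes an array in place of a blocking pop (a `nil` entry models a pop that timed out), and `stop!` plays the role of the done flag.

```ruby
# In-memory stand-in for deferred execution; not Celluloid's API.
class TinyScheduler
  def initialize
    @pending = []
  end

  # Queue a block to run on a later step.
  def after(&block)
    @pending << block
  end

  # Run queued blocks until none remain or max_steps is reached.
  # Returns the number of blocks that were run.
  def run(max_steps)
    steps = 0
    while steps < max_steps && (job = @pending.shift)
      job.call
      steps += 1
    end
    steps
  end
end

class ReschedulingFetcher
  attr_reader :assigned

  def initialize(scheduler, source)
    @scheduler = scheduler
    @source = source # array of messages; nil or empty means "nothing yet"
    @assigned = []
    @done = false
  end

  def stop!
    @done = true
  end

  # One attempt per call: hand off a message, or reschedule if there was none.
  def fetch
    return if @done
    msg = @source.shift
    if msg
      @assigned << msg
    else
      @scheduler.after { fetch }
    end
  end
end
```

With a source of `[nil, nil, "job-1"]`, scheduling one `fetch` and calling `run(10)` takes three steps and leaves `"job-1"` in `assigned`. Because each attempt returns before the next one is queued, a `stop!` issued between steps takes effect on the very next attempt, which is the property the description above asks for.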