Class: Sidekiq::PowerFetch
- Inherits:
-
Object
- Object
- Sidekiq::PowerFetch
- Defined in:
- lib/sidekiq/power_fetch.rb,
lib/sidekiq/power_fetch/lock.rb,
lib/sidekiq/power_fetch/recover.rb,
lib/sidekiq/power_fetch/heartbeat.rb,
lib/sidekiq/power_fetch/unit_of_work.rb
Defined Under Namespace
Classes: Heartbeat, Lock, Recover, UnitOfWork
Constant Summary collapse
- WORKING_QUEUE_PREFIX =
"working"
- IDLE_TIMEOUT =
We don’t use Redis’ blocking operations for fetch so we inject a regular sleep into the loop.
5
Class Method Summary collapse
Instance Method Summary collapse
-
#bulk_requeue(inprogress) ⇒ Object
Called by sidekiq on “hard shutdown”: when shutdown is reached, and there are still busy threads.
-
#initialize(capsule) ⇒ PowerFetch
constructor
A new instance of PowerFetch.
- #retrieve_work ⇒ Object
Constructor Details
#initialize(capsule) ⇒ PowerFetch
Returns a new instance of PowerFetch.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/sidekiq/power_fetch.rb', line 30 def initialize(capsule) raise ArgumentError, "missing queue list" unless capsule.queues @capsule = capsule @strictly_ordered_queues = capsule.mode == :strict @queues = @capsule.queues.map { |q| "queue:#{q}" } @queues.uniq! if @strictly_ordered_queues @recover = Recover.new(@capsule) @capsule.logger.info("[PowerFetch] Activated!") end |
Class Method Details
.identity ⇒ Object
seconds
16 17 18 19 20 21 22 23 24 |
# File 'lib/sidekiq/power_fetch.rb', line 16 def self.identity @identity ||= begin hostname = ENV["DYNO"] || Socket.gethostname pid = ::Process.pid process_nonce = SecureRandom.hex(6) "#{hostname}:#{pid}:#{process_nonce}" end end |
.working_queue_name(queue) ⇒ Object
26 27 28 |
# File 'lib/sidekiq/power_fetch.rb', line 26 def self.working_queue_name(queue) "#{WORKING_QUEUE_PREFIX}:#{queue}:#{identity}" end |
Instance Method Details
#bulk_requeue(inprogress) ⇒ Object
Called by sidekiq on “hard shutdown”: when shutdown is reached, and there are still busy threads. The threads are shutdown, but their jobs are requeued. github.com/sidekiq/sidekiq/blob/323a5cfaefdde20588f5ffdf0124691db83fd315/lib/sidekiq/manager.rb#L107
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/sidekiq/power_fetch.rb', line 69 def bulk_requeue(inprogress) return if inprogress.empty? @capsule.redis do |conn| inprogress.each do |unit_of_work| conn.multi do |multi| msg = Sidekiq.load_json(unit_of_work.job) @recover.requeue_job(unit_of_work.queue, msg, multi) multi.lrem(self.class.working_queue_name(unit_of_work.queue), 1, unit_of_work.job) end end end rescue => e @capsule.logger.warn("[PowerFetch] Failed to requeue #{inprogress.size} jobs: #{e.}") end |
#retrieve_work ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/sidekiq/power_fetch.rb', line 41 def retrieve_work if @recover.lock @recover.call end queues_list = @strictly_ordered_queues ? @queues : @queues.shuffle queues_list.each do |queue| work = @capsule.redis do |conn| # Can't use 'blmove' here: empty blocked queue would then block # other, potentially non-empty, queues. conn.lmove(queue, self.class.working_queue_name(queue), :right, :left) end return UnitOfWork.new(queue, work) if work end # We didn't find a job in any of the configured queues. Let's sleep a bit # to avoid uselessly burning too much CPU sleep(IDLE_TIMEOUT) nil end |