Class: Sidekiq::PriorityQueue::ReliableFetch
- Inherits:
-
Object
- Object
- Sidekiq::PriorityQueue::ReliableFetch
- Defined in:
- lib/sidekiq/priority_queue/reliable_fetch.rb
Defined Under Namespace
Classes: UnitOfWork
Class Method Summary collapse
Instance Method Summary collapse
- #bulk_requeue(_inprogress, options) ⇒ Object
-
#initialize(options) ⇒ ReliableFetch
constructor
A new instance of ReliableFetch.
- #queues_cmd ⇒ Object
- #retrieve_work ⇒ Object
- #spop(wip_queue) ⇒ Object
- #wip_queue(q) ⇒ Object
- #zpopmin_sadd(queue, wip_queue) ⇒ Object
Constructor Details
#initialize(options) ⇒ ReliableFetch
Returns a new instance of ReliableFetch.
37 38 39 40 41 42 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 37 def initialize() @strictly_ordered_queues = !![:strict] @queues = [:queues].map { |q| "priority-queue:#{q}" } @queues = @queues.uniq if @strictly_ordered_queues @process_index = [:index] || ENV['PROCESS_INDEX'] end |
Class Method Details
.resume_wip_jobs(queues, process_index) ⇒ Object
81 82 83 84 85 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 81 def self.resume_wip_jobs(queues, process_index) Sidekiq.logger.debug { "Re-queueing WIP jobs" } process_index ||= ENV['PROCESS_INDEX'] requeue_wip_jobs(queues, process_index) end |
Instance Method Details
#bulk_requeue(_inprogress, options) ⇒ Object
75 76 77 78 79 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 75 def bulk_requeue(_inprogress, ) Sidekiq.logger.debug { "Re-queueing terminated jobs" } process_index = [:index] || ENV['PROCESS_INDEX'] self.class.requeue_wip_jobs([:queues], process_index) end |
#queues_cmd ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 67 def queues_cmd if @strictly_ordered_queues @queues else @queues.shuffle.uniq end end |
#retrieve_work ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 44 def retrieve_work work = @queues.detect do |q| job = zpopmin_sadd(q, wip_queue(q)); break [q,job] if job end UnitOfWork.new(*work, wip_queue(work.first)) if work end |
#spop(wip_queue) ⇒ Object
63 64 65 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 63 def spop(wip_queue) Sidekiq.redis{ |con| con.spop(wip_queue) } end |
#wip_queue(q) ⇒ Object
52 53 54 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 52 def wip_queue(q) "#{q}_#{Socket.gethostname}_#{@process_index}" end |
#zpopmin_sadd(queue, wip_queue) ⇒ Object
56 57 58 59 60 61 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 56 def zpopmin_sadd(queue, wip_queue) Sidekiq.redis do |con| @script_sha ||= con.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN_SADD) con.evalsha(@script_sha, [queue, wip_queue]) end end |