Class: Sidekiq::PriorityQueue::ReliableFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/priority_queue/reliable_fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ ReliableFetch

Returns a new instance of ReliableFetch.



37
38
39
40
41
42
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 37

# Builds the fetcher state from the given options.
#
# @param options [Hash] reads :strict (any truthy value enables strict
#   ordering), :queues (enumerable of queue names) and :index (process index;
#   falls back to ENV['PROCESS_INDEX'] when nil/false).
def initialize(options)
  @strictly_ordered_queues = options[:strict] ? true : false
  # Every queue name is namespaced with the "priority-queue:" prefix.
  prefixed = options[:queues].map { |name| "priority-queue:#{name}" }
  # Duplicates are only collapsed up front in strict mode; otherwise the
  # list is stored exactly as given.
  @queues = @strictly_ordered_queues ? prefixed.uniq : prefixed
  @process_index = options[:index] || ENV['PROCESS_INDEX']
end

Class Method Details

.resume_wip_jobs(queues, process_index) ⇒ Object



81
82
83
84
85
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 81

# Logs a debug message and hands the given queues to requeue_wip_jobs.
#
# @param queues [Object] passed through to requeue_wip_jobs unchanged
# @param process_index [Object, nil] process index; ENV['PROCESS_INDEX'] is
#   used instead when this is nil/false
# @return [Object] whatever requeue_wip_jobs returns
def self.resume_wip_jobs(queues, process_index)
  Sidekiq.logger.debug { "Re-queueing WIP jobs" }
  requeue_wip_jobs(queues, process_index || ENV['PROCESS_INDEX'])
end

Instance Method Details

#bulk_requeue(_inprogress, options) ⇒ Object



75
76
77
78
79
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 75

# Logs a debug message and asks the class-level requeue_wip_jobs to requeue
# the jobs for options[:queues].
#
# @param _inprogress [Object] ignored by this implementation
# @param options [Hash] reads :queues and :index; ENV['PROCESS_INDEX'] is
#   used when :index is nil/false
# @return [Object] whatever requeue_wip_jobs returns
def bulk_requeue(_inprogress, options)
  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  index = options[:index] || ENV['PROCESS_INDEX']
  self.class.requeue_wip_jobs(options[:queues], index)
end

#queues_cmd ⇒ Object



67
68
69
70
71
72
73
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 67

# Returns the queue keys in the order they should be polled.
#
# @return [Array<String>] @queues itself in strict mode; otherwise a freshly
#   shuffled copy with duplicates removed
def queues_cmd
  return @queues if @strictly_ordered_queues

  # Shuffle before uniq, so a name listed several times is more likely to
  # land near the front.
  @queues.shuffle.uniq
end

#retrieve_work ⇒ Object



44
45
46
47
48
49
50
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 44

# Polls the queues in the order given by queues_cmd and returns the first job
# found, wrapped in a UnitOfWork.
#
# The polling order comes from queues_cmd rather than @queues directly, so
# that non-strict mode actually gets its shuffled, de-duplicated order.
# Iterating @queues directly left queues_cmd unused and polled a queue once
# per duplicate entry. Strict mode is unaffected, because queues_cmd returns
# @queues there.
#
# @return [UnitOfWork, nil] UnitOfWork.new(queue, job, wip_queue) for the
#   first queue that yields a job, or nil when every queue is empty
def retrieve_work
  queues_cmd.each do |queue|
    wip = wip_queue(queue)
    job = zpopmin_sadd(queue, wip)
    return UnitOfWork.new(queue, job, wip) if job
  end
  nil
end

#spop(wip_queue) ⇒ Object



63
64
65
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 63

# Issues SPOP against the given key on a Sidekiq Redis connection.
#
# @param wip_queue [String] key to pop from
# @return [Object] whatever the connection's spop call returns
def spop(wip_queue)
  Sidekiq.redis do |conn|
    conn.spop(wip_queue)
  end
end

#wip_queue(q) ⇒ Object



52
53
54
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 52

# Builds the work-in-progress key for a queue: the queue key, this machine's
# hostname and @process_index, joined by underscores.
#
# @param q [String] queue key
# @return [String] "<q>_<hostname>_<process index>"
def wip_queue(q)
  host = Socket.gethostname
  "#{q}_#{host}_#{@process_index}"
end

#zpopmin_sadd(queue, wip_queue) ⇒ Object



56
57
58
59
60
61
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 56

# Runs the ZPOPMIN_SADD Lua script via EVALSHA with [queue, wip_queue] as its
# keys. Judging by the script name it presumably pops the lowest-scored
# member of `queue` and adds it to the `wip_queue` set; the script body is
# defined elsewhere, so verify there.
#
# The script SHA is loaded once and memoized in @script_sha. If Redis has
# since lost its script cache (restart, failover, SCRIPT FLUSH), EVALSHA
# fails with a NOSCRIPT error. In that case the cached SHA is dropped, the
# script is loaded again and the call is retried once, so the fetcher does
# not fail forever on a stale SHA.
#
# @param queue [String] key of the priority queue
# @param wip_queue [String] key of the work-in-progress collection
# @return [Object] whatever the script returns
# @raise [StandardError] any Redis error other than a first NOSCRIPT is
#   re-raised unchanged
def zpopmin_sadd(queue, wip_queue)
  reloaded = false
  begin
    Sidekiq.redis do |con|
      @script_sha ||= con.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN_SADD)
      con.evalsha(@script_sha, [queue, wip_queue])
    end
  rescue StandardError => e
    # Only recover from a missing script, and only once per call, so a
    # persistent failure still surfaces to the caller.
    raise if reloaded || !e.message.to_s.start_with?('NOSCRIPT')

    @script_sha = nil
    reloaded = true
    retry
  end
end