Class: Sidekiq::Addons::Prioritize::Deqr

Inherits:
BasicFetch
  • Object
show all
Defined in:
lib/sidekiq/addons/prioritize/deqr.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#script_shaObject

Returns the value of attribute script_sha.



7
8
9
# File 'lib/sidekiq/addons/prioritize/deqr.rb', line 7

def script_sha
  @script_sha
end

Class Method Details

.bulk_requeue(inprogress, options) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/sidekiq/addons/prioritize/deqr.rb', line 36

def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  jobs_unhandled = []
  inprogress.each do |unit_work|
    unit_work = JSON.load(unit_work)
    priority = Sidekiq::Addons::Prioritize::Enqr.get_priority_from_msg(unit_work)
    if ( priority > 0 )
      Sidekiq.redis do |con|
        Sidekiq::Addons::Prioritize::Enqr.enqueue_with_priority(con, unit_work["queue"], priority, unit_work)
      end
    else
      jobs_unhandled << unit_work.to_json
    end
  end

  Sidekiq.logger.info("Pushed #{inprogress.size - jobs_unhandled.size} jobs back to priority_queue in Redis")

  super(jobs_unhandled, options)
end

Instance Method Details

#retrieve_workObject



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/sidekiq/addons/prioritize/deqr.rb', line 20

def retrieve_work
  priority_job = nil
  @queues.uniq.each do |q|
    q_name = Sidekiq::Addons::Util.priority_job_queue_name(q.split("queue:").last)
    priority_job = zpop(q_name)
    break if priority_job
  end

  if priority_job.nil?
    return super
  else
    work = JSON.load(priority_job)
    return UnitOfWork.new(work["queue"], priority_job)
  end
end

#zpop(queue) ⇒ Object



9
10
11
12
13
14
15
16
17
18
# File 'lib/sidekiq/addons/prioritize/deqr.rb', line 9

def zpop(queue)
  Sidekiq.redis do |con|
    #TODO: for some reason pipeline and multi didnt work; revisit
    if ( self.script_sha.nil? ) # or !con.script(:exists, self.script_sha) ignore this check; let it crash
      self.script_sha = con.script(:load, Sidekiq::Addons::Util::ZPOP)
    end

    con.evalsha(self.script_sha, [queue])
  end
end