Class: Sidekiq::Addons::Prioritize::Deqr
- Inherits:
- BasicFetch
- Inheritance hierarchy (base class first):
- Object
- BasicFetch
- Sidekiq::Addons::Prioritize::Deqr
- Defined in:
- lib/sidekiq/addons/prioritize/deqr.rb
Instance Attribute Summary collapse
-
#script_sha ⇒ Object
Returns the value of attribute script_sha.
Class Method Summary collapse
Instance Method Summary collapse
Instance Attribute Details
#script_sha ⇒ Object
Returns the value of attribute script_sha.
7 8 9 |
# File 'lib/sidekiq/addons/prioritize/deqr.rb', line 7
#
# Reader for the +script_sha+ attribute.
#
# @return [Object, nil] the current value of @script_sha; nil until it has
#   been assigned
def script_sha
  @script_sha
end
Class Method Details
.bulk_requeue(inprogress, options) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/sidekiq/addons/prioritize/deqr.rb', line 36
#
# Puts jobs that were still in progress back into Redis.
#
# Every entry is decoded; when Enqr.get_priority_from_msg reports a priority
# greater than zero the decoded message is handed to
# Enqr.enqueue_with_priority, otherwise the entry is forwarded to the parent
# implementation exactly as it was received.
#
# @param inprogress [Array] in-progress entries. Each entry is either a raw
#   JSON job payload (String) or a unit of work exposing that payload through
#   #job or #message.
# @param options [Object] passed through unchanged to the parent implementation
# @return [Object, nil] nil when +inprogress+ is empty, otherwise whatever the
#   parent implementation returns
def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  jobs_unhandled = []
  inprogress.each do |unit_work|
    # Accept both raw JSON strings and unit-of-work objects wrapping one.
    payload =
      if unit_work.respond_to?(:job)
        unit_work.job
      elsif unit_work.respond_to?(:message)
        unit_work.message
      else
        unit_work
      end

    msg = JSON.load(payload)
    priority = Sidekiq::Addons::Prioritize::Enqr.get_priority_from_msg(msg)

    # A missing (nil) priority is treated the same as a non-positive one.
    if priority && priority > 0
      Sidekiq.redis do |con|
        Sidekiq::Addons::Prioritize::Enqr.enqueue_with_priority(con, msg["queue"], priority, msg)
      end
    else
      # Forward the entry untouched so the parent sees what it was given.
      jobs_unhandled << unit_work
    end
  end

  Sidekiq.logger.info("Pushed #{inprogress.size - jobs_unhandled.size} jobs back to priority_queue in Redis")
  super(jobs_unhandled, options)
end
Instance Method Details
#retrieve_work ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/sidekiq/addons/prioritize/deqr.rb', line 20
#
# Fetches the next unit of work, checking the priority sets before falling
# back to the parent fetch.
#
# Each distinct entry of @queues is visited in order; its name (without the
# leading "queue:" marker) is mapped through
# Sidekiq::Addons::Util.priority_job_queue_name and popped with #zpop. The
# first payload found is decoded to read its "queue" field and wrapped in a
# UnitOfWork together with the raw payload.
#
# @return [UnitOfWork, Object] a unit of work for the popped priority job, or
#   whatever the parent implementation returns when no priority job is waiting
def retrieve_work
  @queues.uniq.each do |q|
    # Strip only the leading "queue:" marker, so a name that itself contains
    # "queue:" (e.g. "a_queue:b") is not truncated to its last segment.
    q_name = Sidekiq::Addons::Util.priority_job_queue_name(q.sub(/\Aqueue:/, ""))
    priority_job = zpop(q_name)
    next unless priority_job

    work = JSON.load(priority_job)
    return UnitOfWork.new(work["queue"], priority_job)
  end

  super
end
#zpop(queue) ⇒ Object
9 10 11 12 13 14 15 16 17 18 |
# File 'lib/sidekiq/addons/prioritize/deqr.rb', line 9
#
# Runs the Sidekiq::Addons::Util::ZPOP script against +queue+ on Redis.
#
# The script is registered with SCRIPT LOAD on first use and its SHA is kept
# in #script_sha so later calls can go straight to EVALSHA. If Redis reports
# NOSCRIPT (its script cache no longer holds that SHA), the script is loaded
# again and the call is retried once; any other error is re-raised untouched.
#
# @param queue [String] key passed to the script as its only KEYS entry
# @return [Object, nil] whatever the script returns
def zpop(queue)
  Sidekiq.redis do |con|
    # TODO: for some reason pipeline and multi didnt work; revisit
    self.script_sha ||= con.script(:load, Sidekiq::Addons::Util::ZPOP)

    begin
      con.evalsha(script_sha, [queue])
    rescue StandardError => e
      # Matching on the message keeps this independent of the Redis client's
      # error class hierarchy.
      raise unless e.message.to_s.include?("NOSCRIPT")

      self.script_sha = con.script(:load, Sidekiq::Addons::Util::ZPOP)
      con.evalsha(script_sha, [queue])
    end
  end
end