Class: Sidekiq::QueuePause::PausingFetch
- Inherits:
-
BasicFetch
- Object
- BasicFetch
- Sidekiq::QueuePause::PausingFetch
- Defined in:
- lib/sidekiq-queue-pause.rb
Instance Method Summary collapse
- #retrieve_work ⇒ Object
- #retrieve_work_for_queues(qcmd) ⇒ Object
-
#unpaused_queues_cmd ⇒ Array<String>
Returns the list of unpaused queue names.
Instance Method Details
#retrieve_work ⇒ Object
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/sidekiq-queue-pause.rb', line 44
#
# Fetches the next unit of work from the queues that are not paused.
#
# When the unpaused fetch command holds more than one entry, the fetch is
# delegated to #retrieve_work_for_queues. Otherwise the call sleeps for
# Sidekiq::QueuePause.retry_after (falling back to
# Sidekiq::BasicFetch::TIMEOUT when that is nil/false) and yields no work.
#
# @return [Object, nil] whatever #retrieve_work_for_queues returns, or nil
#   when there was nothing to fetch from
def retrieve_work
  active_cmd = unpaused_queues_cmd
  # NOTE(review): a size of 1 presumably means only the trailing timeout
  # entry is left, i.e. every queue is paused -- confirm against queues_cmd.
  return retrieve_work_for_queues(active_cmd) if active_cmd.size > 1

  sleep(Sidekiq::QueuePause.retry_after || Sidekiq::BasicFetch::TIMEOUT)
  nil
end
#retrieve_work_for_queues(qcmd) ⇒ Object
55 56 57 58 |
# File 'lib/sidekiq-queue-pause.rb', line 55
#
# Issues a BRPOP with the given arguments through Sidekiq.redis and wraps
# the reply in a UnitOfWork.
#
# @param qcmd [Array] arguments splatted into conn.brpop
# @return [UnitOfWork, nil] the wrapped reply, or nil when BRPOP returned
#   nothing
def retrieve_work_for_queues(qcmd)
  popped = Sidekiq.redis do |conn|
    conn.brpop(*qcmd)
  end
  return unless popped

  UnitOfWork.new(*popped)
end
#unpaused_queues_cmd ⇒ Array<String>
Returns the list of unpaused queue names.
63 64 65 66 67 68 69 70 71 |
# File 'lib/sidekiq-queue-pause.rb', line 63
#
# Builds the fetch command with every paused queue removed.
#
# Starts from #queues_cmd and drops each queue entry for which
# Sidekiq::QueuePause.paused? is true for the current process key.
# Integer and Hash entries are always kept untouched.
#
# @return [Array] the entries of #queues_cmd minus the paused queues
def unpaused_queues_cmd
  queues_cmd.reject do |entry|
    # Non-queue entries (Integer / Hash) are never rejected.
    # NOTE(review): these look like the BRPOP timeout argument -- confirm.
    next false if entry.is_a?(Integer) || entry.is_a?(Hash)

    # Strip only the leading "queue:" prefix. A global gsub would also eat
    # later occurrences (e.g. "queue:my_queue:x" -> "my_x") and consult the
    # pause state of the wrong queue.
    queue_name = entry.sub(/\Aqueue:/, "")
    Sidekiq::QueuePause.paused?(queue_name, Sidekiq::QueuePause.process_key)
  end
end