Class: Sidekiq::QueuePause::PausingFetch

Inherits:
BasicFetch
  • Object
show all
Defined in:
lib/sidekiq-queue-pause.rb

Instance Method Summary collapse

Instance Method Details

#retrieve_work ⇒ Object



44
45
46
47
48
49
50
51
52
53
# File 'lib/sidekiq-queue-pause.rb', line 44

# Fetches the next unit of work from the queues that are not paused.
#
# The command returned by +unpaused_queues_cmd+ is handed to
# +retrieve_work_for_queues+ only when it holds more than one element.
# Otherwise the method sleeps for +Sidekiq::QueuePause.retry_after+
# (falling back to +Sidekiq::BasicFetch::TIMEOUT+ when that is nil/false)
# and reports that no work was found.
#
# @return [Object, nil] whatever +retrieve_work_for_queues+ returns, or nil
#   after sleeping when the command has at most one element.
def retrieve_work
  cmd = unpaused_queues_cmd
  return retrieve_work_for_queues(cmd) if cmd.size > 1

  # Nothing to poll: wait before the caller asks again.
  wait_time = Sidekiq::QueuePause.retry_after || Sidekiq::BasicFetch::TIMEOUT
  sleep(wait_time)
  nil
end

#retrieve_work_for_queues(qcmd) ⇒ Object



55
56
57
58
# File 'lib/sidekiq-queue-pause.rb', line 55

# Issues a +brpop+ with the given arguments on a connection yielded by
# +Sidekiq.redis+ and wraps the reply.
#
# @param qcmd [Array] arguments splatted into +brpop+.
# @return [UnitOfWork, nil] a UnitOfWork built from the splatted reply, or
#   nil when +brpop+ returned nothing.
def retrieve_work_for_queues(qcmd)
  popped = Sidekiq.redis do |redis|
    redis.brpop(*qcmd)
  end
  return nil unless popped

  UnitOfWork.new(*popped)
end

#unpaused_queues_cmd ⇒ Array<String>

Returns the list of unpaused queue names.

Returns:

  • (Array<String>)

    The list of unpaused queue names.



63
64
65
66
67
68
69
70
71
# File 'lib/sidekiq-queue-pause.rb', line 63

# Filters +queues_cmd+ down to the entries whose queue is not paused.
#
# Integer and Hash entries are never treated as queue names and are always
# kept (presumably the trailing timeout argument supplied by +queues_cmd+ --
# confirm against its definition). Every other entry has its leading
# "queue:" prefix removed and is dropped when
# +Sidekiq::QueuePause.paused?+ reports that name as paused for
# +Sidekiq::QueuePause.process_key+.
#
# @return [Array] the entries of +queues_cmd+ minus the paused queues.
def unpaused_queues_cmd
  queues_cmd.reject do |entry|
    # Non-queue arguments must survive the filter untouched.
    next false if entry.is_a?(Integer) || entry.is_a?(Hash)

    # Strip only the leading prefix: a queue name may itself contain
    # "queue:" (e.g. "big_queue:jobs") and must not be mangled.
    queue_name = entry.sub(/\Aqueue:/, "")
    Sidekiq::QueuePause.paused?(queue_name, Sidekiq::QueuePause.process_key)
  end
end