Class: Sidekiq::QueuePause::PausingFetch

Inherits:
BasicFetch
  • Object
show all
Defined in:
lib/sidekiq-queue-pause.rb

Instance Method Summary collapse

Instance Method Details

#retrieve_work ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/sidekiq-queue-pause.rb', line 45

# Fetches the next unit of work from the queues that are not paused.
#
# When the filtered command list holds more than one entry, the fetch is
# delegated to #retrieve_work_for_queues. Otherwise the method sleeps for
# Sidekiq::QueuePause.retry_after (falling back to Sidekiq::Fetcher::TIMEOUT
# when that is nil/false) and reports that nothing was fetched.
#
# NOTE(review): the "> 1" threshold presumably accounts for a trailing
# timeout entry in the command list -- confirm against queues_cmd.
#
# @return [Object, nil] the result of #retrieve_work_for_queues, or nil
#   after sleeping when there is nothing to poll
def retrieve_work
  fetch_cmd = unpaused_queues_cmd
  return retrieve_work_for_queues(fetch_cmd) if fetch_cmd.size > 1

  # Nothing to poll: back off before the caller tries again.
  sleep(Sidekiq::QueuePause.retry_after || Sidekiq::Fetcher::TIMEOUT)
  nil
end

#retrieve_work_for_queues(qcmd) ⇒ Object



56
57
58
59
# File 'lib/sidekiq-queue-pause.rb', line 56

# Issues a BRPOP through the Sidekiq Redis connection using the given
# command arguments and wraps a successful result in a UnitOfWork.
#
# @param qcmd [Array] arguments splatted into +conn.brpop+
# @return [UnitOfWork, nil] the wrapped result, or nil when BRPOP
#   returned nothing
def retrieve_work_for_queues(qcmd)
  popped = Sidekiq.redis do |redis|
    redis.brpop(*qcmd)
  end
  return unless popped

  UnitOfWork.new(*popped)
end

#unpaused_queues_cmd ⇒ Object



61
62
63
64
65
66
67
# File 'lib/sidekiq-queue-pause.rb', line 61

# Builds the fetch command with every paused queue filtered out.
#
# Each entry of +queues_cmd+ other than Sidekiq::Fetcher::TIMEOUT is
# treated as a Redis key; its leading "queue:" namespace is stripped and
# the remaining name is checked with Sidekiq::QueuePause.paused? for the
# current process key. Paused entries are dropped; the TIMEOUT entry is
# always kept.
#
# Only the leading "queue:" prefix is removed, so a queue whose own name
# contains "queue:" (e.g. "jobs_queue:low") is looked up under its real
# name rather than a mangled one.
#
# @return [Array] the entries of +queues_cmd+ that are not paused
def unpaused_queues_cmd
  queues_cmd.reject do |q|
    q != Sidekiq::Fetcher::TIMEOUT &&
      Sidekiq::QueuePause.paused?(q.sub(/\Aqueue:/, ''), Sidekiq::QueuePause.process_key)
  end
end