Class: Sidekiq::BasicFetch
- Inherits: Object
  - Object
  - Sidekiq::BasicFetch
- Defined in: lib/sidekiq/fetch.rb
Defined Under Namespace
Classes: UnitOfWork
Constant Summary
- TIMEOUT = 2
  We want the fetch operation to time out every few seconds so the thread can check if the process is shutting down.
Instance Method Summary
- #bulk_requeue(inprogress, options) ⇒ Object
  By leaving this as a class method, it can be pluggable and used by the Manager actor.
- #initialize(options) ⇒ BasicFetch (constructor)
  A new instance of BasicFetch.
- #queues_cmd ⇒ Object
  Creating the Redis#brpop command takes into account any configured queue weights.
- #retrieve_work ⇒ Object
Constructor Details
#initialize(options) ⇒ BasicFetch
Returns a new instance of BasicFetch.
# File 'lib/sidekiq/fetch.rb', line 27

    def initialize(options)
      raise ArgumentError, "missing queue list" unless options[:queues]
      @options = options
      @strictly_ordered_queues = !!@options[:strict]
      @queues = @options[:queues].map { |q| "queue:#{q}" }
      if @strictly_ordered_queues
        @queues.uniq!
        @queues << TIMEOUT
      end
    end
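As a rough illustration of what the constructor computes, the sketch below mirrors its queue-list handling in a standalone function. `build_queue_list` is a hypothetical helper, not part of Sidekiq, and `TIMEOUT` simply copies the constant value documented above.

```ruby
# Hypothetical standalone helper mirroring the constructor's queue handling.
# It is not part of Sidekiq; TIMEOUT copies the documented constant value.
TIMEOUT = 2

def build_queue_list(options)
  raise ArgumentError, "missing queue list" unless options[:queues]

  # Every queue name becomes a Redis list key with the "queue:" prefix.
  queues = options[:queues].map { |q| "queue:#{q}" }
  if options[:strict]
    # Strict ordering: drop duplicate entries and append the timeout once,
    # so the same array can be reused for every fetch.
    queues.uniq!
    queues << TIMEOUT
  end
  queues
end

strict   = build_queue_list(queues: %w[critical default default], strict: true)
weighted = build_queue_list(queues: %w[critical critical default])

p strict
p weighted
```

In the non-strict case the duplicates are kept on purpose: they act as weights that #queues_cmd consumes later.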
Instance Method Details
#bulk_requeue(inprogress, options) ⇒ Object
By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it an instance method will make it async to the Fetcher actor.
# File 'lib/sidekiq/fetch.rb', line 45

    def bulk_requeue(inprogress, options)
      return if inprogress.empty?

      Sidekiq.logger.debug { "Re-queueing terminated jobs" }
      jobs_to_requeue = {}
      inprogress.each do |unit_of_work|
        jobs_to_requeue[unit_of_work.queue] ||= []
        jobs_to_requeue[unit_of_work.queue] << unit_of_work.job
      end

      Sidekiq.redis do |conn|
        conn.pipelined do
          jobs_to_requeue.each do |queue, jobs|
            conn.rpush(queue, jobs)
          end
        end
      end
      Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
    rescue => ex
      Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
    end
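The grouping step is the part of this method that can be tried without a Redis server. The sketch below is a minimal illustration of it: `StubUnitOfWork` is an assumed, simplified stand-in for the nested UnitOfWork class (only the `queue` and `job` readers used above), and `group_jobs_by_queue` is a hypothetical helper name.

```ruby
# Assumed, simplified stand-in for Sidekiq::BasicFetch::UnitOfWork.
# Only the two readers that bulk_requeue relies on are modelled.
StubUnitOfWork = Struct.new(:queue, :job)

# Hypothetical helper: group in-progress job payloads by queue key so that
# each queue needs only one RPUSH call.
def group_jobs_by_queue(inprogress)
  inprogress.each_with_object({}) do |unit_of_work, grouped|
    (grouped[unit_of_work.queue] ||= []) << unit_of_work.job
  end
end

inprogress = [
  StubUnitOfWork.new("queue:default", '{"jid":"a"}'),
  StubUnitOfWork.new("queue:critical", '{"jid":"b"}'),
  StubUnitOfWork.new("queue:default", '{"jid":"c"}')
]

# Print the commands that would be pipelined instead of sending them.
group_jobs_by_queue(inprogress).each do |queue, jobs|
  puts "RPUSH #{queue} #{jobs.join(' ')}"
end
```

Grouping first means the pipeline carries one command per queue rather than one per job.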
#queues_cmd ⇒ Object
Creating the Redis#brpop command takes into account any configured queue weights. By default Redis#brpop returns data from the first queue that has pending elements. We recreate the queue command each time we invoke Redis#brpop to honor weights and avoid queue starvation.
# File 'lib/sidekiq/fetch.rb', line 72

    def queues_cmd
      if @strictly_ordered_queues
        @queues
      else
        queues = @queues.shuffle!.uniq
        queues << TIMEOUT
        queues
      end
    end
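To see how shuffling a list with repeated entries yields a weighted order, here is a small standalone sketch of the non-strict branch. `weighted_queue_order` is a hypothetical name, the `rng:` parameter is added only to make the example reproducible, and the sketch uses the non-mutating `shuffle` rather than `shuffle!`.

```ruby
# TIMEOUT copies the documented constant value.
TIMEOUT = 2

# Hypothetical standalone version of the non-strict branch of queues_cmd.
# A queue that appears N times in the input has weight N.
def weighted_queue_order(queues, rng: Random.new)
  ordered = queues.shuffle(random: rng).uniq # weighted random priority order
  ordered << TIMEOUT                         # BRPOP takes the timeout last
  ordered
end

queues = %w[queue:critical queue:critical queue:critical queue:default]

# Count which queue ends up first over many draws.
first_counts = Hash.new(0)
1_000.times { first_counts[weighted_queue_order(queues).first] += 1 }
p first_counts
```

With three of the four entries being `queue:critical`, that queue should come first in roughly three out of four draws, while `queue:default` still gets checked first some of the time and is not starved.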
#retrieve_work ⇒ Object
# File 'lib/sidekiq/fetch.rb', line 38

    def retrieve_work
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
      UnitOfWork.new(*work) if work
    end
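The method relies on BRPOP returning a two-element `[key, value]` array, or nil when the timeout expires. The sketch below imitates that contract with an in-memory fake so the flow can be followed without Redis. `FakeConnection` and `SketchUnitOfWork` are assumptions made for illustration, not Sidekiq or redis-rb classes, and the fake never blocks.

```ruby
# Assumed, simplified stand-in for the nested UnitOfWork class.
SketchUnitOfWork = Struct.new(:queue, :job)

# In-memory fake that imitates the result shape of BRPOP.
class FakeConnection
  def initialize(lists)
    @lists = lists
  end

  # Returns [key, value] from the first non-empty list, or nil when all
  # lists are empty. The trailing timeout is accepted but ignored.
  def brpop(*keys_and_timeout)
    *keys, _timeout = keys_and_timeout
    key = keys.find { |k| @lists.fetch(k, []).any? }
    key && [key, @lists[key].pop]
  end
end

# Same shape as the documented method, with the connection and queue
# command passed in explicitly.
def retrieve_work(conn, queues_cmd)
  work = conn.brpop(*queues_cmd)
  SketchUnitOfWork.new(*work) if work
end

conn = FakeConnection.new("queue:default" => ['{"jid":"a"}'])
cmd  = ["queue:critical", "queue:default", 2]

p retrieve_work(conn, cmd) # a unit of work taken from queue:default
p retrieve_work(conn, cmd) # nil, because every list is now empty
```

Returning nil on an empty fetch is what lets the calling thread loop around and check for shutdown between attempts.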