Class: Sidekiq::BasicFetch

Inherits:
Object
Defined in:
lib/sidekiq/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Constructor Details

#initialize(options) ⇒ BasicFetch

Returns a new instance of BasicFetch.

# File 'lib/sidekiq/fetch.rb', line 94

def initialize(options)
  @strictly_ordered_queues = !!options[:strict]
  @queues = options[:queues].map { |q| "queue:#{q}" }
  @unique_queues = @queues.uniq
end

Class Method Details

.bulk_requeue(inprogress, options) ⇒ Object

Kept as a class method so that it stays pluggable and can be called directly by the Manager actor. As an instance method, it would run asynchronously on the Fetcher actor.

# File 'lib/sidekiq/fetch.rb', line 107

def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue_name] ||= []
    jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.message
  end

  Sidekiq.redis do |conn|
    jobs_to_requeue.each do |queue, jobs|
      conn.rpush("queue:#{queue}", jobs)
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
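
The grouping step above can be sketched without Redis. This is a minimal illustration, not Sidekiq's code: `Work` is a hypothetical stand-in for UnitOfWork that models only the `queue_name` and `message` readers used by bulk_requeue, and the `pushed` array records what would be handed to `conn.rpush` instead of calling it.

```ruby
# Hypothetical stand-in for UnitOfWork; only the two readers that
# bulk_requeue relies on are modeled here.
Work = Struct.new(:queue_name, :message)

inprogress = [
  Work.new("default", '{"jid":"a"}'),
  Work.new("mailers", '{"jid":"b"}'),
  Work.new("default", '{"jid":"c"}')
]

# Same result as the each / ||= loop: messages grouped per queue,
# so each queue needs a single RPUSH.
jobs_to_requeue = inprogress
  .group_by(&:queue_name)
  .transform_values { |units| units.map(&:message) }

# Record the (key, jobs) pairs instead of talking to Redis.
pushed = jobs_to_requeue.map { |queue, jobs| ["queue:#{queue}", jobs] }
```

Grouping first keeps the number of Redis round trips proportional to the number of queues rather than the number of in-progress jobs.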

Instance Method Details

#queues_cmd ⇒ Object

The Redis#brpop command is built to take any configured queue weights into account. By default, Redis#brpop returns data from the first listed queue that has pending elements, so the queue list is rebuilt on every call to honor weights and avoid queue starvation.

# File 'lib/sidekiq/fetch.rb', line 148

def queues_cmd
  queues = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq
  queues << Sidekiq::Fetcher::TIMEOUT
end
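
A rough sketch of how shuffling followed by `uniq` can honor weights. It assumes that a queue with weight N appears N times in `options[:queues]`, which is not shown on this page. `TIMEOUT` and `queues_cmd_sketch` are hypothetical names used only for this illustration.

```ruby
# Hypothetical stand-in for Sidekiq::Fetcher::TIMEOUT; the real value
# is not shown on this page.
TIMEOUT = 1

# Assumption: "critical" has weight 3 and "default" has weight 1, so
# "critical" is listed three times before the queue: prefix is applied.
weighted = %w[critical critical critical default].map { |q| "queue:#{q}" }

def queues_cmd_sketch(queues, strict:)
  # Strict mode keeps the configured order. Otherwise the duplicates make
  # a heavier queue more likely to come first after the shuffle.
  ordered = strict ? queues.uniq : queues.shuffle.uniq
  ordered << TIMEOUT
end

strict_cmd = queues_cmd_sketch(weighted, strict: true)
# => ["queue:critical", "queue:default", 1]

random_cmd = queues_cmd_sketch(weighted, strict: false)
# Same two queues in a random order, followed by the timeout.
```

Under these assumptions "queue:critical" lands first in roughly three out of four calls, while "queue:default" still gets a turn at the front, which is what prevents starvation.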

#retrieve_work ⇒ Object

# File 'lib/sidekiq/fetch.rb', line 100

def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end