Class: Sidekiq::BasicFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ BasicFetch

Returns a new instance of BasicFetch.



82
83
84
85
86
# File 'lib/sidekiq/fetch.rb', line 82

def initialize(options)
  @strictly_ordered_queues = !!options[:strict]
  @queues = options[:queues].map { |q| "queue:#{q}" }
  @unique_queues = @queues.uniq
end

Class Method Details

.bulk_requeue(inprogress) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/sidekiq/fetch.rb', line 93

def self.bulk_requeue(inprogress)
  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue_name] ||= []
    jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.message
  end

  Sidekiq.redis do |conn|
    jobs_to_requeue.each do |queue, jobs|
      conn.rpush("queue:#{queue}", jobs)
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end

Instance Method Details

#queues_cmdObject

Creating the Redis#blpop command takes into account any configured queue weights. By default Redis#blpop returns data from the first queue that has pending elements. We recreate the queue command each time we invoke Redis#blpop to honor weights and avoid queue starvation.



132
133
134
135
# File 'lib/sidekiq/fetch.rb', line 132

def queues_cmd
  queues = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq
  queues << Sidekiq::Fetcher::TIMEOUT
end

#retrieve_workObject



88
89
90
91
# File 'lib/sidekiq/fetch.rb', line 88

def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end