Class: Sidekiq::BasicFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ BasicFetch

Returns a new instance of BasicFetch.



66
67
68
69
70
# File 'lib/sidekiq/fetch.rb', line 66

# Sets up the fetcher's queue lists from the supplied options.
#
# @param options [Hash] read for two keys:
#   +:strict+ — any truthy value enables strict queue ordering;
#   +:queues+ — a collection of queue names (must respond to +map+);
#   a name may be repeated.
def initialize(options)
  # Normalise whatever was passed for :strict into a real true/false.
  @strictly_ordered_queues = options[:strict] ? true : false

  # Every queue name is stored with the "queue:" prefix; repeats are kept here.
  prefixed = options[:queues].map do |name|
    "queue:#{name}"
  end
  @queues = prefixed

  # De-duplicated copy of the prefixed names, first occurrence wins.
  @unique_queues = prefixed.uniq
end

Class Method Details

.bulk_requeue(inprogress) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/sidekiq/fetch.rb', line 77

# Pushes the messages of the given in-progress units of work back onto the
# Redis lists they were taken from.
#
# Messages are grouped per queue so that each queue receives a single
# +rpush+ carrying all of its messages, in the order they appear in
# +inprogress+.
#
# @param inprogress [#each, #size, #empty?] units of work; each element must
#   respond to +queue+ (the Redis key to push to) and +message+ (the payload).
# @return [nil] when +inprogress+ is empty; otherwise whatever the logger's
#   +info+ call returns.
def self.bulk_requeue(inprogress)
  # Nothing to requeue: skip the Redis round trip and the misleading
  # "Pushed 0 messages" log line.
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }

  # The default block gives every queue its own fresh array on first access.
  jobs_to_requeue = Hash.new { |grouped, queue| grouped[queue] = [] }
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue] << unit_of_work.message
  end

  Sidekiq.redis do |conn|
    jobs_to_requeue.each do |queue, jobs|
      conn.rpush(queue, jobs)
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
end

Instance Method Details

#queues_cmd ⇒ Object

Builds the argument list for Redis#brpop, taking any configured queue weights into account. By default Redis#brpop returns data from the first listed queue that has pending elements. We recreate the queue command each time we invoke Redis#brpop to honor weights and avoid queue starvation.



114
115
116
117
# File 'lib/sidekiq/fetch.rb', line 114

# Builds a fresh argument list for the blocking pop issued by #retrieve_work:
# the queue keys to poll followed by Sidekiq::Fetcher::TIMEOUT.
#
# In strict mode the de-duplicated list is used as stored. Otherwise the full
# list (repeats included) is shuffled before de-duplicating, so a queue that
# is listed more often is more likely to end up near the front.
#
# @return [Array] queue keys followed by the timeout value; always a new
#   array, so neither @queues nor @unique_queues is modified.
def queues_cmd
  ordered =
    if @strictly_ordered_queues
      @unique_queues.dup
    else
      @queues.shuffle.uniq
    end
  ordered.push(Sidekiq::Fetcher::TIMEOUT)
end

#retrieve_work ⇒ Object



72
73
74
75
# File 'lib/sidekiq/fetch.rb', line 72

# Issues a +brpop+ against Redis using the arguments from #queues_cmd and
# wraps the reply in a UnitOfWork.
#
# @return [UnitOfWork, nil] a unit built by splatting the reply into
#   UnitOfWork.new, or nil when the pop yields nothing.
def retrieve_work
  popped = Sidekiq.redis do |conn|
    conn.brpop(*queues_cmd)
  end
  return unless popped

  UnitOfWork.new(*popped)
end