Class: Sidekiq::BasicFetch

Inherits:
Object
Defined in:
lib/sidekiq/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Constant Summary

TIMEOUT =

We want the fetch operation to time out every few seconds so the thread can check whether the process is shutting down.

2
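The effect of a short fetch timeout can be sketched without Redis. The loop below is only an illustration: `replies`, `shutting_down`, and `processed` are hypothetical names, and a `nil` reply stands in for a blocking pop that returned nothing after TIMEOUT seconds.

```ruby
# Minimal sketch of a polling loop; not Sidekiq's actual processor code.
# Each nil in `replies` mimics a fetch that timed out with no job available.
replies = [nil, "job-1", nil, nil]
shutting_down = false
processed = []
polls = 0

until shutting_down
  work = replies.shift            # hypothetical fetch; nil means "timed out"
  polls += 1
  processed << work if work       # only handle real jobs
  # Stand-in for a shutdown signal; the loop notices it between fetches.
  shutting_down = true if replies.empty?
end

puts "polled #{polls} times, processed #{processed.size} job(s)"
```

Because each fetch returns after a bounded wait, the shutdown flag is rechecked on every pass even when the queues are empty.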

Constructor Details

#initialize(options) ⇒ BasicFetch

Returns a new instance of BasicFetch.

Raises:

  • (ArgumentError)


# File 'lib/sidekiq/fetch.rb', line 27

def initialize(options)
  raise ArgumentError, "missing queue list" unless options[:queues]
  @options = options
  @strictly_ordered_queues = !!@options[:strict]
  @queues = @options[:queues].map { |q| "queue:#{q}" }
  if @strictly_ordered_queues
    @queues.uniq!
    @queues << TIMEOUT
  end
end
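To see what the constructor does with its options, the queue handling can be reproduced in plain Ruby. This is a sketch only: `build_queues` is a hypothetical helper that mirrors the body of `#initialize`, and the `timeout:` default of 2 is assumed to match `TIMEOUT`.

```ruby
# Hypothetical helper mirroring #initialize; needs neither Sidekiq nor Redis.
def build_queues(options, timeout: 2)
  raise ArgumentError, "missing queue list" unless options[:queues]
  queues = options[:queues].map { |q| "queue:#{q}" }
  if options[:strict]
    queues.uniq!        # strict ordering ignores weights, so drop repeats
    queues << timeout   # the trailing integer is the brpop timeout
  end
  queues
end

weighted = build_queues({ queues: %w[critical critical default] })
strict   = build_queues({ queues: %w[critical critical default], strict: true })

p weighted # ["queue:critical", "queue:critical", "queue:default"]
p strict   # ["queue:critical", "queue:default", 2]
```

In the non-strict case the repeated entries are kept, since they act as weights when the command is rebuilt for each fetch.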

Instance Method Details

#bulk_requeue(inprogress, options) ⇒ Object

By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it an instance method will make it async to the Fetcher actor.



# File 'lib/sidekiq/fetch.rb', line 45

def bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue] ||= []
    jobs_to_requeue[unit_of_work.queue] << unit_of_work.job
  end

  Sidekiq.redis do |conn|
    conn.pipelined do
      jobs_to_requeue.each do |queue, jobs|
        conn.rpush(queue, jobs)
      end
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
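The grouping step in `bulk_requeue` can be tried in isolation. The sketch below makes two assumptions: a `Struct` stands in for `UnitOfWork`, and a Hash of arrays (`fake_redis`, hypothetical) stands in for the Redis lists, so the `rpush` calls become plain appends.

```ruby
# Stand-in for UnitOfWork; only the two readers used by bulk_requeue.
unit_of_work = Struct.new(:queue, :job)

inprogress = [
  unit_of_work.new("queue:default",  '{"jid":"a1"}'),
  unit_of_work.new("queue:critical", '{"jid":"b2"}'),
  unit_of_work.new("queue:default",  '{"jid":"c3"}')
]

# Same grouping as the method body: one array of payloads per queue.
jobs_to_requeue = {}
inprogress.each do |uow|
  jobs_to_requeue[uow.queue] ||= []
  jobs_to_requeue[uow.queue] << uow.job
end

# Hypothetical in-memory replacement for conn.rpush(queue, jobs).
fake_redis = Hash.new { |hash, key| hash[key] = [] }
jobs_to_requeue.each { |queue, jobs| fake_redis[queue].concat(jobs) }

jobs_to_requeue.each { |queue, jobs| puts "#{queue}: #{jobs.size} job(s)" }
```

Grouping first means each queue receives a single push carrying all of its jobs, rather than one push per job.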

#queues_cmd ⇒ Object

Creating the Redis#brpop command takes into account any configured queue weights. By default Redis#brpop returns data from the first queue that has pending elements. We recreate the queue command each time we invoke Redis#brpop to honor weights and avoid queue starvation.



# File 'lib/sidekiq/fetch.rb', line 72

def queues_cmd
  if @strictly_ordered_queues
    @queues
  else
    queues = @queues.shuffle!.uniq
    queues << TIMEOUT
    queues
  end
end
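How shuffling a list with repeated entries yields weighting can be checked with a small simulation. This is a sketch under stated assumptions: `weighted_queues_cmd` is a hypothetical stand-in for the non-strict branch, it uses the non-mutating `shuffle` with a seeded `Random` for repeatability (the method above uses `shuffle!`), and the 3:1 weights are made up for the example.

```ruby
# Hypothetical stand-in for the non-strict branch of #queues_cmd.
def weighted_queues_cmd(queues, rng, timeout: 2)
  cmd = queues.shuffle(random: rng).uniq  # repeats raise the odds of an early slot
  cmd << timeout                          # trailing integer is the brpop timeout
  cmd
end

# "critical" listed three times, "default" once: a 3:1 weighting.
queues = %w[queue:critical queue:critical queue:critical queue:default]
rng = Random.new(42)

first_counts = Hash.new(0)
10_000.times { first_counts[weighted_queues_cmd(queues, rng).first] += 1 }

sample = weighted_queues_cmd(queues, rng)
puts "sample command has #{sample.size} entries, timeout #{sample.last}"
first_counts.each { |queue, count| puts "#{queue} came first #{count} times" }
```

With these weights the critical queue should come first in roughly three quarters of the commands, while the default queue still leads often enough to avoid starvation.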

#retrieve_work ⇒ Object



# File 'lib/sidekiq/fetch.rb', line 38

def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end
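The shape of the data flowing through `retrieve_work` can be shown with an in-memory stand-in. In this sketch, `fake_brpop` and `store` are hypothetical; the helper imitates the blocking pop only in that it returns a `[key, value]` pair from the first non-empty list, or `nil` when every list is empty (the timeout case). A `Struct` again stands in for `UnitOfWork`.

```ruby
unit_of_work = Struct.new(:queue, :job)  # stand-in for UnitOfWork

# Hypothetical in-memory imitation of a blocking right-pop across several lists.
def fake_brpop(store, *args)
  _timeout = args.pop                    # trailing integer; nothing blocks here
  args.each do |key|
    list = store[key]
    return [key, list.pop] if list && !list.empty?  # pop from the tail
  end
  nil                                    # every list empty: the "timed out" case
end

store = { "queue:default" => ["job-1", "job-2"] }

reply = fake_brpop(store, "queue:critical", "queue:default", 2)
work  = unit_of_work.new(*reply) if reply

empty_reply = fake_brpop({}, "queue:critical", "queue:default", 2)
no_work     = unit_of_work.new(*empty_reply) if empty_reply

puts "fetched #{work.job} from #{work.queue}"
puts "no work available" if no_work.nil?
```

The `if work` guard in the method covers the `nil` case, so a timed-out fetch returns `nil` rather than raising.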