Class: Sidekiq::BasicFetch

Inherits:
Object
Defined in:
lib/sidekiq/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Constant Summary

TIMEOUT =

We want the fetch operation to time out every few seconds so that the thread can check whether the process is shutting down.

2

Constructor Details

#initialize(options) ⇒ BasicFetch

Returns a new instance of BasicFetch.



# File 'lib/sidekiq/fetch.rb', line 27

def initialize(options)
  @strictly_ordered_queues = !!options[:strict]
  @queues = options[:queues].map { |q| "queue:#{q}" }
  if @strictly_ordered_queues
    @queues.uniq!
    @queues << TIMEOUT
  end
end
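
As a rough illustration of what the constructor computes, the sketch below mirrors its logic in a standalone method that needs neither Sidekiq nor Redis. The names `build_queue_list` and `SKETCH_TIMEOUT` are hypothetical and exist only for this example.

```ruby
# Hypothetical stand-in for BasicFetch::TIMEOUT.
SKETCH_TIMEOUT = 2

# Mirrors the constructor: prefix each queue name and, in strict mode,
# de-duplicate once and append the timeout so the list can be handed
# straight to brpop.
def build_queue_list(options)
  queues = options[:queues].map { |q| "queue:#{q}" }
  if options[:strict]
    queues = queues.uniq
    queues << SKETCH_TIMEOUT
  end
  queues
end

# Weighted mode keeps duplicates; they act as weights when shuffled later.
weighted = build_queue_list(queues: %w[critical critical default])

# Strict mode fixes the order once, so duplicates are dropped up front.
strict = build_queue_list(queues: %w[critical critical default], strict: true)
```

In strict mode the list is final after construction, which is why the timeout is appended here rather than on every fetch.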

Class Method Details

.bulk_requeue(inprogress, options) ⇒ Object

Keeping this as a class method makes it pluggable and lets the Manager actor call it directly. As an instance method, it would run asynchronously on the Fetcher actor.



# File 'lib/sidekiq/fetch.rb', line 58

def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue] ||= []
    jobs_to_requeue[unit_of_work.queue] << unit_of_work.job
  end

  Sidekiq.redis do |conn|
    conn.pipelined do
      jobs_to_requeue.each do |queue, jobs|
        conn.rpush(queue, jobs)
      end
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
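
The grouping step in `bulk_requeue` can be tried in isolation. The following is a minimal sketch, assuming a unit of work exposes `queue` and `job` as in the listing above; `Unit` and `group_by_queue` are hypothetical names, and no Redis connection is involved.

```ruby
# Hypothetical stand-in for BasicFetch::UnitOfWork.
Unit = Struct.new(:queue, :job)

# Collects job payloads per queue so that each queue needs only a
# single RPUSH when the jobs are written back.
def group_by_queue(inprogress)
  inprogress.each_with_object({}) do |unit, grouped|
    (grouped[unit.queue] ||= []) << unit.job
  end
end

inprogress = [
  Unit.new("queue:default", '{"jid":"a"}'),
  Unit.new("queue:critical", '{"jid":"b"}'),
  Unit.new("queue:default", '{"jid":"c"}')
]

grouped = group_by_queue(inprogress)
```

Grouping first means the pipelined block issues one push per queue rather than one per job.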

Instance Method Details

#queues_cmd ⇒ Object

Builds the argument list for Redis#brpop, taking any configured queue weights into account. By default, Redis#brpop returns data from the first listed queue that has pending elements, so the command is rebuilt on every call to honor weights and avoid queue starvation.



# File 'lib/sidekiq/fetch.rb', line 46

def queues_cmd
  if @strictly_ordered_queues
    @queues
  else
    queues = @queues.shuffle!.uniq
    queues << TIMEOUT
    queues
  end
end
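
To see how shuffling a weighted list affects which queue is checked first, here is a small standalone simulation. It assumes weights are represented by repeating a queue name in the list; `WEIGHTED_QUEUES` and `sample_queues_cmd` are hypothetical names, and the seeded `Random` is there only to make runs repeatable.

```ruby
# Hypothetical weighted list: "critical" appears three times, "default" once.
WEIGHTED_QUEUES = %w[
  queue:critical queue:critical queue:critical queue:default
].freeze

# Same idea as the non-strict branch of queues_cmd: shuffle, drop the
# duplicates, then append the brpop timeout.
def sample_queues_cmd(queues, timeout: 2, random: Random.new)
  queues.shuffle(random: random).uniq << timeout
end

# Count how often each queue ends up in first position over many draws.
rng = Random.new(42)
first_counts = Hash.new(0)
10_000.times do
  cmd = sample_queues_cmd(WEIGHTED_QUEUES, random: rng)
  first_counts[cmd.first] += 1
end

example_cmd = sample_queues_cmd(WEIGHTED_QUEUES, random: rng)
```

Because three of the four entries are `queue:critical`, that queue should land first roughly three times out of four, while `queue:default` still gets a regular turn at the front.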

#retrieve_work ⇒ Object



# File 'lib/sidekiq/fetch.rb', line 36

def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end