Class: Sidekiq::PowerFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/power_fetch.rb,
lib/sidekiq/power_fetch/lock.rb,
lib/sidekiq/power_fetch/recover.rb,
lib/sidekiq/power_fetch/heartbeat.rb,
lib/sidekiq/power_fetch/unit_of_work.rb

Defined Under Namespace

Classes: Heartbeat, Lock, Recover, UnitOfWork

Constant Summary collapse

WORKING_QUEUE_PREFIX =
"working"
IDLE_TIMEOUT =

We don’t use Redis’ blocking operations for fetch so we inject a regular sleep into the loop.

5

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(capsule) ⇒ PowerFetch

Returns a new instance of PowerFetch.

Raises:

  • (ArgumentError)


30
31
32
33
34
35
36
37
38
39
# File 'lib/sidekiq/power_fetch.rb', line 30

# Builds a fetcher bound to the given capsule.
#
# @param capsule [Object] must respond to +queues+, +mode+ and +logger+
# @raise [ArgumentError] when +capsule.queues+ is nil or false
def initialize(capsule)
  unless capsule.queues
    raise ArgumentError, "missing queue list"
  end

  @capsule = capsule
  @strictly_ordered_queues = (capsule.mode == :strict)

  # Every configured queue name is stored with the "queue:" prefix.
  prefixed = @capsule.queues.map { |name| "queue:#{name}" }
  # Duplicates are dropped only in strict mode.
  # NOTE(review): presumably repeated names express weights in the other
  # modes, which is why they are kept there -- confirm against the capsule.
  @queues = @strictly_ordered_queues ? prefixed.uniq : prefixed

  @recover = Recover.new(@capsule)
  @capsule.logger.info("[PowerFetch] Activated!")
end

Class Method Details

.identity ⇒ Object

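Returns an identity string for the current process in the form hostname:pid:nonce, memoized after the first call. The hostname is taken from the DYNO environment variable when it is set.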



16
17
18
19
20
21
22
23
24
# File 'lib/sidekiq/power_fetch.rb', line 16

# Returns the identity string for this process, computed once and memoized.
#
# The value has the form "<hostname>:<pid>:<nonce>": the hostname is
# ENV["DYNO"] when set, otherwise Socket.gethostname; the nonce is 12 random
# hex characters.
#
# @return [String]
def self.identity
  return @identity if @identity

  host = ENV["DYNO"] || Socket.gethostname
  # Fully qualified so that a same-named constant in an enclosing namespace
  # cannot shadow the core Process module.
  pid = ::Process.pid
  nonce = SecureRandom.hex(6)

  @identity = [host, pid, nonce].join(":")
end

.working_queue_name(queue) ⇒ Object



26
27
28
# File 'lib/sidekiq/power_fetch.rb', line 26

# Builds the name of the working queue that belongs to this process for the
# given queue: "<WORKING_QUEUE_PREFIX>:<queue>:<identity>".
#
# @param queue [#to_s] queue name, used as given
# @return [String]
def self.working_queue_name(queue)
  segments = [WORKING_QUEUE_PREFIX, queue.to_s, identity]
  segments.join(":")
end

Instance Method Details

#bulk_requeue(inprogress) ⇒ Object

Called by Sidekiq on “hard shutdown”: when shutdown is reached and there are still busy threads. The threads are shut down, but their jobs are requeued. See github.com/sidekiq/sidekiq/blob/323a5cfaefdde20588f5ffdf0124691db83fd315/lib/sidekiq/manager.rb#L107



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/sidekiq/power_fetch.rb', line 69

# Requeues every unit of work in +inprogress+.
#
# For each unit, inside one MULTI block on the capsule's Redis connection:
# the job payload is parsed, handed to @recover.requeue_job together with
# the transaction, and one occurrence of the raw payload is removed from
# this process's working queue for the unit's queue.
#
# Any StandardError raised along the way is logged as a warning and
# swallowed; the units after the failing one are not processed.
#
# @param inprogress [#empty?, #each, #size] units responding to +queue+ and +job+
def bulk_requeue(inprogress)
  unless inprogress.empty?
    @capsule.redis do |redis|
      inprogress.each do |uow|
        redis.multi do |tx|
          payload = Sidekiq.load_json(uow.job)
          @recover.requeue_job(uow.queue, payload, tx)

          working_queue = self.class.working_queue_name(uow.queue)
          tx.lrem(working_queue, 1, uow.job)
        end
      end
    end
  end
rescue => e
  @capsule.logger.warn("[PowerFetch] Failed to requeue #{inprogress.size} jobs: #{e.message}")
end

#retrieve_work ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/sidekiq/power_fetch.rb', line 41

# Fetches one job, or returns nil after sleeping when every queue is empty.
#
# Runs @recover.call first when @recover.lock returns a truthy value. The
# queues are then tried in configured order in strict mode, or in shuffled
# order otherwise. The first job found is moved to this process's working
# queue and returned wrapped in a UnitOfWork.
#
# @return [UnitOfWork, nil]
def retrieve_work
  @recover.call if @recover.lock

  candidates = @strictly_ordered_queues ? @queues : @queues.shuffle

  candidates.each do |queue|
    # 'blmove' is not usable here: blocking on an empty queue would also
    # block the other queues, which may have jobs waiting.
    job = @capsule.redis do |conn|
      conn.lmove(queue, self.class.working_queue_name(queue), :right, :left)
    end

    return UnitOfWork.new(queue, job) if job
  end

  # No configured queue had a job; pause so the fetch loop does not spin
  # and burn CPU for nothing.
  sleep(IDLE_TIMEOUT)

  nil
end