Class: Sidekiq::PowerFetch::Recover

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/power_fetch/recover.rb

Constant Summary collapse

SCAN_COUNT =

Defines the COUNT parameter that will be passed to Redis SCAN command

1000
RECOVERIES =

How many times a job can be interrupted

3
WORKING_QUEUE_REGEX =

Regex for matching working queue keys

/\A#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*:[0-9a-f]*)\z/

Instance Method Summary collapse

Constructor Details

#initialize(capsule) ⇒ Recover

Returns a new instance of Recover.



17
18
19
20
21
# File 'lib/sidekiq/power_fetch/recover.rb', line 17

# Builds a recoverer bound to the given capsule.
#
# @param capsule [Object] capsule this recoverer works for; it is handed to
#   Lock.new and its +config+ is read for the +:power_fetch_recoveries+ key.
def initialize(capsule)
  @capsule = capsule
  @lock = Lock.new(capsule)

  # A falsy (nil/false) configured value falls back to the RECOVERIES constant.
  configured_recoveries = capsule.config[:power_fetch_recoveries]
  @recoveries = configured_recoveries || RECOVERIES
end

Instance Method Details

#call ⇒ Object

Detect “old” jobs and requeue them because the worker they were assigned to probably failed miserably.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/sidekiq/power_fetch/recover.rb', line 29

# Walks every working-queue key in Redis and triggers recovery for the ones
# whose owning worker is reported dead.
#
# Logs the start of the pass, then scans for keys under
# "#{WORKING_QUEUE_PREFIX}:queue:*" (SCAN_COUNT is passed as the scan count).
# Keys that do not match WORKING_QUEUE_REGEX are skipped.
def call
  @capsule.logger.info("[PowerFetch] Recovering working queues")

  scan_options = { match: "#{WORKING_QUEUE_PREFIX}:queue:*", count: SCAN_COUNT }

  @capsule.redis do |conn|
    conn.scan(**scan_options) do |working_queue|
      # A working queue key ends with the worker identity, formatted as
      # "{hostname}:{pid}:{randomhex}". The original queue name in front of it
      # may contain colons too (namespaced queues), so both parts are pulled
      # out by WORKING_QUEUE_REGEX instead of splitting on ":".
      captures = WORKING_QUEUE_REGEX.match(working_queue)&.captures
      next unless captures

      original_queue, identity = captures
      next unless worker_dead?(identity, conn)

      recover_working_queue!(original_queue, working_queue)
    end
  end
end

#lock ⇒ Object



23
24
25
# File 'lib/sidekiq/power_fetch/recover.rb', line 23

# Delegates to the Lock instance stored in @lock and returns its result.
#
# NOTE(review): presumably this tries to take the recovery lock so that only
# one process performs recovery at a time — confirm against Lock#lock.
def lock
  @lock.lock
end

#requeue_job(queue, msg, conn) ⇒ Object

To run this method within the scope of a MULTI connection, pass that connection in as the conn argument.



53
54
55
56
57
58
59
60
61
# File 'lib/sidekiq/power_fetch/recover.rb', line 53

# Pushes a job back onto a queue and logs that it was requeued.
#
# The job is serialized with Sidekiq.dump_json and LPUSHed onto +queue+ using
# the connection yielded by with_connection.
#
# @param queue [String] Redis key of the queue to push the job onto
# @param msg [Hash] job payload; its "jid" entry is used in the log line
# @param conn [Object] connection handed to with_connection; pass the
#   connection of an enclosing MULTI when the push must run in that scope
def requeue_job(queue, msg, conn)
  # The block parameter is deliberately not called `conn`, so it does not
  # shadow the method parameter of the same name.
  with_connection(conn) do |redis|
    redis.lpush(queue, Sidekiq.dump_json(msg))
  end

  @capsule.logger.info(
    "[PowerFetch] Pushed job #{msg["jid"]} back to queue '#{queue}'"
  )
end