Class: Sidekiq::PowerFetch::Recover
- Inherits:
-
Object
- Object
- Sidekiq::PowerFetch::Recover
- Defined in:
- lib/sidekiq/power_fetch/recover.rb
Constant Summary collapse
- SCAN_COUNT =
Defines the COUNT parameter that will be passed to Redis SCAN command
1000
- RECOVERIES =
How much time a job can be interrupted
3
- WORKING_QUEUE_REGEX =
Regexes for matching working queue keys
/\A#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*:[0-9a-f]*)\z/
Instance Method Summary collapse
-
#call ⇒ Object
Detect “old” jobs and requeue them because the worker they were assigned to probably failed miserably.
-
#initialize(capsule) ⇒ Recover
constructor
A new instance of Recover.
- #lock ⇒ Object
-
#requeue_job(queue, msg, conn) ⇒ Object
If you want this method to be run in a scope of multi connection you need to pass it.
Constructor Details
#initialize(capsule) ⇒ Recover
Returns a new instance of Recover.
17 18 19 20 21 |
# File 'lib/sidekiq/power_fetch/recover.rb', line 17 def initialize(capsule) @capsule = capsule @lock = Lock.new(@capsule) @recoveries = @capsule.config[:power_fetch_recoveries] || RECOVERIES end |
Instance Method Details
#call ⇒ Object
Detect “old” jobs and requeue them because the worker they were assigned to probably failed miserably.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/sidekiq/power_fetch/recover.rb', line 29 def call @capsule.logger.info("[PowerFetch] Recovering working queues") @capsule.redis do |conn| conn.scan( match: "#{WORKING_QUEUE_PREFIX}:queue:*", count: SCAN_COUNT ) do |key| # Identity format is "{hostname}:{pid}:{randomhex} # Queue names may also have colons (namespaced). # Expressing this in a single regex is unreadable original_queue, identity = key.scan(WORKING_QUEUE_REGEX).flatten next if original_queue.nil? || identity.nil? if worker_dead?(identity, conn) recover_working_queue!(original_queue, key) end end end end |
#lock ⇒ Object
23 24 25 |
# File 'lib/sidekiq/power_fetch/recover.rb', line 23 def lock @lock.lock end |
#requeue_job(queue, msg, conn) ⇒ Object
If you want this method to be run in a scope of multi connection you need to pass it
53 54 55 56 57 58 59 60 61 |
# File 'lib/sidekiq/power_fetch/recover.rb', line 53 def requeue_job(queue, msg, conn) with_connection(conn) do |conn| conn.lpush(queue, Sidekiq.dump_json(msg)) end @capsule.logger.info( "[PowerFetch] Pushed job #{msg["jid"]} back to queue '#{queue}'" ) end |