Class: Sidekiq::BaseReliableFetch
- Inherits:
-
Object
- Object
- Sidekiq::BaseReliableFetch
- Defined in:
- lib/sidekiq/base_reliable_fetch.rb
Direct Known Subclasses
Defined Under Namespace
Classes: UnitOfWork
Constant Summary collapse
- DEFAULT_CLEANUP_INTERVAL =
1 hour
60 * 60
- HEARTBEAT_INTERVAL =
seconds
20
- HEARTBEAT_LIFESPAN =
seconds
60
- HEARTBEAT_RETRY_DELAY =
seconds
1
- WORKING_QUEUE_PREFIX =
'working'
- DEFAULT_LEASE_INTERVAL =
Defines how often we try to take a lease to not flood our Redis server with SET requests
2 * 60
- LEASE_KEY =
seconds
'reliable-fetcher-cleanup-lock'
- SCAN_COUNT =
Defines the COUNT parameter that will be passed to Redis SCAN command
1000
- DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION =
How much time a job can be interrupted
3
- WORKING_QUEUE_REGEX =
Regexes for matching working queue keys
/#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*:[0-9a-f]*)\z/.freeze
- LEGACY_WORKING_QUEUE_REGEX =
/#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*)\z/.freeze
Instance Attribute Summary collapse
-
#cleanup_interval ⇒ Object
readonly
Returns the value of attribute cleanup_interval.
-
#last_try_to_take_lease_at ⇒ Object
readonly
Returns the value of attribute last_try_to_take_lease_at.
-
#lease_interval ⇒ Object
readonly
Returns the value of attribute lease_interval.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#strictly_ordered_queues ⇒ Object
readonly
Returns the value of attribute strictly_ordered_queues.
-
#use_semi_reliable_fetch ⇒ Object
readonly
Returns the value of attribute use_semi_reliable_fetch.
Class Method Summary collapse
- .heartbeat ⇒ Object
- .heartbeat_key(identity) ⇒ Object
- .hostname ⇒ Object
- .identity ⇒ Object
- .process_nonce ⇒ Object
- .setup_reliable_fetch!(config) ⇒ Object
- .start_heartbeat_thread ⇒ Object
- .worker_dead?(identity, conn) ⇒ Boolean
- .working_queue_name(queue) ⇒ Object
Instance Method Summary collapse
- #bulk_requeue(inprogress, _options) ⇒ Object
-
#initialize(options) ⇒ BaseReliableFetch
constructor
A new instance of BaseReliableFetch.
- #retrieve_unit_of_work ⇒ Object
- #retrieve_work ⇒ Object
Constructor Details
#initialize(options) ⇒ BaseReliableFetch
Returns a new instance of BaseReliableFetch.
113 114 115 116 117 118 119 120 121 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 113 def initialize() raise ArgumentError, 'missing queue list' unless [:queues] @cleanup_interval = .fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL) @lease_interval = .fetch(:lease_interval, DEFAULT_LEASE_INTERVAL) @last_try_to_take_lease_at = 0 @strictly_ordered_queues = !![:strict] @queues = [:queues].map { |q| "queue:#{q}" } end |
Instance Attribute Details
#cleanup_interval ⇒ Object (readonly)
Returns the value of attribute cleanup_interval.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def cleanup_interval @cleanup_interval end |
#last_try_to_take_lease_at ⇒ Object (readonly)
Returns the value of attribute last_try_to_take_lease_at.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def last_try_to_take_lease_at @last_try_to_take_lease_at end |
#lease_interval ⇒ Object (readonly)
Returns the value of attribute lease_interval.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def lease_interval @lease_interval end |
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def queues @queues end |
#strictly_ordered_queues ⇒ Object (readonly)
Returns the value of attribute strictly_ordered_queues.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def strictly_ordered_queues @strictly_ordered_queues end |
#use_semi_reliable_fetch ⇒ Object (readonly)
Returns the value of attribute use_semi_reliable_fetch.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def use_semi_reliable_fetch @use_semi_reliable_fetch end |
Class Method Details
.heartbeat ⇒ Object
89 90 91 92 93 94 95 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 89 def self.heartbeat Sidekiq.redis do |conn| conn.set(heartbeat_key(identity), 1, ex: HEARTBEAT_LIFESPAN) end Sidekiq.logger.debug("Heartbeat for #{identity}") end |
.heartbeat_key(identity) ⇒ Object
101 102 103 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 101 def self.heartbeat_key(identity) "reliable-fetcher-heartbeat-#{identity.gsub(':', '-')}" end |
.hostname ⇒ Object
77 78 79 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 77 def self.hostname Socket.gethostname end |
.identity ⇒ Object
85 86 87 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 85 def self.identity @@identity ||= "#{hostname}:#{$$}:#{process_nonce}" end |
.process_nonce ⇒ Object
81 82 83 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 81 def self.process_nonce @@process_nonce ||= SecureRandom.hex(6) end |
.setup_reliable_fetch!(config) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 47 def self.setup_reliable_fetch!(config) fetch_strategy = if config.[:semi_reliable_fetch] Sidekiq::SemiReliableFetch else Sidekiq::ReliableFetch end config.[:fetch] = fetch_strategy.new(config.) Sidekiq.logger.info('GitLab reliable fetch activated!') start_heartbeat_thread end |
.start_heartbeat_thread ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 61 def self.start_heartbeat_thread Thread.new do loop do begin heartbeat sleep HEARTBEAT_INTERVAL rescue => e Sidekiq.logger.error("Heartbeat thread error: #{e.}") sleep HEARTBEAT_RETRY_DELAY end end end end |
.worker_dead?(identity, conn) ⇒ Boolean
97 98 99 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 97 def self.worker_dead?(identity, conn) !conn.get(heartbeat_key(identity)) end |
.working_queue_name(queue) ⇒ Object
105 106 107 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 105 def self.working_queue_name(queue) "#{WORKING_QUEUE_PREFIX}:#{queue}:#{identity}" end |
Instance Method Details
#bulk_requeue(inprogress, _options) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 134 def bulk_requeue(inprogress, ) return if inprogress.empty? Sidekiq.redis do |conn| inprogress.each do |unit_of_work| conn.multi do |multi| preprocess_interrupted_job(unit_of_work.job, unit_of_work.queue, multi) multi.lrem(self.class.working_queue_name(unit_of_work.queue), 1, unit_of_work.job) end end end rescue => e Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.}") end |
#retrieve_unit_of_work ⇒ Object
129 130 131 132 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 129 def retrieve_unit_of_work raise NotImplementedError, "#{self.class} does not implement #{__method__}" end |
#retrieve_work ⇒ Object
123 124 125 126 127 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 123 def retrieve_work clean_working_queues! if take_lease retrieve_unit_of_work end |