Class: Sidekiq::BaseReliableFetch
- Inherits:
-
Object
- Object
- Sidekiq::BaseReliableFetch
- Defined in:
- lib/sidekiq/base_reliable_fetch.rb
Direct Known Subclasses
Defined Under Namespace
Classes: UnitOfWork
Constant Summary collapse
- DEFAULT_CLEANUP_INTERVAL =
1 hour
60 * 60
- HEARTBEAT_INTERVAL =
seconds
20
- HEARTBEAT_LIFESPAN =
seconds
60
- HEARTBEAT_RETRY_DELAY =
seconds
1
- WORKING_QUEUE_PREFIX =
'working'
- DEFAULT_LEASE_INTERVAL =
Defines how often (in seconds) we try to take a lease, so that we do not flood our Redis server with SET requests
2 * 60
- LEASE_KEY =
seconds
'reliable-fetcher-cleanup-lock'
- SCAN_COUNT =
Defines the COUNT parameter that will be passed to Redis SCAN command
1000
- DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION =
How many times a job can be interrupted
3
- WORKING_QUEUE_REGEX =
Regexes for matching working queue keys
/#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*:[0-9a-f]*)\z/.freeze
- LEGACY_WORKING_QUEUE_REGEX =
/#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*)\z/.freeze
Instance Attribute Summary collapse
-
#cleanup_interval ⇒ Object
readonly
Returns the value of attribute cleanup_interval.
-
#last_try_to_take_lease_at ⇒ Object
readonly
Returns the value of attribute last_try_to_take_lease_at.
-
#lease_interval ⇒ Object
readonly
Returns the value of attribute lease_interval.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#strictly_ordered_queues ⇒ Object
readonly
Returns the value of attribute strictly_ordered_queues.
-
#use_semi_reliable_fetch ⇒ Object
readonly
Returns the value of attribute use_semi_reliable_fetch.
Class Method Summary collapse
- .heartbeat ⇒ Object
- .heartbeat_key(identity) ⇒ Object
- .hostname ⇒ Object
- .identity ⇒ Object
- .process_nonce ⇒ Object
- .setup_reliable_fetch!(config) ⇒ Object
- .start_heartbeat_thread ⇒ Object
- .worker_dead?(identity, conn) ⇒ Boolean
- .working_queue_name(queue) ⇒ Object
Instance Method Summary collapse
- #bulk_requeue(inprogress, _options) ⇒ Object
-
#initialize(options) ⇒ BaseReliableFetch
constructor
A new instance of BaseReliableFetch.
- #retrieve_unit_of_work ⇒ Object
- #retrieve_work ⇒ Object
Constructor Details
#initialize(options) ⇒ BaseReliableFetch
Returns a new instance of BaseReliableFetch.
115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 115 def initialize(options) raise ArgumentError, 'missing queue list' unless options[:queues] @config = options @interrupted_set = Sidekiq::InterruptedSet.new @cleanup_interval = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL) @lease_interval = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL) @last_try_to_take_lease_at = 0 @strictly_ordered_queues = !!options[:strict] @queues = options[:queues].map { |q| "queue:#{q}" } end |
Instance Attribute Details
#cleanup_interval ⇒ Object (readonly)
Returns the value of attribute cleanup_interval.
111 112 113 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 111 def cleanup_interval @cleanup_interval end |
#last_try_to_take_lease_at ⇒ Object (readonly)
Returns the value of attribute last_try_to_take_lease_at.
111 112 113 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 111 def last_try_to_take_lease_at @last_try_to_take_lease_at end |
#lease_interval ⇒ Object (readonly)
Returns the value of attribute lease_interval.
111 112 113 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 111 def lease_interval @lease_interval end |
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
111 112 113 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 111 def queues @queues end |
#strictly_ordered_queues ⇒ Object (readonly)
Returns the value of attribute strictly_ordered_queues.
111 112 113 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 111 def strictly_ordered_queues @strictly_ordered_queues end |
#use_semi_reliable_fetch ⇒ Object (readonly)
Returns the value of attribute use_semi_reliable_fetch.
111 112 113 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 111 def use_semi_reliable_fetch @use_semi_reliable_fetch end |
Class Method Details
.heartbeat ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 91 def self.heartbeat Sidekiq.redis do |conn| conn.set(heartbeat_key(identity), 1, ex: HEARTBEAT_LIFESPAN) end Sidekiq.logger.debug("Heartbeat for #{identity}") end |
.heartbeat_key(identity) ⇒ Object
103 104 105 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 103 def self.heartbeat_key(identity) "reliable-fetcher-heartbeat-#{identity.gsub(':', '-')}" end |
.hostname ⇒ Object
79 80 81 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 79 def self.hostname Socket.gethostname end |
.identity ⇒ Object
87 88 89 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 87 def self.identity @@identity ||= "#{hostname}:#{$$}:#{process_nonce}" end |
.process_nonce ⇒ Object
83 84 85 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 83 def self.process_nonce @@process_nonce ||= SecureRandom.hex(6) end |
.setup_reliable_fetch!(config) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 47 def self.setup_reliable_fetch!(config) config = config.options unless config.respond_to?(:[]) fetch_strategy = if config[:semi_reliable_fetch] Sidekiq::SemiReliableFetch else Sidekiq::ReliableFetch end config[:fetch] = fetch_strategy.new(config) Sidekiq.logger.info('GitLab reliable fetch activated!') start_heartbeat_thread end |
.start_heartbeat_thread ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 63 def self.start_heartbeat_thread Thread.new do loop do begin heartbeat sleep HEARTBEAT_INTERVAL rescue => e Sidekiq.logger.error("Heartbeat thread error: #{e.message}") sleep HEARTBEAT_RETRY_DELAY end end end end |
.worker_dead?(identity, conn) ⇒ Boolean
99 100 101 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 99 def self.worker_dead?(identity, conn) !conn.get(heartbeat_key(identity)) end |
.working_queue_name(queue) ⇒ Object
107 108 109 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 107 def self.working_queue_name(queue) "#{WORKING_QUEUE_PREFIX}:#{queue}:#{identity}" end |
Instance Method Details
#bulk_requeue(inprogress, _options) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 138 def bulk_requeue(inprogress, _options) return if inprogress.empty? Sidekiq.redis do |conn| inprogress.each do |unit_of_work| conn.multi do |multi| preprocess_interrupted_job(unit_of_work.job, unit_of_work.queue, multi) multi.lrem(self.class.working_queue_name(unit_of_work.queue), 1, unit_of_work.job) end end end rescue => e Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.message}") end |
#retrieve_unit_of_work ⇒ Object
133 134 135 136 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 133 def retrieve_unit_of_work raise NotImplementedError, "#{self.class} does not implement #{__method__}" end |
#retrieve_work ⇒ Object
127 128 129 130 131 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 127 def retrieve_work clean_working_queues! if take_lease retrieve_unit_of_work end |