Class: Sidekiq::BaseReliableFetch
- Inherits:
-
Object
- Object
- Sidekiq::BaseReliableFetch
- Defined in:
- lib/sidekiq/base_reliable_fetch.rb
Direct Known Subclasses
Defined Under Namespace
Classes: UnitOfWork
Constant Summary collapse
- DEFAULT_CLEANUP_INTERVAL =
1 hour
60 * 60
- HEARTBEAT_INTERVAL =
seconds
20
- HEARTBEAT_LIFESPAN =
seconds
60
- HEARTBEAT_RETRY_DELAY =
seconds
1
- WORKING_QUEUE_PREFIX =
'working'
- DEFAULT_LEASE_INTERVAL =
Defines how often we try to take a lease to not flood our Redis server with SET requests
2 * 60
- LEASE_KEY =
seconds
'reliable-fetcher-cleanup-lock'
- SCAN_COUNT =
Defines the COUNT parameter that will be passed to Redis SCAN command
1000
Instance Attribute Summary collapse
-
#cleanup_interval ⇒ Object
readonly
Returns the value of attribute cleanup_interval.
-
#last_try_to_take_lease_at ⇒ Object
readonly
Returns the value of attribute last_try_to_take_lease_at.
-
#lease_interval ⇒ Object
readonly
Returns the value of attribute lease_interval.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#strictly_ordered_queues ⇒ Object
readonly
Returns the value of attribute strictly_ordered_queues.
-
#use_semi_reliable_fetch ⇒ Object
readonly
Returns the value of attribute use_semi_reliable_fetch.
Class Method Summary collapse
- .bulk_requeue(inprogress, _options) ⇒ Object
- .heartbeat ⇒ Object
- .heartbeat_key(hostname, pid) ⇒ Object
- .hostname ⇒ Object
- .pid ⇒ Object
- .setup_reliable_fetch!(config) ⇒ Object
- .start_heartbeat_thread ⇒ Object
- .working_queue_name(queue) ⇒ Object
Instance Method Summary collapse
-
#initialize(options) ⇒ BaseReliableFetch
constructor
A new instance of BaseReliableFetch.
- #retrieve_unit_of_work ⇒ Object
- #retrieve_work ⇒ Object
Constructor Details
#initialize(options) ⇒ BaseReliableFetch
Returns a new instance of BaseReliableFetch.
113 114 115 116 117 118 119 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 113 def initialize() @cleanup_interval = .fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL) @lease_interval = .fetch(:lease_interval, DEFAULT_LEASE_INTERVAL) @last_try_to_take_lease_at = 0 @strictly_ordered_queues = !![:strict] @queues = [:queues].map { |q| "queue:#{q}" } end |
Instance Attribute Details
#cleanup_interval ⇒ Object (readonly)
Returns the value of attribute cleanup_interval.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def cleanup_interval @cleanup_interval end |
#last_try_to_take_lease_at ⇒ Object (readonly)
Returns the value of attribute last_try_to_take_lease_at.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def last_try_to_take_lease_at @last_try_to_take_lease_at end |
#lease_interval ⇒ Object (readonly)
Returns the value of attribute lease_interval.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def lease_interval @lease_interval end |
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def queues @queues end |
#strictly_ordered_queues ⇒ Object (readonly)
Returns the value of attribute strictly_ordered_queues.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def strictly_ordered_queues @strictly_ordered_queues end |
#use_semi_reliable_fetch ⇒ Object (readonly)
Returns the value of attribute use_semi_reliable_fetch.
109 110 111 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109 def use_semi_reliable_fetch @use_semi_reliable_fetch end |
Class Method Details
.bulk_requeue(inprogress, _options) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 82 def self.bulk_requeue(inprogress, ) return if inprogress.empty? Sidekiq.logger.debug('Re-queueing terminated jobs') Sidekiq.redis do |conn| inprogress.each do |unit_of_work| conn.multi do |multi| multi.lpush(unit_of_work.queue, unit_of_work.job) multi.lrem(working_queue_name(unit_of_work.queue), 1, unit_of_work.job) end end end Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis") rescue => e Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.}") end |
.heartbeat ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 74 def self.heartbeat Sidekiq.redis do |conn| conn.set(heartbeat_key(hostname, pid), 1, ex: HEARTBEAT_LIFESPAN) end Sidekiq.logger.debug("Heartbeat for hostname: #{hostname} and pid: #{pid}") end |
.heartbeat_key(hostname, pid) ⇒ Object
101 102 103 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 101 def self.heartbeat_key(hostname, pid) "reliable-fetcher-heartbeat-#{hostname}-#{pid}" end |
.hostname ⇒ Object
70 71 72 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 70 def self.hostname @hostname ||= Socket.gethostname end |
.pid ⇒ Object
66 67 68 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 66 def self.pid @pid ||= ::Process.pid end |
.setup_reliable_fetch!(config) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 38 def self.setup_reliable_fetch!(config) config.[:fetch] = if config.[:semi_reliable_fetch] Sidekiq::SemiReliableFetch else Sidekiq::ReliableFetch end Sidekiq.logger.info('GitLab reliable fetch activated!') start_heartbeat_thread end |
.start_heartbeat_thread ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 50 def self.start_heartbeat_thread Thread.new do loop do begin heartbeat sleep HEARTBEAT_INTERVAL rescue => e Sidekiq.logger.error("Heartbeat thread error: #{e.}") sleep HEARTBEAT_RETRY_DELAY end end end end |
.working_queue_name(queue) ⇒ Object
105 106 107 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 105 def self.working_queue_name(queue) "#{WORKING_QUEUE_PREFIX}:#{queue}:#{hostname}:#{pid}" end |
Instance Method Details
#retrieve_unit_of_work ⇒ Object
127 128 129 130 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 127 def retrieve_unit_of_work raise NotImplementedError, "#{self.class} does not implement #{__method__}" end |
#retrieve_work ⇒ Object
121 122 123 124 125 |
# File 'lib/sidekiq/base_reliable_fetch.rb', line 121 def retrieve_work clean_working_queues! if take_lease retrieve_unit_of_work end |