Class: Sidekiq::BaseReliableFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/base_reliable_fetch.rb

Direct Known Subclasses

ReliableFetch, SemiReliableFetch

Defined Under Namespace

Classes: UnitOfWork

Constant Summary collapse

DEFAULT_CLEANUP_INTERVAL =

1 hour

60 * 60
HEARTBEAT_INTERVAL =

seconds

20
HEARTBEAT_LIFESPAN =

seconds

60
HEARTBEAT_RETRY_DELAY =

seconds

1
WORKING_QUEUE_PREFIX =
'working'
DEFAULT_LEASE_INTERVAL =

Defines how often we try to take a lease to not flood our Redis server with SET requests

2 * 60
LEASE_KEY =

Key of the cleanup lease lock (the "seconds" unit belongs to DEFAULT_LEASE_INTERVAL above)

'reliable-fetcher-cleanup-lock'
SCAN_COUNT =

Defines the COUNT parameter that will be passed to Redis SCAN command

1000
DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION =

Maximum number of times a job can be retried after being interrupted

3
WORKING_QUEUE_REGEX =

Regexes for matching working queue keys

/#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*:[0-9a-f]*)\z/.freeze
LEGACY_WORKING_QUEUE_REGEX =
/#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*)\z/.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ BaseReliableFetch

Returns a new instance of BaseReliableFetch.

Raises:

  • (ArgumentError)


113
114
115
116
117
118
119
120
121
# File 'lib/sidekiq/base_reliable_fetch.rb', line 113

# Builds a fetcher from an options hash.
#
# @param options [Hash] must provide :queues (a list of queue names); may
#   provide :cleanup_interval, :lease_interval and :strict
# @raise [ArgumentError] when options[:queues] is nil or false
def initialize(options)
  queue_names = options[:queues]
  raise ArgumentError, 'missing queue list' unless queue_names

  @last_try_to_take_lease_at = 0
  @lease_interval = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL)
  @cleanup_interval = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL)
  # Normalize any truthy/falsy :strict value to a real boolean.
  @strictly_ordered_queues = options[:strict] ? true : false
  # Every configured queue name is stored with the "queue:" prefix.
  @queues = queue_names.map { |name| "queue:#{name}" }
end

Instance Attribute Details

#cleanup_interval ⇒ Object (readonly)

Returns the value of attribute cleanup_interval.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

# Reader for @cleanup_interval, which #initialize takes from
# options[:cleanup_interval], falling back to DEFAULT_CLEANUP_INTERVAL.
# NOTE(review): presumably expressed in seconds, like the default (60 * 60,
# documented as 1 hour) — confirm against the code that consumes it.
def cleanup_interval
  @cleanup_interval
end

#last_try_to_take_lease_at ⇒ Object (readonly)

Returns the value of attribute last_try_to_take_lease_at.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

# Reader for @last_try_to_take_lease_at, which #initialize sets to 0.
# NOTE(review): presumably updated by the lease-taking code, which is not
# shown here — confirm where and in what unit it is written.
def last_try_to_take_lease_at
  @last_try_to_take_lease_at
end

#lease_interval ⇒ Object (readonly)

Returns the value of attribute lease_interval.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

# Reader for @lease_interval, which #initialize takes from
# options[:lease_interval], falling back to DEFAULT_LEASE_INTERVAL (2 * 60).
def lease_interval
  @lease_interval
end

#queues ⇒ Object (readonly)

Returns the value of attribute queues.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

# Reader for @queues: the entries of options[:queues] given to #initialize,
# each converted to a string with the "queue:" prefix.
def queues
  @queues
end

#strictly_ordered_queues ⇒ Object (readonly)

Returns the value of attribute strictly_ordered_queues.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

# Reader for @strictly_ordered_queues: true or false, derived in #initialize
# from the truthiness of options[:strict].
def strictly_ordered_queues
  @strictly_ordered_queues
end

#use_semi_reliable_fetch ⇒ Object (readonly)

Returns the value of attribute use_semi_reliable_fetch.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

# Reader for @use_semi_reliable_fetch.
# NOTE(review): the constructor shown for this class never assigns
# @use_semi_reliable_fetch, so this presumably returns nil unless something
# else sets it — confirm whether the reader is still needed.
def use_semi_reliable_fetch
  @use_semi_reliable_fetch
end

Class Method Details

.heartbeat ⇒ Object



89
90
91
92
93
94
95
# File 'lib/sidekiq/base_reliable_fetch.rb', line 89

# Records that this process is alive: sets this process's heartbeat key to 1
# in Redis with an expiry of HEARTBEAT_LIFESPAN, then logs a debug line.
def self.heartbeat
  Sidekiq.redis { |redis| redis.set(heartbeat_key(identity), 1, ex: HEARTBEAT_LIFESPAN) }

  Sidekiq.logger.debug("Heartbeat for #{identity}")
end

.heartbeat_key(identity) ⇒ Object



101
102
103
# File 'lib/sidekiq/base_reliable_fetch.rb', line 101

# Builds the Redis key under which the heartbeat of the process with the
# given identity is stored.
#
# @param identity [String] process identity, e.g. "host:123:abcdef"
# @return [String] "reliable-fetcher-heartbeat-" followed by the identity
#   with every ':' replaced by '-'
def self.heartbeat_key(identity)
  sanitized = identity.tr(':', '-')
  "reliable-fetcher-heartbeat-#{sanitized}"
end

.hostname ⇒ Object



77
78
79
# File 'lib/sidekiq/base_reliable_fetch.rb', line 77

# Returns the host name reported by Socket.gethostname. It is the first
# component of the process identity built by .identity.
def self.hostname
  Socket.gethostname
end

.identity ⇒ Object



85
86
87
# File 'lib/sidekiq/base_reliable_fetch.rb', line 85

# Identity of the current process in the form "<hostname>:<pid>:<nonce>".
# The value is computed once and memoized in a class variable; a class
# variable (rather than a class-level instance variable) is shared with
# subclasses, so they all report the same identity.
def self.identity
  @@identity ||= "#{hostname}:#{Process.pid}:#{process_nonce}"
end

.process_nonce ⇒ Object



81
82
83
# File 'lib/sidekiq/base_reliable_fetch.rb', line 81

# Random per-process token: 6 random bytes rendered as 12 hex characters by
# SecureRandom.hex(6). Generated on first call and memoized in a class
# variable, so later calls return the same value. It is the last component
# of the string built by .identity.
def self.process_nonce
  @@process_nonce ||= SecureRandom.hex(6)
end

.setup_reliable_fetch!(config) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/sidekiq/base_reliable_fetch.rb', line 47

# Installs a reliable fetch strategy into the given configuration and starts
# the heartbeat thread.
#
# Sidekiq::SemiReliableFetch is chosen when config.options[:semi_reliable_fetch]
# is truthy, Sidekiq::ReliableFetch otherwise. The strategy instance is built
# from config.options and stored under config.options[:fetch].
#
# @param config [#options] object exposing the options hash
# @return whatever .start_heartbeat_thread returns
def self.setup_reliable_fetch!(config)
  strategy_class = config.options[:semi_reliable_fetch] ? Sidekiq::SemiReliableFetch : Sidekiq::ReliableFetch
  config.options[:fetch] = strategy_class.new(config.options)

  Sidekiq.logger.info('GitLab reliable fetch activated!')

  start_heartbeat_thread
end

.start_heartbeat_thread ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/sidekiq/base_reliable_fetch.rb', line 61

# Spawns a background thread that calls .heartbeat in an endless loop,
# sleeping HEARTBEAT_INTERVAL after each successful beat. When a beat raises
# a StandardError, the error is logged and the loop resumes after
# HEARTBEAT_RETRY_DELAY, so a failure does not end the thread.
#
# @return [Thread] the started thread
def self.start_heartbeat_thread
  Thread.new do
    loop do
      heartbeat

      sleep HEARTBEAT_INTERVAL
    rescue => e
      Sidekiq.logger.error("Heartbeat thread error: #{e.message}")

      sleep HEARTBEAT_RETRY_DELAY
    end
  end
end

.worker_dead?(identity, conn) ⇒ Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/sidekiq/base_reliable_fetch.rb', line 97

# Tells whether the process with the given identity is considered dead,
# i.e. its heartbeat key cannot be read from Redis.
#
# @param identity [String] identity of the process to check
# @param conn Redis connection used for the GET
# @return [Boolean] true when the heartbeat key yields nil/false
def self.worker_dead?(identity, conn)
  last_heartbeat = conn.get(heartbeat_key(identity))
  !last_heartbeat
end

.working_queue_name(queue) ⇒ Object



105
106
107
# File 'lib/sidekiq/base_reliable_fetch.rb', line 105

# Builds the name of the current process's working queue for +queue+:
# WORKING_QUEUE_PREFIX, the queue and .identity joined with ':'.
# For a queue of the form "queue:<name>" the result has the shape matched by
# WORKING_QUEUE_REGEX.
def self.working_queue_name(queue)
  "#{WORKING_QUEUE_PREFIX}:#{queue}:#{identity}"
end

Instance Method Details

#bulk_requeue(inprogress, _options) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/sidekiq/base_reliable_fetch.rb', line 134

# Hands every in-progress unit of work to #preprocess_interrupted_job and
# removes it from this process's working queue.
#
# Each unit is handled in its own MULTI transaction so the preprocessing
# commands and the LREM on the working queue are sent together. Any
# StandardError is logged as a warning instead of being raised.
#
# @param inprogress [Array] units of work responding to #job and #queue
# @param _options unused
def bulk_requeue(inprogress, _options)
  return if inprogress.empty?

  Sidekiq.redis do |conn|
    inprogress.each do |work|
      job = work.job
      queue = work.queue

      conn.multi do |transaction|
        preprocess_interrupted_job(job, queue, transaction)

        transaction.lrem(self.class.working_queue_name(queue), 1, job)
      end
    end
  end
rescue => e
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.message}")
end

#retrieve_unit_of_work ⇒ Object

Raises:

  • (NotImplementedError)


129
130
131
132
# File 'lib/sidekiq/base_reliable_fetch.rb', line 129

# Placeholder that always raises, naming the receiver's class and this method
# in the error message. #retrieve_work calls it to obtain the unit of work.
#
# @raise [NotImplementedError] always
def retrieve_unit_of_work
  message = "#{self.class} does not implement #{__method__}"
  raise NotImplementedError, message
end

#retrieve_work ⇒ Object



123
124
125
126
127
# File 'lib/sidekiq/base_reliable_fetch.rb', line 123

# Fetches the next unit of work. Before fetching, it tries to take the
# lease; only when #take_lease returns a truthy value does it run
# #clean_working_queues!.
#
# @return whatever #retrieve_unit_of_work returns
def retrieve_work
  lease_taken = take_lease
  clean_working_queues! if lease_taken

  retrieve_unit_of_work
end