Class: Sidekiq::BaseReliableFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/base_reliable_fetch.rb

Direct Known Subclasses

ReliableFetch, SemiReliableFetch

Defined Under Namespace

Classes: UnitOfWork

Constant Summary collapse

DEFAULT_CLEANUP_INTERVAL =

1 hour

60 * 60
HEARTBEAT_INTERVAL =

seconds

20
HEARTBEAT_LIFESPAN =

seconds

60
HEARTBEAT_RETRY_DELAY =

seconds

1
WORKING_QUEUE_PREFIX =
'working'
DEFAULT_LEASE_INTERVAL =

Defines how often we try to take a lease to not flood our Redis server with SET requests

2 * 60
LEASE_KEY =

seconds

'reliable-fetcher-cleanup-lock'
SCAN_COUNT =

Defines the COUNT parameter that will be passed to Redis SCAN command

1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ BaseReliableFetch

Returns a new instance of BaseReliableFetch.



113
114
115
116
117
118
119
# File 'lib/sidekiq/base_reliable_fetch.rb', line 113

def initialize(options)
  @cleanup_interval = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL)
  @lease_interval = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL)
  @last_try_to_take_lease_at = 0
  @strictly_ordered_queues = !!options[:strict]
  @queues = options[:queues].map { |q| "queue:#{q}" }
end

Instance Attribute Details

#cleanup_intervalObject (readonly)

Returns the value of attribute cleanup_interval.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

def cleanup_interval
  @cleanup_interval
end

#last_try_to_take_lease_atObject (readonly)

Returns the value of attribute last_try_to_take_lease_at.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

def last_try_to_take_lease_at
  @last_try_to_take_lease_at
end

#lease_intervalObject (readonly)

Returns the value of attribute lease_interval.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

def lease_interval
  @lease_interval
end

#queuesObject (readonly)

Returns the value of attribute queues.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

def queues
  @queues
end

#strictly_ordered_queuesObject (readonly)

Returns the value of attribute strictly_ordered_queues.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

def strictly_ordered_queues
  @strictly_ordered_queues
end

#use_semi_reliable_fetchObject (readonly)

Returns the value of attribute use_semi_reliable_fetch.



109
110
111
# File 'lib/sidekiq/base_reliable_fetch.rb', line 109

def use_semi_reliable_fetch
  @use_semi_reliable_fetch
end

Class Method Details

.bulk_requeue(inprogress, _options) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/sidekiq/base_reliable_fetch.rb', line 82

def self.bulk_requeue(inprogress, _options)
  return if inprogress.empty?

  Sidekiq.logger.debug('Re-queueing terminated jobs')

  Sidekiq.redis do |conn|
    inprogress.each do |unit_of_work|
      conn.multi do |multi|
        multi.lpush(unit_of_work.queue, unit_of_work.job)
        multi.lrem(working_queue_name(unit_of_work.queue), 1, unit_of_work.job)
      end
    end
  end

  Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => e
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.message}")
end

.heartbeatObject



74
75
76
77
78
79
80
# File 'lib/sidekiq/base_reliable_fetch.rb', line 74

def self.heartbeat
  Sidekiq.redis do |conn|
    conn.set(heartbeat_key(hostname, pid), 1, ex: HEARTBEAT_LIFESPAN)
  end

  Sidekiq.logger.debug("Heartbeat for hostname: #{hostname} and pid: #{pid}")
end

.heartbeat_key(hostname, pid) ⇒ Object



101
102
103
# File 'lib/sidekiq/base_reliable_fetch.rb', line 101

def self.heartbeat_key(hostname, pid)
  "reliable-fetcher-heartbeat-#{hostname}-#{pid}"
end

.hostnameObject



70
71
72
# File 'lib/sidekiq/base_reliable_fetch.rb', line 70

def self.hostname
  @hostname ||= Socket.gethostname
end

.pidObject



66
67
68
# File 'lib/sidekiq/base_reliable_fetch.rb', line 66

def self.pid
  @pid ||= ::Process.pid
end

.setup_reliable_fetch!(config) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/sidekiq/base_reliable_fetch.rb', line 38

def self.setup_reliable_fetch!(config)
  config.options[:fetch] = if config.options[:semi_reliable_fetch]
                             Sidekiq::SemiReliableFetch
                           else
                             Sidekiq::ReliableFetch
                           end

  Sidekiq.logger.info('GitLab reliable fetch activated!')

  start_heartbeat_thread
end

.start_heartbeat_threadObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/sidekiq/base_reliable_fetch.rb', line 50

def self.start_heartbeat_thread
  Thread.new do
    loop do
      begin
        heartbeat

        sleep HEARTBEAT_INTERVAL
      rescue => e
        Sidekiq.logger.error("Heartbeat thread error: #{e.message}")

        sleep HEARTBEAT_RETRY_DELAY
      end
    end
  end
end

.working_queue_name(queue) ⇒ Object



105
106
107
# File 'lib/sidekiq/base_reliable_fetch.rb', line 105

def self.working_queue_name(queue)
  "#{WORKING_QUEUE_PREFIX}:#{queue}:#{hostname}:#{pid}"
end

Instance Method Details

#retrieve_unit_of_workObject

Raises:

  • (NotImplementedError)


127
128
129
130
# File 'lib/sidekiq/base_reliable_fetch.rb', line 127

def retrieve_unit_of_work
  raise NotImplementedError,
    "#{self.class} does not implement #{__method__}"
end

#retrieve_workObject



121
122
123
124
125
# File 'lib/sidekiq/base_reliable_fetch.rb', line 121

def retrieve_work
  clean_working_queues! if take_lease

  retrieve_unit_of_work
end