Class: SidekiqUniqueJobs::Orphans::RubyReaper

Inherits:
Reaper
  • Object
show all
Includes:
Timing
Defined in:
lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb

Overview

Note:

this is a much slower version of the lua script but does not crash redis

Class DeleteOrphans provides deletion of orphaned digests

Author:

Constant Summary collapse

SIDEKIQ_BEAT_PAUSE =
10
RUN_SUFFIX =
":RUN"
MAX_QUEUE_LENGTH =
1000

Constants inherited from Reaper

SidekiqUniqueJobs::Orphans::Reaper::REAPERS

Instance Attribute Summary collapse

Attributes inherited from Reaper

#conn

Instance Method Summary collapse

Methods included from Timing

clock_stamp, now_f, time_source, timed

Methods inherited from Reaper

call, #config, #reaper, #reaper_count, #reaper_timeout

Methods included from JSON

dump_json, load_json, safe_load_json

Methods included from Logging

#build_message, included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context

Methods included from Script::Caller

call_script, debug_lua, do_call, extract_args, max_history, normalize_argv, now_f, redis_version

Methods included from Connection

included, #redis

Constructor Details

#initialize(conn) ⇒ RubyReaper

Initialize a new instance of DeleteOrphans



58
59
60
61
62
63
64
65
66
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 58

# Initialize a new instance of DeleteOrphans
#
# @param conn [Object] the redis connection (stored by the parent Reaper via super)
def initialize(conn)
  super
  @digests      = SidekiqUniqueJobs::Digests.new
  @scheduled    = Redis::SortedSet.new(SCHEDULE)
  @retried      = Redis::SortedSet.new(RETRY)
  # Wall clock start (used by #max_score) and time-source reading (used by #elapsed_ms)
  @start_time   = Time.now
  @start_source = time_source.call
  # Reaper budget converted from seconds to milliseconds (compared in #timeout?)
  @timeout_ms   = SidekiqUniqueJobs.config.reaper_timeout * 1000
end

Instance Attribute Details

#digests ⇒ Object (readonly)

Returns the value of attribute digests.



27
28
29
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 27

# The unique digests collection wrapper
#
# @return [SidekiqUniqueJobs::Digests] the value assigned in #initialize
def digests
  @digests
end

#retried ⇒ Object (readonly)

Returns the value of attribute retried.



35
36
37
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 35

# The Sidekiq retry sorted set
#
# @return [Redis::SortedSet] wrapper around the RETRY set (assigned in #initialize)
def retried
  @retried
end

#scheduled ⇒ Object (readonly)

Returns the value of attribute scheduled.



31
32
33
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 31

# The Sidekiq schedule sorted set
#
# @return [Redis::SortedSet] wrapper around the SCHEDULE set (assigned in #initialize)
def scheduled
  @scheduled
end

#start_source ⇒ Object (readonly)

Returns the value of attribute start_source.



46
47
48
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 46

# The time-source reading captured when the reaper was created
#
# @return [Object] the value of time_source.call taken in #initialize —
#   compared against later readings in #elapsed_ms
def start_source
  @start_source
end

#start_time ⇒ Integer (readonly)



40
41
42
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 40

# When this reaper instance was created
#
# @return [Time] assigned Time.now in #initialize
#   NOTE(review): upstream docs declare Integer, but the code assigns a Time — confirm
def start_time
  @start_time
end

#timeout_ms ⇒ Object (readonly)

Returns the value of attribute timeout_ms.



51
52
53
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 51

# The reaper's time budget in milliseconds
#
# @return [Numeric] reaper_timeout * 1000, set in #initialize
def timeout_ms
  @timeout_ms
end

Instance Method Details

#active?(digest) ⇒ Boolean



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 199

# Checks if a digest belongs to a currently busy (in-progress) job
#
# Iterates the registered Sidekiq process keys and inspects each
# process' "<key>:work" hash of running jobs.
#
# @param digest [String] the digest to search for
# @return [true, false]
def active?(digest)
  Sidekiq.redis do |conn|
    procs = conn.sscan("processes").to_a
    # No Sidekiq processes registered => nothing can be busy
    return false if procs.empty?

    procs.sort.each do |key|
      # Fetch process liveness and its work hash in a single round trip
      valid, workers = conn.pipelined do |pipeline|
        # TODO: Remove the if statement in the future
        if pipeline.respond_to?(:exists?)
          pipeline.exists?(key)
        else
          pipeline.exists(key)
        end
        pipeline.hgetall("#{key}:work")
      end

      # Skip processes that have expired or have no running jobs
      next unless valid
      next unless workers.any?

      workers.each_pair do |_tid, job|
        next unless (item = safe_load_json(job))

        payload = safe_load_json(item[PAYLOAD])

        # A busy job holds this digest (RUN_SUFFIX stripped by #match?)
        return true if match?(digest, payload[LOCK_DIGEST])
        # Job created after the reaper cutoff; treat it as active to be safe
        return true if considered_active?(payload[CREATED_AT])
      end
    end

    false
  end
end

#belongs_to_job?(digest) ⇒ true, false

Checks if the digest has a matching job.

1. It checks the scheduled set
2. It checks the retry set
3. It goes through all queues


154
155
156
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 154

# Checks if the digest has a matching job.
#
# 1. It checks the scheduled set
# 2. It checks the retry set
# 3. It goes through all queues
# 4. It checks the currently busy workers
#
# @param digest [String] the digest to look for
# @return [true, false]
def belongs_to_job?(digest)
  return true if scheduled?(digest)
  return true if retried?(digest)
  return true if enqueued?(digest)

  active?(digest)
end

#call ⇒ Integer

Delete orphaned digests



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 74

# Delete orphaned digests
#
# Deletes digests whose expiry score has passed, then digests that no
# longer belong to any job. Bails out early (returning nil) when the
# queues are very full, because verifying each digest against long
# queues is too expensive (see #queues_very_full?).
#
# @return [Integer] the number of deleted digests
def call
  return if queues_very_full?

  BatchDelete.call(expired_digests, conn)
  BatchDelete.call(orphans, conn)

  # orphans.each_slice(500) do |chunk|
  #   conn.pipelined do |pipeline|
  #     chunk.each do |digest|
  #       next if belongs_to_job?(digest)

  #       pipeline.zadd(ORPHANED_DIGESTS, now_f, digest)
  #     end
  #   end
  # end
end

#considered_active?(time_f) ⇒ Boolean



238
239
240
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 238

# Whether a job timestamp is recent enough to be considered active
#
# @param time_f [Float] an epoch timestamp (e.g. the job's created_at)
# @return [true, false] true when the timestamp is newer than #max_score
def considered_active?(time_f)
  cutoff = max_score
  cutoff < time_f
end

#elapsed_ms ⇒ Object



138
139
140
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 138

# Time elapsed since this reaper instance was created
#
# @return [Numeric] the difference between the current time-source
#   reading and the reading captured in #initialize
def elapsed_ms
  current = time_source.call
  current - start_source
end

#enqueued?(digest) ⇒ true, false

Checks if the digest exists in a Sidekiq::Queue



187
188
189
190
191
192
193
194
195
196
197
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 187

# Checks if the digest exists in a Sidekiq::Queue
#
# Pages through every queue's entries and matches the digest as a
# substring of the raw serialized job payload.
#
# @param digest [String] the digest to search for
# @return [true, false]
def enqueued?(digest)
  Sidekiq.redis do |conn|
    queues(conn) do |queue|
      entries(conn, queue) do |entry|
        # Substring match against the raw JSON payload
        return true if entry.include?(digest)
      end
    end

    false
  end
end

#entries(conn, queue, &block) ⇒ Object



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 255

# Yields each raw entry of a queue, paging with LRANGE in chunks of 50
#
# Because workers may pop jobs while we iterate, the next page's start
# index is shifted back by however many entries disappeared since the
# iteration began, so entries are not skipped.
#
# @param conn [Object] the redis connection
# @param queue [String] the queue name (without the "queue:" prefix)
# @param block [Proc] yielded once per raw queue entry
# @return [void]
def entries(conn, queue, &block)
  queue_key    = "queue:#{queue}"
  initial_size = conn.llen(queue_key)
  deleted_size = 0
  page         = 0
  page_size    = 50

  loop do
    # Shift the window back to compensate for entries popped since we started
    range_start = (page * page_size) - deleted_size

    range_end   = range_start + page_size - 1
    entries     = conn.lrange(queue_key, range_start, range_end)
    page       += 1

    break if entries.empty?

    entries.each(&block)

    deleted_size = initial_size - conn.llen(queue_key)

    # The queue is growing, not shrinking, just keep looping
    deleted_size = 0 if deleted_size.negative?
  end
end

#expired_digests ⇒ Object



91
92
93
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 91

# Digests whose expiry score has already passed
#
# @return [Array<String>] members of the expiring digests sorted set
#   with a score between 0 and #max_score
def expired_digests
  conn.zrange(EXPIRING_DIGESTS, 0, max_score, "byscore")
end

#in_sorted_set?(key, digest) ⇒ true, false

Checks a sorted set for the existence of this digest



306
307
308
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 306

# Checks a sorted set for the existence of this digest
#
# Uses ZSCAN with a substring pattern, so any member containing the
# digest counts as a hit (count: 1 is only a scan hint, not a limit).
#
# @param key [String] the key of the sorted set
# @param digest [String] the digest to scan for
# @return [true, false]
def in_sorted_set?(key, digest)
  conn.zscan(key, match: "*#{digest}*", count: 1).to_a.any?
end

#match?(key_one, key_two) ⇒ Boolean



232
233
234
235
236
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 232

# Compares two digests while ignoring the :RUN suffix
#
# @param key_one [String, nil] the first digest
# @param key_two [String, nil] the second digest
# @return [true, false] true when both keys are present and equal
#   after stripping RUN_SUFFIX
def match?(key_one, key_two)
  return false if key_one.nil? || key_two.nil?

  stripped_one = key_one.delete_suffix(RUN_SUFFIX)
  stripped_two = key_two.delete_suffix(RUN_SUFFIX)
  stripped_one == stripped_two
end

#max_score ⇒ Object



99
100
101
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 99

# The score cutoff used when selecting expired/orphaned digests
#
# Anything scored before (start_time - reaper_timeout - SIDEKIQ_BEAT_PAUSE)
# is old enough to be considered; SIDEKIQ_BEAT_PAUSE pads for heartbeat lag.
#
# @return [Float] the cutoff timestamp as an epoch float
def max_score
  cutoff = start_time - reaper_timeout - SIDEKIQ_BEAT_PAUSE
  cutoff.to_f
end

#orphaned_digests ⇒ Object



95
96
97
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 95

# Digests previously marked as orphaned whose score has passed
#
# @return [Array<String>] members of the orphaned digests sorted set
#   with a score between 0 and #max_score
def orphaned_digests
  conn.zrange(ORPHANED_DIGESTS, 0, max_score, "byscore")
end

#orphans ⇒ Array&lt;String&gt;

Find orphaned digests



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 109

# Find orphaned digests
#
# Pages through the digests sorted set (scores up to #max_score) and
# collects digests that belong to no scheduled, retried, enqueued or
# busy job. Stops when the reaper's time budget runs out or enough
# orphans (reaper_count) have been collected.
#
# NOTE: the page size is now a constant `per` per request. The previous
# `count: (page + 1) * per` grew the window each page while the offset
# advanced only by `per`, producing overlapping ranges from page 2
# onward — the same digest could be re-checked and appended twice.
#
# @return [Array<String>] an array of orphaned digests
def orphans
  orphans = []
  page    = 0
  per     = reaper_count * 2
  results = digests.byscore(0, max_score, offset: page * per, count: per)

  while results.size.positive?
    results.each do |digest|
      break if timeout?
      next if belongs_to_job?(digest)

      orphans << digest
      break if orphans.size >= reaper_count
    end

    # Stop paging once the budget is spent or we collected enough
    break if timeout?
    break if orphans.size >= reaper_count

    page += 1
    results = digests.byscore(0, max_score, offset: page * per, count: per)
  end

  orphans
end

#queues(conn) { ... } ⇒ void

This method returns an undefined value.

Loops through all the redis queues and yields them one by one

Yields:

  • queues one at a time



251
252
253
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 251

# Loops through all the redis queues and yields them one by one
#
# @param conn [Object] the redis connection
# @yield queue names, one at a time
# @return [void]
def queues(conn, &block)
  conn.sscan("queues").each(&block)
end

#queues_very_full? ⇒ Boolean

If sidekiq queues are very full, it becomes highly inefficient for the reaper because it must check every queued job to verify a digest is safe to delete. The reaper checks queued jobs in batches of 50, adding 2 reads per digest. With a queue length of 1,000 jobs, that’s over 20 extra reads per digest.



284
285
286
287
288
289
290
291
292
293
294
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 284

# Whether the combined length of all queues exceeds MAX_QUEUE_LENGTH
#
# When queues are very full the reaper becomes highly inefficient,
# since verifying a single digest requires paging through every queued
# job (#entries reads in batches of 50). #call skips reaping entirely
# in that case.
#
# @return [true, false] true as soon as the running total passes the limit
def queues_very_full?
  total_queue_size = 0
  Sidekiq.redis do |conn|
    queues(conn) do |queue|
      total_queue_size += conn.llen("queue:#{queue}")

      # Short-circuit: no need to sum the remaining queues
      return true if total_queue_size > MAX_QUEUE_LENGTH
    end
  end
  false
end

#retried?(digest) ⇒ true, false

Checks if the digest exists in the Sidekiq::RetrySet



176
177
178
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 176

# Checks if the digest exists in the Sidekiq::RetrySet
#
# @param digest [String] the digest to scan for
# @return [true, false]
def retried?(digest)
  in_sorted_set?(RETRY, digest)
end

#scheduled?(digest) ⇒ true, false

Checks if the digest exists in the Sidekiq::ScheduledSet



165
166
167
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 165

# Checks if the digest exists in the Sidekiq::ScheduledSet
#
# @param digest [String] the digest to scan for
# @return [true, false]
def scheduled?(digest)
  in_sorted_set?(SCHEDULE, digest)
end

#timeout? ⇒ Boolean



134
135
136
# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 134

# Whether the reaper has exhausted its time budget (see #initialize)
#
# @return [true, false]
def timeout?
  elapsed_ms >= timeout_ms
end