Class: SidekiqUniqueJobs::Orphans::RubyReaper

Inherits:
Reaper
  • Object
Includes:
Timing
Defined in:
lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb

Overview

Note:

This is a much slower version of the Lua script, but it does not crash Redis.

Class RubyReaper provides deletion of orphaned digests.


Author:

Constant Summary

RUN_SUFFIX =

Returns the suffix for :RUN locks.

Returns:

  • (String)

    the suffix for :RUN locks

":RUN"
MAX_QUEUE_LENGTH =

Returns the maximum combined length of sidekiq queues for running the reaper.

Returns:

  • (Integer)

    the maximum combined length of sidekiq queues for running the reaper

1000

Constants inherited from Reaper

SidekiqUniqueJobs::Orphans::Reaper::REAPERS

Instance Attribute Summary

Attributes inherited from Reaper

#conn

Instance Method Summary

Methods included from Timing

clock_stamp, now_f, time_source, timed

Methods inherited from Reaper

call, #config, #reaper, #reaper_count, #reaper_timeout

Methods included from JSON

dump_json, load_json, safe_load_json

Methods included from Logging

#build_message, included, #log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger, #logging_context, #with_configured_loggers_context, #with_logging_context

Methods included from Script::Caller

call_script, debug_lua, do_call, extract_args, max_history, now_f, redis_version

Methods included from Connection

included, #redis

Constructor Details

#initialize(conn) ⇒ RubyReaper

Initialize a new instance of RubyReaper

Parameters:

  • conn (Redis)

    a connection to Redis



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 50

def initialize(conn)
  super(conn)
  @digests    = SidekiqUniqueJobs::Digests.new
  @scheduled  = Redis::SortedSet.new(SCHEDULE)
  @retried    = Redis::SortedSet.new(RETRY)
  @start_time = time_source.call
  @timeout_ms = SidekiqUniqueJobs.config.reaper_timeout * 1000
end

Instance Attribute Details

#digests ⇒ Object (readonly)

Returns the value of attribute digests.



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 25

def digests
  @digests
end

#retried ⇒ Object (readonly)

Returns the value of attribute retried.



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 33

def retried
  @retried
end

#scheduled ⇒ Object (readonly)

Returns the value of attribute scheduled.



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 29

def scheduled
  @scheduled
end

#start_time ⇒ Object (readonly)

Returns the value of attribute start_time.



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 38

def start_time
  @start_time
end

#timeout_ms ⇒ Object (readonly)

Returns the value of attribute timeout_ms.



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 43

def timeout_ms
  @timeout_ms
end

Instance Method Details

#active?(digest) ⇒ Boolean


Returns:

  • (Boolean)


# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 167

def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  Sidekiq.redis do |conn|
    procs = conn.sscan_each("processes").to_a
    return false if procs.empty?

    procs.sort.each do |key|
      valid, workers = conn.pipelined do |pipeline|
        # TODO: Remove the if statement in the future
        if pipeline.respond_to?(:exists?)
          pipeline.exists?(key)
        else
          pipeline.exists(key)
        end
        pipeline.hgetall("#{key}:workers")
      end

      next unless valid
      next unless workers.any?

      workers.each_pair do |_tid, job|
        next unless (item = safe_load_json(job))

        payload = safe_load_json(item[PAYLOAD])

        return true if match?(digest, payload[LOCK_DIGEST])
        return true if considered_active?(payload[CREATED_AT])
      end
    end

    false
  end
end
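The worker scan in `#active?` can be replayed without Redis. This is a minimal, hypothetical sketch: the constant values (`"payload"`, `"lock_digest"`, `"created_at"`), the `safe_load_json` stand-in, the helper names, and the sample data are assumptions. The nested shape (a JSON payload string inside each JSON worker entry) is inferred from the code decoding `item[PAYLOAD]` a second time. As in the original, the second check returns true for any sufficiently recent job, whatever its digest.

```ruby
require "json"

# Assumed values; the real constants are defined elsewhere in the gem.
PAYLOAD     = "payload"
LOCK_DIGEST = "lock_digest"
CREATED_AT  = "created_at"
RUN_SUFFIX  = ":RUN"

# Stand-in for safe_load_json: returns nil instead of raising.
def safe_load_json(string)
  JSON.parse(string)
rescue JSON::ParserError, TypeError
  nil
end

# Mirrors #match?: compares digests with any trailing ":RUN" removed.
def digests_match?(key_one, key_two)
  return false if key_one.nil? || key_two.nil?

  key_one.delete_suffix(RUN_SUFFIX) == key_two.delete_suffix(RUN_SUFFIX)
end

# workers: tid => JSON string, standing in for HGETALL "<process>:workers".
def digest_held_by_worker?(workers, digest, reaper_timeout:, now: Time.now)
  workers.each_pair do |_tid, job|
    next unless (item = safe_load_json(job))

    payload = safe_load_json(item[PAYLOAD])
    next unless payload # guard added in this sketch

    return true if digests_match?(digest, payload[LOCK_DIGEST])
    # Same test as #considered_active?; .to_f maps a missing timestamp to 0.0.
    return true if (now - reaper_timeout).to_f < payload[CREATED_AT].to_f
  end

  false
end

# Builds one hypothetical worker entry with a nested JSON payload.
def worker_entry(lock_digest, created_at)
  payload = JSON.generate({ LOCK_DIGEST => lock_digest, CREATED_AT => created_at })
  JSON.generate({ PAYLOAD => payload })
end

now = Time.at(1_000) # the cutoff for a 10 s reaper_timeout is 990.0
digest_held_by_worker?({ "t1" => worker_entry("uniquejobs:abc:RUN", 900.0) },
                       "uniquejobs:abc", reaper_timeout: 10, now: now) # => true
```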

#belongs_to_job?(digest) ⇒ true, false

Checks if the digest has a matching job.

1. It checks the scheduled set
2. It checks the retry set
3. It goes through all queues
4. It checks the jobs currently being processed by Sidekiq workers

Parameters:

  • digest (String)

    the digest to search for

Returns:

  • (true)

    when any of the checks returns true

  • (false)

    when no job was found for this digest



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 122

def belongs_to_job?(digest)
  scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest)
end
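The `||` chain evaluates the checks in order and stops at the first one that returns true, so the queue scan and the worker scan only run when the sorted-set checks find nothing. A minimal sketch with hypothetical stub checks that record which ones ran:

```ruby
# checks: name => callable; hypothetical stubs for scheduled?, retried?, enqueued?, active?.
def belongs_to_job?(digest, checks)
  # Hash#any? stops at the first pair whose block returns true, like the || chain.
  checks.any? { |_name, check| check.call(digest) }
end

calls = []
checks = {
  scheduled: ->(_d) { calls << :scheduled; false },
  retried:   ->(_d) { calls << :retried; true },
  enqueued:  ->(_d) { calls << :enqueued; false },
  active:    ->(_d) { calls << :active; false }
}

belongs_to_job?("uniquejobs:abc", checks) # => true
calls                                     # => [:scheduled, :retried]
```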

#call ⇒ Integer, nil

Deletes orphaned digests.

Returns:

  • (Integer)

    the number of reaped locks

  • (nil)

    when the run is skipped because queues_very_full? returned true



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 65

def call
  return if queues_very_full?

  BatchDelete.call(orphans, conn)
end

#considered_active?(time_f) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 206

def considered_active?(time_f)
  (Time.now - reaper_timeout).to_f < time_f
end
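For example, with a `reaper_timeout` of 10 seconds, a job whose `created_at` falls within the last 10 seconds counts as active. A minimal sketch that injects a fixed `now` (a parameter added here for testability; the original calls `Time.now` directly):

```ruby
# Active when the job was created after the cutoff (now - reaper_timeout seconds).
def considered_active?(time_f, reaper_timeout, now: Time.now)
  (now - reaper_timeout).to_f < time_f
end

now = Time.at(1_000)                    # cutoff is 990.0
considered_active?(995.0, 10, now: now) # => true
considered_active?(985.0, 10, now: now) # => false
```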

#elapsed_ms ⇒ Object



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 106

def elapsed_ms
  time_source.call - start_time
end

#enqueued?(digest) ⇒ true, false

Checks if the digest exists in a Sidekiq::Queue

Parameters:

  • digest (String)

    the current digest

Returns:

  • (true)

    when digest exists in any queue

  • (false)

    when digest is not found in any queue



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 155

def enqueued?(digest)
  Sidekiq.redis do |conn|
    queues(conn) do |queue|
      entries(conn, queue) do |entry|
        return true if entry.include?(digest)
      end
    end

    false
  end
end

#entries(conn, queue, &block) ⇒ Object




# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 223

def entries(conn, queue, &block) # rubocop:disable Metrics/MethodLength
  queue_key    = "queue:#{queue}"
  initial_size = conn.llen(queue_key)
  deleted_size = 0
  page         = 0
  page_size    = 50

  loop do
    range_start = (page * page_size) - deleted_size
    range_end   = range_start + page_size - 1
    entries     = conn.lrange(queue_key, range_start, range_end)
    page       += 1

    break if entries.empty?

    entries.each(&block)

    deleted_size = initial_size - conn.llen(queue_key)
  end
end
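The `deleted_size` adjustment in `#entries` shifts each page window back by the number of entries that disappeared from the list since the scan started, so entries removed from the head of the list do not cause later entries to be skipped. The sketch below replays the same arithmetic against a hypothetical in-memory `FakeListConn` (not part of Sidekiq or redis-rb) that mimics LLEN and LRANGE for non-negative indexes, and removes 10 entries from the head after the first page:

```ruby
# Hypothetical in-memory stand-in for the list commands #entries uses.
class FakeListConn
  def initialize(list)
    @list = list
  end

  def llen(_key)
    @list.size
  end

  # LRANGE semantics for non-negative indexes: the stop index is inclusive.
  def lrange(_key, start, stop)
    @list[start..stop] || []
  end

  # Simulates another client removing entries from the head of the list.
  def drop_head(count)
    @list.shift(count)
  end
end

# Same paging arithmetic as #entries.
def each_entry(conn, queue_key, page_size = 50)
  initial_size = conn.llen(queue_key)
  deleted_size = 0
  page         = 0

  loop do
    range_start = (page * page_size) - deleted_size
    range_end   = range_start + page_size - 1
    batch       = conn.lrange(queue_key, range_start, range_end)
    page       += 1
    break if batch.empty?

    batch.each { |entry| yield entry }
    deleted_size = initial_size - conn.llen(queue_key)
  end
end

conn = FakeListConn.new((1..120).map { |i| "job-#{i}" })
seen = []
each_entry(conn, "queue:default") do |entry|
  seen << entry
  conn.drop_head(10) if seen.size == 50 # 10 entries vanish after the first page
end
seen.size # => 120
```

Without the adjustment, the second window would start at index 50 of the shortened list, skipping `job-51` through `job-60`.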

#in_sorted_set?(key, digest) ⇒ true, false

Checks a sorted set for the existence of this digest

Parameters:

  • key (String)

    the key for the sorted set

  • digest (String)

    the digest to scan for

Returns:

  • (true)

    when found

  • (false)

    when missing



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 270

def in_sorted_set?(key, digest)
  conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any?
end
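`ZSCAN ... MATCH` applies a glob-style pattern to each member, so `"*#{digest}*"` amounts to a substring test against the serialized job. The sketch below approximates that locally with `File.fnmatch`, whose glob rules are close to, but not identical with, Redis's; the helper name and sample members are hypothetical. One consequence: a digest that is a prefix of another digest also matches. Note also that `count: 1` is only a per-call hint; `zscan_each` still walks the whole set.

```ruby
require "json"

# Approximates ZSCAN ... MATCH "*digest*" over sorted-set members.
def member_matches_digest?(members, digest)
  pattern = "*#{digest}*"
  members.any? { |member| File.fnmatch(pattern, member) }
end

members = [
  JSON.generate({ "class" => "MyJob", "lock_digest" => "uniquejobs:abc123" }),
  JSON.generate({ "class" => "OtherJob", "lock_digest" => "uniquejobs:def456" })
]

member_matches_digest?(members, "uniquejobs:abc123") # => true
member_matches_digest?(members, "uniquejobs:zzz")    # => false
member_matches_digest?(members, "uniquejobs:abc")    # => true (prefix of another digest)
```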

#match?(key_one, key_two) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 200

def match?(key_one, key_two)
  return false if key_one.nil? || key_two.nil?

  key_one.delete_suffix(RUN_SUFFIX) == key_two.delete_suffix(RUN_SUFFIX)
end
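A worked example of the suffix handling: a `:RUN` lock digest and its base digest compare as equal, while distinct digests or a missing digest do not. The helper name below is hypothetical; its body mirrors `#match?`.

```ruby
RUN_SUFFIX = ":RUN"

def lock_digests_match?(key_one, key_two)
  return false if key_one.nil? || key_two.nil?

  # delete_suffix only strips a trailing ":RUN", so both forms compare equal.
  key_one.delete_suffix(RUN_SUFFIX) == key_two.delete_suffix(RUN_SUFFIX)
end

lock_digests_match?("uniquejobs:abc:RUN", "uniquejobs:abc") # => true
lock_digests_match?("uniquejobs:abc", "uniquejobs:def")     # => false
lock_digests_match?(nil, "uniquejobs:abc")                  # => false
```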

#orphans ⇒ Array<String>

Find orphaned digests

Returns:

  • (Array<String>)

    an array of orphaned digests



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 77

def orphans # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
  page = 0
  per = reaper_count * 2
  orphans = []
  results = conn.zrange(digests.key, page * per, (page + 1) * per)

  while results.size.positive?
    results.each do |digest|
      break if timeout?
      next if belongs_to_job?(digest)

      orphans << digest
      break if orphans.size >= reaper_count
    end

    break if timeout?
    break if orphans.size >= reaper_count

    page += 1
    results = conn.zrange(digests.key, page * per, (page + 1) * per)
  end

  orphans
end
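The paging loop can be replayed without Redis. In this minimal sketch the digests sorted set is a plain Ruby Array already in score order, and `find_orphans`, its `deadline_reached` callable, and the block predicate are hypothetical stand-ins for `#orphans`, `#timeout?`, and `#belongs_to_job?`. Each page holds `reaper_count * 2` digests. Because ZRANGE's stop index is inclusive, the original's stop of `(page + 1) * per` makes consecutive pages share one digest; the sketch uses `start + per - 1` so each digest is visited once.

```ruby
# digests: an Array in score order, standing in for the digests sorted set.
def find_orphans(digests, reaper_count:, deadline_reached: -> { false })
  per     = reaper_count * 2
  orphans = []
  page    = 0

  loop do
    start = page * per
    # Inclusive Range, mirroring ZRANGE's inclusive stop index.
    results = digests[start..(start + per - 1)] || []
    break if results.empty?

    results.each do |digest|
      break if deadline_reached.call
      next if yield(digest) # the digest still belongs to a job

      orphans << digest
      break if orphans.size >= reaper_count
    end

    break if deadline_reached.call
    break if orphans.size >= reaper_count

    page += 1
  end

  orphans
end

digests = (1..10).map { |i| "d#{i}" }
# d1..d5 belong to jobs, so the first page (d1..d4) yields no orphans.
find_orphans(digests, reaper_count: 2) { |d| d.delete_prefix("d").to_i <= 5 }
# => ["d6", "d7"]
```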

#queues(conn) { ... } ⇒ void

This method returns an undefined value.

Loops through all the Redis queues and yields their names one by one

Parameters:

  • conn (Redis)

    the connection to use for fetching queues

Yields:

  • queues one at a time



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 219

def queues(conn, &block)
  conn.sscan_each("queues", &block)
end

#queues_very_full? ⇒ Boolean

If the Sidekiq queues are very full, the reaper becomes highly inefficient, because it must check every queued job to verify that a digest is safe to delete. The reaper checks queued jobs in batches of 50, adding 2 reads (LRANGE and LLEN) per batch. With a combined queue length of 1,000 jobs, that is 20 batches, or at least 40 extra reads for each digest that is not found in a queue.

Returns:

  • (Boolean)


# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 248

def queues_very_full?
  total_queue_size = 0
  Sidekiq.redis do |conn|
    queues(conn) do |queue|
      total_queue_size += conn.llen("queue:#{queue}")

      return true if total_queue_size > MAX_QUEUE_LENGTH
    end
  end
  false
end
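The threshold check and the read estimate can be worked through with plain numbers. This sketch is hypothetical: `queue_lengths` (queue name to LLEN) replaces the Redis calls, and `reads_per_queue_scan` counts the calls `#entries` would make for one queue when the digest is not found: one initial LLEN, an LRANGE plus an LLEN per non-empty batch, and a final empty LRANGE.

```ruby
MAX_QUEUE_LENGTH = 1000
PAGE_SIZE        = 50

# Sums queue lengths and stops as soon as the limit is exceeded.
def queues_very_full?(queue_lengths, max = MAX_QUEUE_LENGTH)
  total = 0
  queue_lengths.each_value do |length|
    total += length
    return true if total > max
  end
  false
end

# Redis reads #entries issues while scanning one queue without a match.
def reads_per_queue_scan(length, page_size = PAGE_SIZE)
  batches = (length + page_size - 1) / page_size
  1 + (2 * batches) + 1
end

queues_very_full?({ "default" => 600, "low" => 400 }) # => false (1,000 is not > 1,000)
queues_very_full?({ "default" => 600, "low" => 401 }) # => true
reads_per_queue_scan(1000)                            # => 42
```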

#retried?(digest) ⇒ true, false

Checks if the digest exists in the Sidekiq::RetrySet

Parameters:

  • digest (String)

    the current digest

Returns:

  • (true)

    when digest exists in retry set

  • (false)

    when digest is missing from the retry set



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 144

def retried?(digest)
  in_sorted_set?(RETRY, digest)
end

#scheduled?(digest) ⇒ true, false

Checks if the digest exists in the Sidekiq::ScheduledSet

Parameters:

  • digest (String)

    the current digest

Returns:

  • (true)

    when digest exists in scheduled set

  • (false)

    when digest is missing from the scheduled set



# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 133

def scheduled?(digest)
  in_sorted_set?(SCHEDULE, digest)
end

#timeout? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb', line 102

def timeout?
  elapsed_ms >= timeout_ms
end
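The timeout bookkeeping (`start_time`, `timeout_ms`, `#elapsed_ms`, `#timeout?`) can be sketched with an injectable clock. This assumes, as the comparison with `timeout_ms` implies, that `time_source.call` returns milliseconds; the `Deadline` class and its `clock:` parameter are hypothetical and exist only to make the arithmetic testable.

```ruby
# Hypothetical helper mirroring start_time, timeout_ms, #elapsed_ms and #timeout?.
class Deadline
  DEFAULT_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) }

  def initialize(timeout_seconds, clock: DEFAULT_CLOCK)
    @clock      = clock
    @start_time = clock.call             # milliseconds
    @timeout_ms = timeout_seconds * 1000 # seconds -> milliseconds
  end

  def elapsed_ms
    @clock.call - @start_time
  end

  def timeout?
    elapsed_ms >= @timeout_ms
  end
end

ticks    = [0, 5_000, 10_000]
deadline = Deadline.new(10, clock: -> { ticks.shift })
deadline.timeout? # => false (5,000 ms elapsed)
deadline.timeout? # => true (10,000 ms elapsed)
```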