Class: Gitlab::SidekiqMigrateJobs
- Inherits:
-
Object
- Object
- Gitlab::SidekiqMigrateJobs
- Defined in:
- lib/gitlab/sidekiq_migrate_jobs.rb
Constant Summary collapse
- LOG_FREQUENCY =
1_000
- LOG_FREQUENCY_QUEUES =
10
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#mappings ⇒ Object
readonly
Returns the value of attribute mappings.
Instance Method Summary collapse
-
#initialize(mappings, logger: nil) ⇒ SidekiqMigrateJobs
constructor
mappings is a hash of WorkerClassName => target_queue_name.
-
#migrate_queues ⇒ Object
Migrates jobs from queues that are outside the mappings.
-
#migrate_set(sidekiq_set) ⇒ Object
Migrate jobs in SortedSets, i.e. scheduled and retry sets.
Constructor Details
#initialize(mappings, logger: nil) ⇒ SidekiqMigrateJobs
mappings is a hash of WorkerClassName => target_queue_name
11 12 13 14 |
# File 'lib/gitlab/sidekiq_migrate_jobs.rb', line 11

# Builds a migrator.
#
# @param mappings [Hash{String => String}] WorkerClassName => target_queue_name
# @param logger [Object, nil] optional logger responding to #info / #error
def initialize(mappings, logger: nil)
  @logger = logger
  @mappings = mappings
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/gitlab/sidekiq_migrate_jobs.rb', line 8

# Read-only accessor for the logger handed to the constructor (may be nil).
#
# @return [Object, nil]
def logger
  @logger
end
#mappings ⇒ Object (readonly)
Returns the value of attribute mappings.
8 9 10 |
# File 'lib/gitlab/sidekiq_migrate_jobs.rb', line 8

# Read-only accessor for the WorkerClassName => target_queue_name hash.
#
# @return [Hash]
def mappings
  @mappings
end
Instance Method Details
#migrate_queues ⇒ Object
Migrates jobs from queues that are outside the mappings
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/gitlab/sidekiq_migrate_jobs.rb', line 59 def migrate_queues routing_rules_queues = mappings.values.uniq logger&.info("List of queues based on routing rules: #{routing_rules_queues}") Sidekiq.redis do |conn| # Redis 6 supports conn.scan_each(match: "queue:*", type: 'list') conn.scan_each(match: "queue:*") do |key| # Redis 5 compatibility next unless conn.type(key) == 'list' queue_from = key.split(':', 2).last next if routing_rules_queues.include?(queue_from) logger&.info("Migrating #{queue_from} queue") migrated = 0 while queue_length(queue_from) > 0 begin if migrated >= 0 && migrated % LOG_FREQUENCY_QUEUES == 0 logger&.info("Migrating from #{queue_from}. Total: #{queue_length(queue_from)}. Migrated: #{migrated}.") end job = conn.rpop "queue:#{queue_from}" job_hash = Gitlab::Json.load(job) next unless mappings.has_key?(job_hash['class']) destination_queue = mappings[job_hash['class']] job_hash['queue'] = destination_queue conn.lpush("queue:#{destination_queue}", Gitlab::Json.dump(job_hash)) migrated += 1 rescue JSON::ParserError logger&.error("Unmarshal JSON payload from SidekiqMigrateJobs failed. Job: #{job}") next end end logger&.info("Finished migrating #{queue_from} queue") end end end |
#migrate_set(sidekiq_set) ⇒ Object
Migrate jobs in SortedSets, i.e. scheduled and retry sets.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/gitlab/sidekiq_migrate_jobs.rb', line 17 def migrate_set(sidekiq_set) source_queues_regex = Regexp.union(mappings.keys) cursor = 0 scanned = 0 migrated = 0 estimated_size = Sidekiq.redis { |c| c.zcard(sidekiq_set) } logger&.info("Processing #{sidekiq_set} set. Estimated size: #{estimated_size}.") begin cursor, jobs = Sidekiq.redis { |c| c.zscan(sidekiq_set, cursor) } jobs.each do |(job, score)| if scanned > 0 && scanned % LOG_FREQUENCY == 0 logger&.info("In progress. Scanned records: #{scanned}. Migrated records: #{migrated}.") end scanned += 1 next unless job.match?(source_queues_regex) job_hash = Gitlab::Json.load(job) destination_queue = mappings[job_hash['class']] next unless mappings.has_key?(job_hash['class']) next if job_hash['queue'] == destination_queue job_hash['queue'] = destination_queue migrated += migrate_job_in_set(sidekiq_set, job, score, job_hash) end end while cursor.to_i != 0 logger&.info("Done. Scanned records: #{scanned}. Migrated records: #{migrated}.") { scanned: scanned, migrated: migrated } end |