Module: Gitlab::Database::Migrations::SidekiqHelpers
- Included in:
- Gitlab::Database::MigrationHelpers
- Defined in:
- lib/gitlab/database/migrations/sidekiq_helpers.rb
Overview
rubocop:disable Cop/SidekiqApiUsage rubocop:disable Cop/SidekiqRedisCall
Constant Summary
Constants for default sidekiq_remove_jobs values:
- DEFAULT_MAX_ATTEMPTS = 5
- DEFAULT_TIMES_IN_A_ROW = 2
Instance Method Summary
-
#migrate_across_instance(queue_from, to, src_store, dst_stores) ⇒ Object
Cross-instance transfers are not atomic and data loss is possible.
- #migrate_within_instance(queue_from, to) ⇒ Object
- #sidekiq_queue_migrate(queue_from, to:) ⇒ Object
-
#sidekiq_remove_jobs(job_klasses:, times_in_a_row: DEFAULT_TIMES_IN_A_ROW, max_attempts: DEFAULT_MAX_ATTEMPTS) ⇒ Object
Probabilistically removes job_klasses from their specific queues, the retry set and the scheduled set.
Instance Method Details
#migrate_across_instance(queue_from, to, src_store, dst_stores) ⇒ Object
Cross-instance transfers are not atomic and data loss is possible.
# File 'lib/gitlab/database/migrations/sidekiq_helpers.rb', line 90

def migrate_across_instance(queue_from, to, src_store, dst_stores)
  _, src_pool = Gitlab::SidekiqSharding::Router.get_shard_instance(src_store)
  buffer_queue_name = "migration_buffer:queue:#{queue_from}"

  while Sidekiq::Client.via(src_pool) { sidekiq_queue_length(queue_from) } > 0
    job = Sidekiq::Client.via(src_pool) do
      Sidekiq.redis do |c|
        c.rpoplpush("queue:#{queue_from}", buffer_queue_name)
      end
    end

    job_hash = Sidekiq.load_json(job)

    # In the case of multiple stores having the same queue name, we look up the store which the
    # particular job would have been enqueued to if `.perform_async` were called.
    dst_store_name = Gitlab::SidekiqConfig::WorkerRouter.global.store(job_hash["class"].safe_constantize)

    # Send the job to the first shard that contains the queue. This assumes that the queue has a
    # listener on that particular Redis instance. This only affects configurations which use
    # multiple shards per queue.
    store_name = dst_stores.find { |ds| dst_store_name == ds } || dst_stores.first
    _, pool = Gitlab::SidekiqSharding::Router.get_shard_instance(store_name)

    Sidekiq::Client.via(pool) { Sidekiq.redis { |c| c.lpush("queue:#{to}", job) } }
  end

  Sidekiq::Client.via(src_pool) { Sidekiq.redis { |c| c.unlink(buffer_queue_name) } }
end
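The buffer queue exists so that a job popped from the source instance is not lost if the process dies between the pop and the push to the destination. The same RPOPLPUSH pattern can be sketched in pure Ruby, with arrays standing in for Redis lists (index 0 is the LPUSH head, the last element is the RPOP tail); all names below are illustrative, not part of the GitLab API:

```ruby
# Mimics the Redis RPOPLPUSH command on plain arrays, for illustration only.
def rpoplpush(src, dst)
  job = src.pop            # RPOP: take the oldest job from the source's tail
  dst.unshift(job) if job  # LPUSH: park it at the head of the destination
  job
end

source = ["j3", "j2", "j1"]  # j1 was enqueued first, so it sits at the tail
buffer = []                  # stand-in for the migration_buffer:queue:* key
dest   = []

until source.empty?
  job = rpoplpush(source, buffer)  # the job survives in the buffer if we crash here
  dest.unshift(job)                # LPUSH onto the chosen destination queue
end

buffer.clear  # the real helper UNLINKs the buffer key once the source is drained
```

Note that draining tail-to-head and LPUSHing onto the destination preserves the queue's head-to-tail order, so jobs are still consumed in their original order after the move.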
#migrate_within_instance(queue_from, to) ⇒ Object
# File 'lib/gitlab/database/migrations/sidekiq_helpers.rb', line 116

def migrate_within_instance(queue_from, to)
  Sidekiq.redis do |conn|
    conn.rpoplpush "queue:#{queue_from}", "queue:#{to}" while sidekiq_queue_length(queue_from) > 0
  end
end
#sidekiq_queue_migrate(queue_from, to:) ⇒ Object
# File 'lib/gitlab/database/migrations/sidekiq_helpers.rb', line 73

def sidekiq_queue_migrate(queue_from, to:)
  src_stores = Gitlab::SidekiqConfig::WorkerRouter.global.stores_with_queue(queue_from)
  dst_stores = Gitlab::SidekiqConfig::WorkerRouter.global.stores_with_queue(to)

  if migrate_within_instance?(src_stores, dst_stores) || !Gitlab::SidekiqSharding::Router.enabled?
    migrate_within_instance(queue_from, to)
  else
    src_stores = [nil] if src_stores.empty? # route from main shard if empty
    dst_stores = [nil] if dst_stores.empty? # route to main shard if empty

    src_stores.each do |src_store|
      migrate_across_instance(queue_from, to, src_store, dst_stores)
    end
  end
end
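The method picks the cheap single-instance path when it can and falls back to the buffered cross-instance transfer otherwise. The private predicate `migrate_within_instance?` is not shown on this page; a plausible reading, sketched below purely for illustration, is that it returns true when source and destination queues resolve to the same single store (the function name, parameters, and logic here are assumptions, not GitLab's actual implementation):

```ruby
# Hypothetical re-implementation of the routing decision, for illustration only.
# `sharding_enabled` stands in for Gitlab::SidekiqSharding::Router.enabled?.
def within_instance_migration?(src_stores, dst_stores, sharding_enabled: true)
  # With sharding disabled there is only one Redis, so the move is always local.
  return true unless sharding_enabled

  # Plausible reading: both queues live on exactly the same single store.
  src_stores.size <= 1 && src_stores.sort_by(&:to_s) == dst_stores.sort_by(&:to_s)
end

within_instance_migration?(["main"], ["main"])                             # => true
within_instance_migration?(["main"], ["shard_a"])                          # => false
within_instance_migration?(["main"], ["shard_a"], sharding_enabled: false) # => true
```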
#sidekiq_remove_jobs(job_klasses:, times_in_a_row: DEFAULT_TIMES_IN_A_ROW, max_attempts: DEFAULT_MAX_ATTEMPTS) ⇒ Object
Probabilistically removes job_klasses from their specific queues, the retry set and the scheduled set.
If jobs are still being processed at the same time, there is a small chance that not every instance of job_klass will be removed. To minimize this risk, the helper repeatedly removes matching jobs from each queue and set until nothing is removed twice in a row (the default times_in_a_row).
Before calling this method, you should make sure that job_klass is no longer being scheduled within the running application.
# File 'lib/gitlab/database/migrations/sidekiq_helpers.rb', line 23

def sidekiq_remove_jobs(
  job_klasses:,
  times_in_a_row: DEFAULT_TIMES_IN_A_ROW,
  max_attempts: DEFAULT_MAX_ATTEMPTS
)
  kwargs = { times_in_a_row: times_in_a_row, max_attempts: max_attempts }

  if transaction_open?
    raise 'sidekiq_remove_jobs can not be run inside a transaction, ' \
      'you can disable transactions by calling disable_ddl_transaction! ' \
      'in the body of your migration class'
  end

  job_klasses_queues = job_klasses
    .map { |job_klass| job_klass.to_s.safe_constantize }
    .select(&:present?)
    .map { |job_klass| [job_klass.queue, job_klass.get_sidekiq_options['store']] }
    .uniq

  job_klasses_queues.each do |queue, store|
    _, pool = Gitlab::SidekiqSharding::Router.get_shard_instance(store)
    Sidekiq::Client.via(pool) do
      delete_jobs_for(
        set: Sidekiq::Queue.new(queue),
        job_klasses: job_klasses,
        kwargs: kwargs
      )
    end
  end

  results = job_klasses_queues.map(&:last).uniq.map do |store|
    _, pool = Gitlab::SidekiqSharding::Router.get_shard_instance(store)
    Sidekiq::Client.via(pool) do
      delete_jobs_for(
        set: Sidekiq::RetrySet.new,
        kwargs: kwargs,
        job_klasses: job_klasses
      )
      delete_jobs_for(
        set: Sidekiq::ScheduledSet.new,
        kwargs: kwargs,
        job_klasses: job_klasses
      )
    end
  end

  aggregate_results(results)
end
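The interplay of times_in_a_row and max_attempts happens inside delete_jobs_for, which is not shown on this page. The contract described above, however, can be sketched as a small pure-Ruby loop: keep sweeping the set, and stop only after times_in_a_row consecutive sweeps removed nothing, or after max_attempts sweeps total. All names below are illustrative, not GitLab's actual implementation:

```ruby
# Illustrative sketch of the "nothing removed N times in a row" contract.
# `set` is an array of job hashes; in the real system matching jobs may be
# re-added concurrently, which is why a single sweep is not enough.
def sweep_jobs(set, job_klasses, times_in_a_row: 2, max_attempts: 5)
  attempts = 0
  zero_streak = 0

  while zero_streak < times_in_a_row && attempts < max_attempts
    # Array#reject! returns nil when nothing was removed in this sweep.
    removed = set.reject! { |job| job_klasses.include?(job["class"]) }
    zero_streak = removed.nil? ? zero_streak + 1 : 0
    attempts += 1
  end

  { attempts: attempts, completed: zero_streak >= times_in_a_row }
end

set = [{ "class" => "OldWorker" }, { "class" => "KeepWorker" }]
result = sweep_jobs(set, ["OldWorker"])
# One sweep removes OldWorker, then two empty sweeps satisfy times_in_a_row.
```

With a stable set this terminates after one productive sweep plus times_in_a_row empty ones; max_attempts bounds the worst case when jobs keep reappearing, which is why the removal is only probabilistic.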