Module: Gitlab::Database::Migrations::SidekiqHelpers

Included in:
Gitlab::Database::MigrationHelpers
Defined in:
lib/gitlab/database/migrations/sidekiq_helpers.rb

Overview

rubocop:disable Cop/SidekiqApiUsage rubocop:disable Cop/SidekiqRedisCall

Constant Summary collapse

DEFAULT_MAX_ATTEMPTS =

Constants for default sidekiq_remove_jobs values

5
DEFAULT_TIMES_IN_A_ROW =
2

Instance Method Summary collapse

Instance Method Details

#migrate_across_instance(queue_from, to, src_store, dst_stores) ⇒ Object

Cross-instance transfers are not atomic, so data loss is possible.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/gitlab/database/migrations/sidekiq_helpers.rb', line 90

# Moves the jobs of `queue_from` on the source store to queue `to` on one of the destination stores.
#
# Each job is first moved from "queue:<queue_from>" into a buffer list on the source store and then
# pushed onto "queue:<to>" on the chosen destination. The buffer list is removed once the source
# queue reports a length of zero. Cross-instance transfers are not atomic and data loss is possible.
#
# @param queue_from [String] source queue name, without the "queue:" prefix
# @param to [String] destination queue name, without the "queue:" prefix
# @param src_store [Object, nil] store identifier handed to Gitlab::SidekiqSharding::Router.get_shard_instance
# @param dst_stores [Array] candidate destination store identifiers; the first one is the fallback
# @return [Object] whatever the final `unlink` call returns; the caller visible in this file ignores it
def migrate_across_instance(queue_from, to, src_store, dst_stores)
  _, src_pool = Gitlab::SidekiqSharding::Router.get_shard_instance(src_store)
  buffer_queue_name = "migration_buffer:queue:#{queue_from}"

  while Sidekiq::Client.via(src_pool) { sidekiq_queue_length(queue_from) } > 0
    job = Sidekiq::Client.via(src_pool) do
      Sidekiq.redis do |c|
        c.rpoplpush("queue:#{queue_from}", buffer_queue_name)
      end
    end

    # The length check and the pop are separate Redis calls, so the queue may have been emptied in
    # between (for example by a running worker). RPOPLPUSH then yields nil; re-check the loop
    # condition instead of trying to parse nil.
    next if job.nil?

    job_hash = Sidekiq.load_json(job)

    # In the case of multiple stores having the same queue name, we look up the store which the particular job
    # would have been enqueued to if `.perform_async` were called.
    dst_store_name = Gitlab::SidekiqConfig::WorkerRouter.global.store(job_hash["class"].safe_constantize)

    # Send the job to the first shard that contains the queue. This assumes that the queue has a listener
    # on that particular Redis instance. This only affects configurations which use multiple shards per queue.
    store_name = dst_stores.find { |ds| dst_store_name == ds } || dst_stores.first
    _, pool = Gitlab::SidekiqSharding::Router.get_shard_instance(store_name)
    Sidekiq::Client.via(pool) { Sidekiq.redis { |c| c.lpush("queue:#{to}", job) } }
  end

  # Every buffered job has been pushed to its destination by now, so the buffer can be dropped.
  Sidekiq::Client.via(src_pool) { Sidekiq.redis { |c| c.unlink(buffer_queue_name) } }
end

#migrate_within_instance(queue_from, to) ⇒ Object



116
117
118
119
120
# File 'lib/gitlab/database/migrations/sidekiq_helpers.rb', line 116

# Drains "queue:<queue_from>" into "queue:<to>" using the current Sidekiq Redis connection.
#
# Jobs are moved one at a time with RPOPLPUSH for as long as the source queue reports a
# positive length.
#
# @param queue_from [String] source queue name, without the "queue:" prefix
# @param to [String] destination queue name, without the "queue:" prefix
def migrate_within_instance(queue_from, to)
  source_key = "queue:#{queue_from}"
  target_key = "queue:#{to}"

  Sidekiq.redis do |redis|
    redis.rpoplpush(source_key, target_key) until sidekiq_queue_length(queue_from) <= 0
  end
end

#sidekiq_queue_migrate(queue_from, to:) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/gitlab/database/migrations/sidekiq_helpers.rb', line 73

# Moves all jobs from one Sidekiq queue to another.
#
# When `migrate_within_instance?` says so for the two store lists, or when Sidekiq sharding
# routing is disabled, the jobs are moved inside the current instance. Otherwise each source
# store is drained towards the destination stores, with `nil` (the main shard) standing in for
# an empty store list.
#
# @param queue_from [String] name of the queue to drain
# @param to [String] name of the queue receiving the jobs
def sidekiq_queue_migrate(queue_from, to:)
  sources, destinations = [queue_from, to].map do |queue_name|
    Gitlab::SidekiqConfig::WorkerRouter.global.stores_with_queue(queue_name)
  end

  single_instance = migrate_within_instance?(sources, destinations) || !Gitlab::SidekiqSharding::Router.enabled?
  return migrate_within_instance(queue_from, to) if single_instance

  # An empty list means the queue is not mapped to a dedicated store: use the main shard.
  sources = sources.empty? ? [nil] : sources
  destinations = destinations.empty? ? [nil] : destinations

  sources.each { |source| migrate_across_instance(queue_from, to, source, destinations) }
end

#sidekiq_remove_jobs(job_klasses:, times_in_a_row: DEFAULT_TIMES_IN_A_ROW, max_attempts: DEFAULT_MAX_ATTEMPTS) ⇒ Object

Probabilistically removes job_klasses from their specific queues, the retry set and the scheduled set.

If jobs are still being processed at the same time, there is a small chance that not every instance of job_klass will be removed. To minimize this risk, matching jobs are removed repeatedly from each of them until nothing is removed twice in a row.

Before calling this method, you should make sure that job_klass is no longer being scheduled within the running application.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/gitlab/database/migrations/sidekiq_helpers.rb', line 23

# Removes jobs of the given classes from their queues, the retry set and the scheduled set.
#
# The worker classes are resolved from `job_klasses`; names that cannot be resolved are skipped.
# For every distinct (queue, store) pair the queue is cleaned on that store's shard, and for every
# distinct store the retry and scheduled sets are cleaned as well.
#
# The outcome of every `delete_jobs_for` call (queues, retry sets and scheduled sets) is handed to
# `aggregate_results`, so a failed queue or retry-set cleanup is not hidden by a successful
# scheduled-set cleanup.
#
# @param job_klasses [Array] worker classes or class names whose jobs should be removed
# @param times_in_a_row [Integer] forwarded to `delete_jobs_for` through `kwargs`
# @param max_attempts [Integer] forwarded to `delete_jobs_for` through `kwargs`
# @return [Object] whatever `aggregate_results` returns for the collected results
# @raise [RuntimeError] when called while a database transaction is open
def sidekiq_remove_jobs(
  job_klasses:,
  times_in_a_row: DEFAULT_TIMES_IN_A_ROW,
  max_attempts: DEFAULT_MAX_ATTEMPTS
)
  if transaction_open?
    raise 'sidekiq_remove_jobs can not be run inside a transaction, ' \
          'you can disable transactions by calling disable_ddl_transaction! ' \
          'in the body of your migration class'
  end

  kwargs = { times_in_a_row: times_in_a_row, max_attempts: max_attempts }

  job_klasses_queues = job_klasses
    .map { |job_klass| job_klass.to_s.safe_constantize }
    .select(&:present?)
    .map { |job_klass| [job_klass.queue, job_klass.get_sidekiq_options['store']] }
    .uniq

  # Every deletion outcome is recorded here so that the aggregate reflects all of them.
  results = []

  job_klasses_queues.each do |queue, store|
    _, pool = Gitlab::SidekiqSharding::Router.get_shard_instance(store)
    Sidekiq::Client.via(pool) do
      results << delete_jobs_for(
        set: Sidekiq::Queue.new(queue),
        job_klasses: job_klasses,
        kwargs: kwargs
      )
    end
  end

  # The retry and scheduled sets exist once per store, so visit each store only once.
  job_klasses_queues.map(&:last).uniq.each do |store|
    _, pool = Gitlab::SidekiqSharding::Router.get_shard_instance(store)
    Sidekiq::Client.via(pool) do
      results << delete_jobs_for(
        set: Sidekiq::RetrySet.new,
        kwargs: kwargs,
        job_klasses: job_klasses
      )

      results << delete_jobs_for(
        set: Sidekiq::ScheduledSet.new,
        kwargs: kwargs,
        job_klasses: job_klasses
      )
    end
  end

  aggregate_results(results)
end