Class: Gitlab::BackgroundMigration::JobCoordinator

Inherits:
Object
  • Object
show all
Defined in:
lib/gitlab/background_migration/job_coordinator.rb

Overview

Class responsible for executing background migrations based on the given database.

Chooses the correct worker class when selecting jobs from the queue, based on the convention for how the queues and worker classes are set up for each database.

Also provides a database connection to the correct tracking database.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#worker_class ⇒ Object (readonly)

Returns the value of attribute worker_class.



41
42
43
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 41

# Reader for the +@worker_class+ instance variable.
#
# @return [Object] whatever is currently stored in +@worker_class+
def worker_class
  @worker_class
end

Class Method Details

.for_tracking_database(tracking_database) ⇒ Object



13
14
15
16
17
18
19
20
21
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 13

# Builds a coordinator for the given tracking database.
#
# The worker class is looked up in +worker_for_tracking_database+ using
# +tracking_database+ as the key, and the result is handed to +new+.
#
# @param tracking_database [Object] key into +worker_for_tracking_database+
# @return [Object] the result of +new(worker_class)+
# @raise [ArgumentError] when the lookup yields +nil+
def for_tracking_database(tracking_database)
  worker_class = worker_for_tracking_database[tracking_database]
  return new(worker_class) unless worker_class.nil?

  raise ArgumentError, "The '#{tracking_database}' must be one of #{worker_for_tracking_database.keys.to_a}"
end

Instance Method Details

#dead_jobs?(migration_class) ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
120
121
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 117

# Checks a freshly built +Sidekiq::DeadSet+ for a job of the given migration.
#
# Delegates the matching to +enqueued_job?+.
#
# @param migration_class [Object] value compared against each job's first argument
# @return [Boolean] the result of +enqueued_job?+
def dead_jobs?(migration_class)
  enqueued_job?([Sidekiq::DeadSet.new], migration_class)
end

#enqueued_job?(queues, migration_class) ⇒ Boolean

Returns:

  • (Boolean)


143
144
145
146
147
148
149
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 143

# Tells whether any of the given job collections holds a job for
# +migration_class+ that belongs to this coordinator's worker.
#
# A job matches when its +klass+ equals +worker_class.name+ and the first
# element of its +args+ equals +migration_class+. Scanning stops at the
# first match.
#
# @param queues [Enumerable] collections of jobs; each must respond to +any?+
# @param migration_class [Object] value compared against +job.args.first+
# @return [Boolean]
def enqueued_job?(queues, migration_class)
  wanted = ->(job) { job.klass == worker_class.name && job.args.first == migration_class }

  queues.any? { |job_set| job_set.any?(&wanted) }
end

#exists?(migration_class, additional_queues = []) ⇒ Boolean

Returns:

  • (Boolean)


110
111
112
113
114
115
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 110

# Checks the coordinator's Sidekiq queue and the scheduled set for a job of
# the given migration.
#
# NOTE(review): +additional_queues+ is accepted but never read by this
# method body; confirm with callers whether it should be honoured or removed.
#
# @param migration_class [Object] value compared against each job's first argument
# @param additional_queues [Array] currently unused
# @return [Boolean] the result of +enqueued_job?+
def exists?(migration_class, additional_queues = [])
  job_sets = [Sidekiq::Queue.new(queue), Sidekiq::ScheduledSet.new]

  enqueued_job?(job_sets, migration_class)
end

#migration_class_for(class_name) ⇒ Object



139
140
141
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 139

# Resolves a migration constant by name inside +Gitlab::BackgroundMigration+.
#
# The lookup passes +false+ as the +inherit+ argument of +const_get+, so
# constants that live outside that namespace are not returned.
#
# @param class_name [String, Symbol] constant name to resolve
# @return [Object] the constant found under +Gitlab::BackgroundMigration+
# @raise [NameError] when +const_get+ cannot find the constant
def migration_class_for(class_name)
  namespace = Gitlab::BackgroundMigration
  namespace.const_get(class_name, false)
end

#migration_instance_for(class_name) ⇒ Object



129
130
131
132
133
134
135
136
137
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 129

# Instantiates the migration class named +class_name+.
#
# Classes that are strict subclasses of +Gitlab::BackgroundMigration::BaseJob+
# are built with a +connection:+ keyword; every other class is built with no
# arguments.
#
# @param class_name [String, Symbol] name passed to +migration_class_for+
# @return [Object] a new instance of the resolved class
def migration_instance_for(class_name)
  klass = migration_class_for(class_name)
  return klass.new unless klass < Gitlab::BackgroundMigration::BaseJob

  klass.new(connection: connection)
end

#pending_jobs(include_dead_jobs: false) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 53

# Returns an Enumerator over the jobs that belong to this coordinator's worker.
#
# The Sidekiq sets are only built when the Enumerator is iterated. The
# scheduled set and the coordinator's queue are always visited, in that order;
# the retry set and the dead set are appended when +include_dead_jobs+ is truthy.
# Only jobs whose +klass+ equals +worker_class.name+ are yielded.
#
# @param include_dead_jobs [Boolean] also visit the retry and dead sets
# @return [Enumerator] yields matching job objects
def pending_jobs(include_dead_jobs: false)
  Enumerator.new do |yielder|
    sources = [Sidekiq::ScheduledSet.new, Sidekiq::Queue.new(queue)]
    sources.push(Sidekiq::RetrySet.new, Sidekiq::DeadSet.new) if include_dead_jobs

    sources.each do |source|
      source.each { |job| yielder << job if job.klass == worker_class.name }
    end
  end
end

#perform(class_name, arguments) ⇒ Object



93
94
95
96
97
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 93

# Runs a single migration inside +with_shared_connection+.
#
# The migration object comes from +migration_instance_for+, and +arguments+
# is splatted into its +perform+ call.
#
# @param class_name [String, Symbol] migration name passed to +migration_instance_for+
# @param arguments [Array] positional arguments for the migration's +perform+
# @return [Object] whatever +with_shared_connection+ returns
def perform(class_name, arguments)
  with_shared_connection { migration_instance_for(class_name).perform(*arguments) }
end

#queue ⇒ Object



45
46
47
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 45

# Name of the queue configured on the worker class.
#
# Reads the +'queue'+ entry of +worker_class.sidekiq_options+ and caches it in
# +@queue+; the lookup is repeated only while the cached value is +nil+ or
# +false+.
#
# @return [Object] the value stored under +'queue'+ in the worker's options
def queue
  return @queue if @queue

  @queue = worker_class.sidekiq_options['queue']
end

#remaining ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 99

# Counts the jobs for this coordinator's worker that are still waiting.
#
# Looks at the coordinator's Sidekiq queue and the scheduled set, counting
# jobs whose +klass+ equals +worker_class.name+. The retry and dead sets are
# not consulted here.
#
# @return [Integer] total number of matching jobs across both sets
def remaining
  sources = [Sidekiq::Queue.new(queue), Sidekiq::ScheduledSet.new]

  sources.inject(0) do |total, source|
    total + source.count { |job| job.klass == worker_class.name }
  end
end

#retrying_jobs?(migration_class) ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
126
127
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 123

# Checks a freshly built +Sidekiq::RetrySet+ for a job of the given migration.
#
# Delegates the matching to +enqueued_job?+.
#
# @param migration_class [Object] value compared against each job's first argument
# @return [Boolean] the result of +enqueued_job?+
def retrying_jobs?(migration_class)
  enqueued_job?([Sidekiq::RetrySet.new], migration_class)
end

#steal(steal_class, retry_dead_jobs: false) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 73

# Runs pending jobs for +steal_class+ inline instead of leaving them to the
# worker.
#
# Iterates +pending_jobs+ inside +with_shared_connection+. A job is picked up
# when the first element of its +args+ equals +steal_class+ and, if a block
# is given, the block returns a truthy value for that job.
#
# @param steal_class [Object] value compared against the first job argument
# @param retry_dead_jobs [Boolean] forwarded to +pending_jobs+ as
#   +include_dead_jobs+
# @yield [job] optional filter; a falsy result skips the job
# @return [Object] whatever +with_shared_connection+ returns
# @raise [Exception] re-raises anything raised while deleting or performing a
#   job, after the job has been re-enqueued
def steal(steal_class, retry_dead_jobs: false)
  with_shared_connection do
    pending_jobs(include_dead_jobs: retry_dead_jobs).each do |job|
      # The first two job arguments are the migration name and its arguments.
      migration_class, migration_args = job.args

      next unless migration_class == steal_class
      next if block_given? && !(yield job)

      begin
        # Only perform when +job.delete+ returns truthy — presumably so that a
        # job already taken elsewhere is not run twice; confirm against the
        # Sidekiq API.
        perform(migration_class, migration_args) if job.delete
      rescue Exception # rubocop:disable Lint/RescueException
        # Every exception is caught here on purpose (see the rubocop directive)
        # so the job is put back before the error propagates.
        worker_class # enqueue this migration again
          .perform_async(migration_class, migration_args)

        raise
      end
    end
  end
end

#with_shared_connection(&block) ⇒ Object



49
50
51
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 49

# Runs the given block through +Gitlab::Database::SharedModel.using_connection+,
# passing this coordinator's +connection+.
#
# @yield the block is forwarded unchanged to +using_connection+
# @return [Object] whatever +using_connection+ returns
def with_shared_connection(&block)
  shared_model = Gitlab::Database::SharedModel
  shared_model.using_connection(connection, &block)
end