Class: Gitlab::BackgroundMigration::JobCoordinator
- Inherits:
-
Object
- Object
- Gitlab::BackgroundMigration::JobCoordinator
- Defined in:
- lib/gitlab/background_migration/job_coordinator.rb
Overview
Class responsible for executing background migrations based on the given database.
Chooses the correct worker class when selecting jobs from the queue, based on the convention of how the queues and worker classes are set up for each database.
Also provides a database connection to the correct tracking database.
Instance Attribute Summary collapse
-
#worker_class ⇒ Object
readonly
Returns the value of attribute worker_class.
Class Method Summary collapse
- .for_tracking_database(tracking_database) ⇒ Object
Instance Method Summary collapse
- #dead_jobs?(migration_class) ⇒ Boolean
- #enqueued_job?(queues, migration_class) ⇒ Boolean
- #exists?(migration_class, additional_queues = []) ⇒ Boolean
- #migration_class_for(class_name) ⇒ Object
- #migration_instance_for(class_name) ⇒ Object
- #pending_jobs(include_dead_jobs: false) ⇒ Object
- #perform(class_name, arguments) ⇒ Object
- #queue ⇒ Object
- #remaining ⇒ Object
- #retrying_jobs?(migration_class) ⇒ Boolean
- #steal(steal_class, retry_dead_jobs: false) ⇒ Object
- #with_shared_connection(&block) ⇒ Object
Instance Attribute Details
#worker_class ⇒ Object (readonly)
Returns the value of attribute worker_class.
41 42 43 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 41
#
# Reader for the @worker_class instance variable.
#
# @return [Object] the value currently stored in @worker_class
def worker_class
  @worker_class
end
Class Method Details
.for_tracking_database(tracking_database) ⇒ Object
13 14 15 16 17 18 19 20 21 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 13
#
# Builds a coordinator for the worker registered under the given tracking
# database.
#
# The worker is looked up in `worker_for_tracking_database`; a nil result is
# treated as an unsupported database.
#
# @param tracking_database [Object] key used for the worker lookup
# @return [Object] the result of `new(worker_class)`
# @raise [ArgumentError] when the lookup yields nil
def for_tracking_database(tracking_database)
  worker_class = worker_for_tracking_database[tracking_database]
  return new(worker_class) unless worker_class.nil?

  raise ArgumentError, "The '#{tracking_database}' must be one of #{worker_for_tracking_database.keys.to_a}"
end
Instance Method Details
#dead_jobs?(migration_class) ⇒ Boolean
117 118 119 120 121 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 117
#
# Checks Sidekiq's dead set for a job matching the given migration class.
#
# @param migration_class [Object] compared against the first job argument
# @return [Boolean] result of `enqueued_job?` over a fresh Sidekiq::DeadSet
def dead_jobs?(migration_class)
  enqueued_job?([Sidekiq::DeadSet.new], migration_class)
end
#enqueued_job?(queues, migration_class) ⇒ Boolean
143 144 145 146 147 148 149 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 143
#
# Tells whether any of the given job collections holds a job for this
# coordinator's worker whose first argument equals `migration_class`.
#
# @param queues [Enumerable] collections of jobs; each job must respond to
#   `klass` and `args`
# @param migration_class [Object] compared against `job.args.first`
# @return [Boolean] true as soon as one matching job is found
def enqueued_job?(queues, migration_class)
  queues.any? do |job_set|
    job_set.any? { |job| job.klass == worker_class.name && job.args.first == migration_class }
  end
end
#exists?(migration_class, additional_queues = []) ⇒ Boolean
110 111 112 113 114 115 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 110
#
# Checks whether a job for the given migration class is present in this
# coordinator's queue or in Sidekiq's scheduled set.
#
# NOTE(review): `additional_queues` is accepted but never read in this body —
# confirm whether callers expect it to widen the search.
#
# @param migration_class [Object] compared against the first job argument
# @param additional_queues [Array] currently unused
# @return [Boolean] result of `enqueued_job?` over the two collections
def exists?(migration_class, additional_queues = [])
  job_sets = [Sidekiq::Queue.new(queue), Sidekiq::ScheduledSet.new]

  enqueued_job?(job_sets, migration_class)
end
#migration_class_for(class_name) ⇒ Object
139 140 141 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 139
#
# Resolves a migration class by name inside Gitlab::BackgroundMigration.
#
# @param class_name [String, Symbol] constant name to look up
# @return [Object] the constant found under Gitlab::BackgroundMigration
def migration_class_for(class_name)
  namespace = Gitlab::BackgroundMigration

  # Passing false keeps const_get from searching ancestors of the namespace.
  namespace.const_get(class_name, false)
end
#migration_instance_for(class_name) ⇒ Object
129 130 131 132 133 134 135 136 137 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 129
#
# Instantiates the migration class resolved from `class_name`.
#
# Subclasses of Gitlab::BackgroundMigration::BaseJob are constructed with the
# `connection:` keyword; every other class is constructed without arguments.
#
# @param class_name [String, Symbol] name handed to `migration_class_for`
# @return [Object] a new instance of the resolved migration class
def migration_instance_for(class_name)
  klass = migration_class_for(class_name)
  return klass.new unless klass < Gitlab::BackgroundMigration::BaseJob

  klass.new(connection: connection)
end
#pending_jobs(include_dead_jobs: false) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 53
#
# Enumerates the jobs belonging to this coordinator's worker class.
#
# The scheduled set and this coordinator's queue are always walked; the retry
# and dead sets are appended when `include_dead_jobs` is true. The Sidekiq
# collections are created inside the enumerator block, so they are only built
# when the enumerator is consumed.
#
# @param include_dead_jobs [Boolean] also walk the retry and dead sets
# @return [Enumerator] yields each job whose `klass` equals `worker_class.name`
def pending_jobs(include_dead_jobs: false)
  Enumerator.new do |yielder|
    job_sets = [Sidekiq::ScheduledSet.new, Sidekiq::Queue.new(queue)]
    job_sets.push(Sidekiq::RetrySet.new, Sidekiq::DeadSet.new) if include_dead_jobs

    job_sets.each do |job_set|
      job_set.each do |job|
        next unless job.klass == worker_class.name

        yielder << job
      end
    end
  end
end
#perform(class_name, arguments) ⇒ Object
93 94 95 96 97 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 93
#
# Runs the named migration inside `with_shared_connection`.
#
# @param class_name [String, Symbol] name handed to `migration_instance_for`
# @param arguments [Array] splatted into the migration's `perform`
# @return [Object] whatever `with_shared_connection` returns for the block
def perform(class_name, arguments)
  with_shared_connection do
    migration = migration_instance_for(class_name)
    migration.perform(*arguments)
  end
end
#queue ⇒ Object
45 46 47 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 45
#
# Value stored under the 'queue' key of the worker class's Sidekiq options.
#
# The result is memoized in @queue.
#
# NOTE(review): the extracted listing had lost the method name between
# `worker_class.` and `['queue']`. Sidekiq's options reader is used here —
# confirm it matches the accessor in the original file.
#
# @return [Object] the 'queue' entry of the worker's Sidekiq options
def queue
  @queue ||= worker_class.get_sidekiq_options['queue']
end
#remaining ⇒ Object
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 99
#
# Counts the jobs for this coordinator's worker class that sit in its queue
# or in Sidekiq's scheduled set.
#
# @return [Integer] number of jobs whose `klass` equals `worker_class.name`
def remaining
  job_sets = [Sidekiq::Queue.new(queue), Sidekiq::ScheduledSet.new]

  job_sets.sum do |job_set|
    job_set.count { |job| job.klass == worker_class.name }
  end
end
#retrying_jobs?(migration_class) ⇒ Boolean
123 124 125 126 127 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 123
#
# Checks Sidekiq's retry set for a job matching the given migration class.
#
# @param migration_class [Object] compared against the first job argument
# @return [Boolean] result of `enqueued_job?` over a fresh Sidekiq::RetrySet
def retrying_jobs?(migration_class)
  retry_set = Sidekiq::RetrySet.new

  enqueued_job?([retry_set], migration_class)
end
#steal(steal_class, retry_dead_jobs: false) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 73
#
# Removes pending jobs for `steal_class` from Sidekiq and runs them inline.
#
# Each pending job's args are destructured into a migration class and its
# arguments. Jobs for other migration classes are skipped, as are jobs for
# which a given block returns a falsy value. A job is performed only when
# `job.delete` returns a truthy value.
#
# @param steal_class [Object] migration class whose jobs should be processed
# @param retry_dead_jobs [Boolean] forwarded to `pending_jobs` as
#   `include_dead_jobs`
# @yield [job] optional filter; the job is processed only if it returns truthy
# @return [Object] whatever `with_shared_connection` returns for the block
def steal(steal_class, retry_dead_jobs: false)
  with_shared_connection do
    candidates = pending_jobs(include_dead_jobs: retry_dead_jobs)

    candidates.each do |job|
      job_class, job_args = job.args

      next unless job_class == steal_class
      next if block_given? && !yield(job)

      begin
        perform(job_class, job_args) if job.delete
      rescue Exception # rubocop:disable Lint/RescueException
        # The job was already deleted, so put it back on the queue before
        # letting the error propagate.
        worker_class.perform_async(job_class, job_args)

        raise
      end
    end
  end
end
#with_shared_connection(&block) ⇒ Object
49 50 51 |
# File 'lib/gitlab/background_migration/job_coordinator.rb', line 49
#
# Forwards the given block to Gitlab::Database::SharedModel.using_connection
# together with `connection`.
#
# @yield block handed on to `using_connection`
# @return [Object] whatever `using_connection` returns
def with_shared_connection(&block)
  shared_model = Gitlab::Database::SharedModel

  shared_model.using_connection(connection, &block)
end