Class: OnlineMigrations::BackgroundDataMigrations::MigrationJob

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::IterableJob
Defined in:
lib/online_migrations/background_data_migrations/migration_job.rb

Overview

Sidekiq job responsible for running background data migrations.

Constant Summary collapse

TICKER_INTERVAL = 5 # seconds

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MigrationJob

Returns a new instance of MigrationJob.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/online_migrations/background_data_migrations/migration_job.rb', line 30

# Prepares per-job state: the migration references start out empty (they are
# assigned later, in #build_enumerator), a ticker is created to periodically
# persist progress, and the throttle-check timestamp is initialised.
def initialize
  super

  @migration = nil
  @data_migration = nil

  @ticker = Ticker.new(TICKER_INTERVAL) do |tick_count, elapsed|
    # TODO: use 'cursor' accessor from sidekiq in the future.
    # https://github.com/sidekiq/sidekiq/pull/6606
    @migration.persist_progress(@_cursor, tick_count, elapsed)

    # With a scheduler this runs from a single shard only; when running inline
    # it runs from every shard (not needed, but keeps the implementation simple).
    #
    # The migration is reloaded to pick up status changes etc., except when
    # running inline: there it may belong to a shard other than the "default"
    # one, which is the shard used to look up background migrations.
    @migration.reload unless Utils.run_background_migrations_inline?
  end

  @throttle_checked_at = current_time
end

Instance Method Details

#build_enumerator(migration_id, cursor:) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/online_migrations/background_data_migrations/migration_job.rb', line 75

# Loads the migration record and builds the enumerator the job iterates over.
#
# The data migration may supply its own enumerator through
# `build_enumerator(cursor:)`. When it returns a falsy value, the enumerator is
# derived from the data migration's `collection` instead.
#
# @param migration_id [Object] identifier passed to
#   BackgroundDataMigrations::Migration.find
# @param cursor [Object, nil] position to resume from; when nil (or false) the
#   cursor stored on the migration record is used
# @return [Enumerator] the custom enumerator, or the result of the matching
#   `*_enumerator` helper for the collection
# @raise [ArgumentError] when the custom enumerator is not an Enumerator, when a
#   batch enumerator was built with "start"/"finish", or when the collection is
#   of an unsupported type
def build_enumerator(migration_id, cursor:)
  @migration = BackgroundDataMigrations::Migration.find(migration_id)
  cursor ||= @migration.cursor

  @migration.on_shard_if_present do
    @data_migration = @migration.data_migration
    collection_enum = @data_migration.build_enumerator(cursor: cursor)

    if collection_enum
      unless collection_enum.is_a?(Enumerator)
        raise ArgumentError, <<~MSG.squish
          #{@data_migration.class.name}#build_enumerator must return an Enumerator,
          got #{collection_enum.class.name}.
        MSG
      end

      collection_enum
    else
      collection = @data_migration.collection

      case collection
      when ActiveRecord::Relation
        options = {
          cursor: cursor,
          # Fall back to 100 records per batch when the data migration class
          # does not configure a batch size.
          batch_size: @data_migration.class.active_record_enumerator_batch_size || 100,
        }
        active_record_records_enumerator(collection, **options)
      when ActiveRecord::Batches::BatchEnumerator
        if collection.start || collection.finish
          raise ArgumentError, <<~MSG.squish
            #{@data_migration.class.name}#collection does not support
            a batch enumerator with the "start" or "finish" options.
          MSG
        end

        active_record_relations_enumerator(
          collection.relation,
          batch_size: collection.batch_size,
          cursor: cursor,
          use_ranges: collection.use_ranges
        )
      when Array
        array_enumerator(collection, cursor: cursor)
      else
        raise ArgumentError, <<~MSG.squish
          #{@data_migration.class.name}#collection must be either an ActiveRecord::Relation,
          ActiveRecord::Batches::BatchEnumerator, or Array.
        MSG
      end
    end
  end
end

#each_iteration(item, _migration_id) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/online_migrations/background_data_migrations/migration_job.rb', line 128

# Processes a single item from the enumerator, unless the job should stop first.
#
# Aborts (via `throw :abort, <finished>`) in two situations:
# * the migration is cancelling, pausing or paused - thrown with `true`;
# * `should_throttle?` is truthy - a "throttled.background_data_migrations"
#   notification is instrumented and `false` is thrown.
#
# @param item [Object] element yielded by the enumerator
# @param _migration_id [Object] unused
# @return [Object] the result of `@ticker.tick` when the item was processed
def each_iteration(item, _migration_id)
  stop_requested = @migration.cancelling? || @migration.pausing? || @migration.paused?

  # Finish this exact sidekiq job. When the migration is paused
  # and will be resumed, a new job will be enqueued.
  throw :abort, true if stop_requested

  if should_throttle?
    ActiveSupport::Notifications.instrument("throttled.background_data_migrations", migration: @migration)
    throw :abort, false
  end

  @data_migration.around_process do
    @migration.data_migration.process(item)

    # Migration is refreshed regularly by ticker.
    delay = @migration.iteration_pause
    sleep(delay) if delay > 0
  end

  @ticker.tick
end

#on_complete ⇒ Object



68
69
70
71
72
73
# File 'lib/online_migrations/background_data_migrations/migration_job.rb', line 68

# Finalises the migration: when `cancelled?` is truthy (the job was manually
# cancelled) the migration is cancelled first; `complete` is then always called.
#
# @return [Object] the result of `@migration.complete`
def on_complete
  if cancelled?
    # Job was manually cancelled.
    @migration.cancel
  end

  @migration.complete
end

#on_resume ⇒ Object



59
60
61
# File 'lib/online_migrations/background_data_migrations/migration_job.rb', line 59

# Forwards the resume event to the data migration by calling its
# `after_resume` hook.
# NOTE(review): presumably invoked by Sidekiq::IterableJob when an interrupted
# job is picked up again - confirm against the Sidekiq iteration docs.
#
# @return [Object] the result of `@data_migration.after_resume`
def on_resume
  @data_migration.after_resume
end

#on_start ⇒ Object



55
56
57
# File 'lib/online_migrations/background_data_migrations/migration_job.rb', line 55

# Calls `start` on the migration record.
# NOTE(review): presumably invoked by Sidekiq::IterableJob the first time the
# job begins iterating - confirm against the Sidekiq iteration docs.
#
# @return [Object] the result of `@migration.start`
def on_start
  @migration.start
end

#on_stop ⇒ Object



63
64
65
66
# File 'lib/online_migrations/background_data_migrations/migration_job.rb', line 63

# Calls `persist` on the ticker and then `stop` on the migration record.
# NOTE(review): `persist` presumably flushes the progress accumulated since the
# last tick so it is saved before the migration is stopped - confirm in Ticker.
#
# @return [Object] the result of `@migration.stop`
def on_stop
  @ticker.persist
  @migration.stop
end