Class: DataMigration::Task

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/data_migration/task.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.job_classObject



48
49
50
# File 'lib/data_migration/task.rb', line 48

def self.job_class
  DataMigration.config.job_class
end

.listObject



78
79
80
# File 'lib/data_migration/task.rb', line 78

def self.list
  Dir[DataMigration.config.data_migrations_path_glob].map { |f| File.basename(f, ".*") }
end

.perform_later(name, **kwargs) ⇒ Object



61
62
63
# File 'lib/data_migration/task.rb', line 61

def self.perform_later(name, **kwargs)
  create!(name:).perform_later(**kwargs)
end

.perform_now(name, **kwargs) ⇒ Object



52
53
54
# File 'lib/data_migration/task.rb', line 52

def self.perform_now(name, **kwargs)
  create!(name:).perform_now(**kwargs)
end

.prepare(name, pause_minutes: nil, jobs_limit: nil) ⇒ Object



70
71
72
# File 'lib/data_migration/task.rb', line 70

def self.prepare(name, pause_minutes: nil, jobs_limit: nil)
  create!(name:, pause_minutes:, jobs_limit:)
end

.root_pathObject



74
75
76
# File 'lib/data_migration/task.rb', line 74

def self.root_path
  DataMigration.config.data_migrations_full_path
end

Instance Method Details

#file_exists?Boolean

Returns:

  • (Boolean)


86
87
88
# File 'lib/data_migration/task.rb', line 86

def file_exists?
  self.class.list.include?(name)
end

#file_pathObject



82
83
84
# File 'lib/data_migration/task.rb', line 82

def file_path
  "#{self.class.root_path}/#{name}.rb"
end

#job_check_in!(job_id, job_args: [], job_kwargs: {}) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/data_migration/task.rb', line 94

def job_check_in!(job_id, job_args: [], job_kwargs: {})
  self.current_jobs ||= {}

  raise DataMigration::JobConflictError, "#{user_title} already has job ##{job_id}" if current_jobs.key?(job_id)
  raise DataMigration::JobConcurrencyLimitError, "#{user_title} reached limit of #{jobs_limit} jobs" if jobs_limit.present? && current_jobs.size >= jobs_limit

  self.current_jobs[job_id] = {
    ts: Time.current,
    args: job_args,
    kwargs: job_kwargs
  }
  save!
end

#job_check_out!(job_id) ⇒ Object



108
109
110
111
# File 'lib/data_migration/task.rb', line 108

def job_check_out!(job_id)
  self.current_jobs.delete(job_id)
  save!
end

#not_started?Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/data_migration/task.rb', line 90

def not_started?
  status.nil? && started_at.nil?
end

#perform_later(**perform_args) ⇒ Object



65
66
67
68
# File 'lib/data_migration/task.rb', line 65

def perform_later(**perform_args)
  update!(kwargs: perform_args)
  self.class.job_class.perform_later(id, **perform_args)
end

#perform_now(**perform_args) ⇒ Object



56
57
58
59
# File 'lib/data_migration/task.rb', line 56

def perform_now(**perform_args)
  update!(kwargs: perform_args)
  self.class.job_class.perform_now(id, **perform_args)
end

#requires_pause?Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/data_migration/task.rb', line 117

def requires_pause?
  pause_minutes.positive? && !paused?
end

#user_titleObject



113
114
115
# File 'lib/data_migration/task.rb', line 113

def user_title
  "Data migration ##{id} #{name}"
end