Class: Bulkrax::ImportWorkJob

Inherits:
ApplicationJob show all
Defined in:
app/jobs/bulkrax/import_work_job.rb

Instance Method Summary collapse

Instance Method Details

#perform(entry_id, run_id, time_to_live = 3) ⇒ Object

Note:

Yes, we are calling ImporterRun.find each time. these were on purpose to prevent race conditions on the database update. If you do not re-find (or at least reload) the object on each increment, the count can get messed up. Let’s say there are two jobs A and B and a counter set to 2.

  • A grabs the importer_run (line 10)

  • B grabs the importer_run (line 10)

  • A Finishes the build, does the increment (now the counter is 3)

  • B Finishes the build, does the increment (now the counter is 3 again) and thus a count is lost.

rubocop:disable Rails/SkipsModelValidations



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'app/jobs/bulkrax/import_work_job.rb', line 22

def perform(entry_id, run_id, time_to_live = 3, *)
  entry = Entry.find(entry_id)
  entry.build
  if entry.status == "Complete"
    ImporterRun.increment_counter(:processed_records, run_id)
    ImporterRun.increment_counter(:processed_works, run_id)
  else
    # do not retry here because whatever parse error kept you from creating a work will likely
    # keep preventing you from doing so.
    ImporterRun.increment_counter(:failed_records, run_id)
    ImporterRun.increment_counter(:failed_works, run_id)
  end
  # Regardless of completion or not, we want to decrement the enqueued records.
  ImporterRun.decrement_counter(:enqueued_records, run_id) unless ImporterRun.find(run_id).enqueued_records <= 0

  entry.save!
  entry.importer.current_run = ImporterRun.find(run_id)
  entry.importer.record_status
rescue Bulkrax::CollectionsCreatedError => e
  Rails.logger.warn("#{self.class} entry_id: #{entry_id}, run_id: #{run_id} encountered #{e.class}: #{e.message}")
  # You get 3 attempts at the above perform before we have the import exception cascade into
  # the Sidekiq retry ecosystem.
  # rubocop:disable Style/IfUnlessModifier
  if time_to_live <= 1
    raise "Exhauted reschedule limit for #{self.class} entry_id: #{entry_id}, run_id: #{run_id}.  Attemping retries"
  end
  # rubocop:enable Style/IfUnlessModifier
  reschedule(entry_id, run_id, time_to_live)
end

#reschedule(entry_id, run_id, time_to_live) ⇒ Object

rubocop:enable Rails/SkipsModelValidations



53
54
55
# File 'app/jobs/bulkrax/import_work_job.rb', line 53

def reschedule(entry_id, run_id, time_to_live)
  ImportWorkJob.set(wait: 1.minute).perform_later(entry_id, run_id, time_to_live - 1)
end