Module: BulkImports::Pipeline::Runner

Extended by:
ActiveSupport::Concern
Included in:
BulkImports::Pipeline
Defined in:
lib/bulk_imports/pipeline/runner.rb

Constant Summary collapse

MarkedAsFailedError =
Class.new(StandardError)

Instance Method Summary collapse

Instance Method Details

#run ⇒ Object


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/bulk_imports/pipeline/runner.rb', line 10

# Runs the pipeline once: extract, transform, load, then record pagination.
#
# Aborts up front when the entity in +context+ reports +failed?+; in that
# case the pipeline is skipped via +skip!+ instead of being executed.
#
# When +extracted_data_from+ yields something truthy, each entry is passed
# through every transformer in order (each transformer receives the previous
# transformer's result), the final value is handed to the loader, the tracker
# is updated with the pagination state of the extracted data, and the
# +after_run+ hook is invoked with the extracted data. When nothing was
# extracted, only the start/finish messages are logged.
#
# @return [Object] whatever the final +info+ call returns, or the result of
#   +skip!+ when the entity was already marked as failed
def run
  raise MarkedAsFailedError if context.entity.failed?

  info(message: 'Pipeline started')

  if (batch = extracted_data_from)
    batch.each do |record|
      # Thread the record through the transformer chain; every step is
      # wrapped in run_pipeline_step exactly like the loader below.
      transformed = transformers.reduce(record) do |current, transformer|
        run_pipeline_step(:transformer, transformer.class.name) { transformer.transform(context, current) }
      end

      run_pipeline_step(:loader, loader.class.name) { loader.load(context, transformed) }
    end

    # Persist pagination state only after every entry has been loaded.
    tracker.update!(has_next_page: batch.has_next_page?, next_page: batch.next_page)

    run_pipeline_step(:after_run) { after_run(batch) }
  end

  info(message: 'Pipeline finished')
rescue MarkedAsFailedError
  skip!('Skipping pipeline due to failed entity')
end