10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/bulk_imports/pipeline/runner.rb', line 10
def run
raise MarkedAsFailedError if context.entity.failed?
info(message: 'Pipeline started')
=
if
.each do |entry|
transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name) do
transformer.transform(context, entry)
end
end
run_pipeline_step(:loader, loader.class.name) do
loader.load(context, entry)
end
end
tracker.update!(
has_next_page: .has_next_page?,
next_page: .next_page
)
run_pipeline_step(:after_run) do
after_run()
end
end
info(message: 'Pipeline finished')
rescue MarkedAsFailedError
skip!('Skipping pipeline due to failed entity')
end
|