Module: Plines::Pipeline
- Extended by:
- Forwardable
- Defined in:
- lib/plines/pipeline.rb
Overview
This module should be extended onto a class or module in order to make it a pipeline. Steps declared within that module will automatically belong to that pipeline. This enables one application to have multiple pipelines.
Defined Under Namespace
Classes: BoundaryStepAlreadySetError, NullStep
Constant Summary
collapse
- DEFAULT_QUEUE =
"plines"
- AWAITING_EXTERNAL_DEPENDENCY_QUEUE =
"awaiting_ext_dep"
Instance Method Summary
collapse
Instance Method Details
#configuration ⇒ Object
17
18
19
|
# File 'lib/plines/pipeline.rb', line 17
# Returns this pipeline's Configuration object.
#
# The instance is built on first access and cached in @configuration,
# so later calls hand back the same object.
def configuration
  return @configuration if @configuration

  @configuration = Configuration.new
end
|
#configure ⇒ Object
21
22
23
|
# File 'lib/plines/pipeline.rb', line 21
# Yields this pipeline's configuration object to the given block so the
# caller can adjust its settings. Returns whatever the block returns.
def configure
  config = configuration
  yield config
end
|
#enqueue_jobs_for(batch_data, options = {}) ⇒ Object
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/plines/pipeline.rb', line 25
# Creates a new job batch for +batch_data+ and enqueues its jobs.
#
# batch_data - the batch data; it is passed through IndifferentHash.from
#              before any other use.
# options    - forwarded unchanged to JobBatchList#create_new_batch.
#
# Returns whatever create_new_batch returns.
def enqueue_jobs_for(batch_data, options = {})
  indifferent_data = IndifferentHash.from(batch_data)
  dependency_graph = DependencyGraph.new(self, indifferent_data)

  job_batch_list_for(indifferent_data).create_new_batch(indifferent_data, options) do |new_batch|
    # The configured qless job options block is handed to the enqueuer as its block.
    enqueuer = JobEnqueuer.new(dependency_graph, new_batch, &configuration.qless_job_options_block)
    enqueuer.enqueue_jobs
  end
end
|
#find_job_batch(id) ⇒ Object
40
41
42
43
44
|
# File 'lib/plines/pipeline.rb', line 40
# Finds the job batch with the given id.
#
# id - a String; the regexp below expects the form "<key>:<digits>".
#
# Everything before the final ":<digits>" segment is used as the key to
# pick a qless client from the configuration. If id does not have that
# form, the key is nil and nil is what qless_client_for receives.
#
# Returns whatever Plines::JobBatch.find returns.
def find_job_batch(id)
  # Greedy capture: "a:b:12" yields the key "a:b".
  key = id[/\A(.*):\d+\z/, 1]
  qless = configuration.qless_client_for(key)
  Plines::JobBatch.find(qless, self, id)
end
|
#job_batch_list_for(batch_data) ⇒ Object
84
85
86
87
|
# File 'lib/plines/pipeline.rb', line 84
# Builds the JobBatchList for +batch_data+.
#
# The list key comes from configuration.batch_list_key_for(batch_data);
# the list is constructed with this pipeline and that key.
def job_batch_list_for(batch_data)
  list_key = configuration.batch_list_key_for(batch_data)

  JobBatchList.new(self, list_key)
end
|
#matching_older_unfinished_job_batches(main_job_batch) ⇒ Object
89
90
91
92
93
94
|
# File 'lib/plines/pipeline.rb', line 89
# Returns the job batches from the same batch list as +main_job_batch+
# that are not complete and were created before it.
#
# The batch list is looked up from main_job_batch.data.
def matching_older_unfinished_job_batches(main_job_batch)
  job_batch_list_for(main_job_batch.data).each.select do |candidate|
    # Completed batches are never selected; their timestamps are not consulted.
    next false if candidate.complete?

    candidate.created_at < main_job_batch.created_at
  end
end
|
#most_recent_job_batch_for(batch_data) ⇒ Object
36
37
38
|
# File 'lib/plines/pipeline.rb', line 36
# Returns the most recent batch from the job batch list for +batch_data+,
# as reported by that list's most_recent_batch.
def most_recent_job_batch_for(batch_data)
  batch_list = job_batch_list_for(batch_data)
  batch_list.most_recent_batch
end
|
#step_classes ⇒ Object
46
47
48
|
# File 'lib/plines/pipeline.rb', line 46
# Returns the array held in @step_classes.
#
# The array starts out empty on first access and the same array object
# is returned on every later call.
def step_classes
  return @step_classes if @step_classes

  @step_classes = []
end
|