Module: Plines::Pipeline

Extended by:
Forwardable
Defined in:
lib/plines/pipeline.rb

Overview

This module should be extended onto a class or module in order to make it a pipeline. Steps declared within that module will automatically belong to that pipeline. This enables one application to have multiple pipelines.

Defined Under Namespace

Classes: BoundaryStepAlreadySetError, NullStep

Constant Summary collapse

DEFAULT_QUEUE =
"plines"
AWAITING_EXTERNAL_DEPENDENCY_QUEUE =
"awaiting_ext_dep"

Instance Method Summary collapse

Instance Method Details

#configurationObject



17
18
19
# File 'lib/plines/pipeline.rb', line 17

def configuration
  @configuration ||= Configuration.new
end

#configure {|configuration| ... } ⇒ Object

Yields:



21
22
23
# File 'lib/plines/pipeline.rb', line 21

def configure
  yield configuration
end

#enqueue_jobs_for(batch_data, options = {}) ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/plines/pipeline.rb', line 25

def enqueue_jobs_for(batch_data, options = {})
  batch_data = IndifferentHash.from(batch_data)
  graph = DependencyGraph.new(self, batch_data)
  job_batch_list = job_batch_list_for(batch_data)

  job_batch_list.create_new_batch(batch_data, options) do |job_batch|
    job_options_block = configuration.qless_job_options_block
    JobEnqueuer.new(graph, job_batch, &job_options_block).enqueue_jobs
  end
end

#find_job_batch(id) ⇒ Object



40
41
42
43
44
# File 'lib/plines/pipeline.rb', line 40

def find_job_batch(id)
  key = id[/\A(.*):\d+\z/, 1] # http://rubular.com/r/fMGv1TaZZA
  qless = configuration.qless_client_for(key)
  Plines::JobBatch.find(qless, self, id)
end

#job_batch_list_for(batch_data) ⇒ Object



84
85
86
87
# File 'lib/plines/pipeline.rb', line 84

def job_batch_list_for(batch_data)
  key = configuration.batch_list_key_for(batch_data)
  JobBatchList.new(self, key)
end

#matching_older_unfinished_job_batches(main_job_batch) ⇒ Object



89
90
91
92
93
94
# File 'lib/plines/pipeline.rb', line 89

def matching_older_unfinished_job_batches(main_job_batch)
  job_batch_list = job_batch_list_for(main_job_batch.data)
  job_batch_list.each.select do |job_batch|
    !job_batch.complete? && job_batch.created_at < main_job_batch.created_at
  end
end

#most_recent_job_batch_for(batch_data) ⇒ Object



36
37
38
# File 'lib/plines/pipeline.rb', line 36

def most_recent_job_batch_for(batch_data)
  job_batch_list_for(batch_data).most_recent_batch
end

#step_classesObject



46
47
48
# File 'lib/plines/pipeline.rb', line 46

def step_classes
  @step_classes ||= []
end