Class: Pipeline::Base

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/pipeline/base.rb

Overview

Pipeline Stages

Each pipeline is composed of sequential stages (see Pipeline::Stage::Base). The stages that will be executed are defined as follows:

class PrepareIngredients < Pipeline::Stage::Base
  def run
    puts "Slicing..."
  end
end

class Cook < Pipeline::Stage::Base
  def run
    puts "Cooking..."
  end
end

class MakeDinnerPipeline < Pipeline::Base
  define_stages PrepareIngredients >> Cook
end

When this pipeline executes, it will run each stage sequentially, and the output would be:

Slicing...
Cooking...

A pipeline can get access to its stages through the stages association.

Error Handling

There are 3 types of errors that a failed stage can specifically raise:

  • Recoverable (requires user action): If a stage raises RecoverableError with input_required? == true, the pipeline gets :paused and can be resumed or cancelled by calling #resume and #cancel, respectively.

  • Recoverable (can be automatically retried): If a stage raises RecoverableError with input_required? == false, the pipeline goes into :retry state and will be automatically retried. This is currently achieved by delayed_job‘s retry mechanism. Please refer to github.com/collectiveidea/delayed_job for information about how to configure the maximum number of retry attempts.

  • Irrecoverable: If a stage fails with an IrrecoverableError, the pipeline gets :failed and therefore cannot be resumed or restarted.

If a stage fails with any other type of error, you can choose the default behaviour for what happens to the pipeline. By default, the pipeline will pause, so it can be later resumed. This can be overriden by calling default_failure_mode like:

class SamplePipeline < Pipeline::Base
  self.default_failure_mode = :cancel
end

You can always go back to the default mode by calling:

self.default_failure_mode = :pause

State Transitions

The following diagram represents the state transitions a pipeline instance can go through during its life-cycle:

:not_started —> :in_progress —> :completed / :failed

      ^ |
      | v
:paused / :retry
:not_started

The pipeline was instantiated but not started yet.

:in_progress

After started or resumed, the pipeline remains on this state while the stages are running.

:paused

If a stage fails with a recoverable error that requires user action, the pipeline gets paused.

:retry

If a stage fails with a recoverable error that can be automatically retried, the pipeline goes into this stage.

:completed

After successfully running all stages, the pipeline is completed.

:failed

If a stage fails with an unrecoverable error, or if the pipeline is cancelled, it goes into this stage.

Referencing External Objects

The execution of a pipeline will usually be associated to an external entity (e.g. a User if the stages represent an internal user registration process, or a Recipe in the examples of this page). To be able to reference the associated object from the stages, Pipeline::Base has an attribute external_id that can be used on a custom association to any external entity. Example:

class MakeDinnerPipeline < Pipeline::Base
  define_stages PrepareIngredients >> Cook
  belongs_to :recipe, :foreign_key => 'external_id'
end

A Stage can reference this object as such:

class Cook < Pipeline::Stage::Base
  def run
    puts "Cooking a delicious #{pipeline.recipe.name}"
  end
end

Callbacks

You can define custom callbacks to be called before (before_pipeline) and after (after_pipeline) executing a pipeline. Example:

class PrepareIngredients < Pipeline::Stage::Base
  def run
    puts "Slicing..."
  end
end

class Cook < Pipeline::Stage::Base
  def run
    puts "Cooking..."
  end
end

class MakeDinnerPipeline < Pipeline::Base
  define_stages PrepareIngredients >> Cook

  before_pipeline :wash_hands
  after_pipeline :serve_dinner

  private
  def wash_hands
    puts "Washing hands before we start..."
  end

  def serve_dinner
    puts "bon appetit!"
  end
end

Pipeline.start(MakeDinnerPipeline.new)

Outputs:

Washing hands before we start...
Slicing...
Cooking...
bon appetit!

Callbacks can be defined as a symbol that calls a private/protected method (like the example above), as an inline block, or as a Callback object, as a regular ActiveRecord callback.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.default_failure_mode=(mode) ⇒ Object

Sets the behaviour of this pipeline when a failure occurs. Accepted symbols are:

:pause

Pauses the pipeline on failure (default)

:cancel

Fails the pipeline on failure



180
181
182
183
# File 'lib/pipeline/base.rb', line 180

def self.default_failure_mode=(mode)
  new_mode = [:pause, :cancel].include?(mode) ? mode : :pause
  self.failure_mode = new_mode
end

.define_stages(stages) ⇒ Object

Defines the stages of this pipeline. Please refer to section “Pipeline Stages” above



172
173
174
# File 'lib/pipeline/base.rb', line 172

def self.define_stages(stages)
  self.defined_stages = stages.build_chain
end

Instance Method Details

#after_initializeObject

Standard ActiveRecord callback to setup initial stages and status when a new pipeline is instantiated. If you override this callback, make sure to call super:

class SamplePipeline < Pipeline::Base
  def after_initialize
    super
    self[:special_attribute] ||= "standard value"
  end
end


195
196
197
198
199
200
201
202
# File 'lib/pipeline/base.rb', line 195

def after_initialize
  if new_record?
    self[:status] = :not_started
    self.class.defined_stages.each do |stage_class|
      stages << stage_class.new(:pipeline => self)
    end
  end
end

#cancelObject

Attempts to cancel this pipeline. Raises InvalidStatusError if pipeline is in an invalid state for cancelling (e.g. already cancelled, or completed)



236
237
238
239
# File 'lib/pipeline/base.rb', line 236

def cancel
  _check_valid_status
  _complete_with_status(:failed)
end

#performObject

Standard delayed_job method called when executing this pipeline. Raises

InvalidStatusError if pipeline is in an invalid state for execution (e.g. already cancelled, or completed).

This method will be called by delayed_job if this object is enqueued for asynchronous execution. However, you could call this method and execute the pipeline synchronously, without relying on delayed_job. Auto-retry would not work in this case, though.



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/pipeline/base.rb', line 212

def perform
  _check_valid_status
  begin
    _setup
    stages.each do |stage|
      stage.perform unless stage.completed?
    end
    _complete_with_status(:completed)
  rescue IrrecoverableError
    _complete_with_status(:failed)
  rescue RecoverableError => e
    if e.input_required?
      _complete_with_status(:paused)
    else
      _complete_with_status(:retry)
      raise e
    end
  rescue Exception
    _complete_with_status(failure_mode == :cancel ? :failed : :paused)
  end
end

#resumeObject

Attempts to resume this pipeline. Raises InvalidStatusError if pipeline is in an invalid state for resuming (e.g. already cancelled, or completed)



243
244
245
# File 'lib/pipeline/base.rb', line 243

def resume
  _check_valid_status
end