Class: Pipeline::Stage::Base

Inherits:
ActiveRecord::Base
  • Object
Defined in:
lib/pipeline/stage/base.rb

Overview

A stage represents one of the steps in a pipeline. Stages can be reused by different pipelines. The behaviour of a stage is determined by subclasses of Pipeline::Stage::Base implementing the method #run:

class PrepareIngredients < Pipeline::Stage::Base
  def run
    Ingredients.each do |ingredient|
      ingredient.wash
      ingredient.slice!
    end
  end
end

If a stage needs access to its associated pipeline, it can call the pipeline association.

Stage Name

By default, a stage's name is its fully qualified class name (e.g. Pipeline::Stage::Base or SamplePipeline::SampleStage). You can provide a more descriptive name by setting default_name:

class PrepareIngredients < Pipeline::Stage::Base
  self.default_name = "Prepare Ingredients for Cooking"
end

You can retrieve the name of a stage using the name attribute:

stage = PrepareIngredients.new
stage.name # => "Prepare Ingredients for Cooking"
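
The fallback from default_name to the class name can be illustrated without ActiveRecord. The following is only a sketch: NamedStageSketch is a hypothetical plain-Ruby stand-in, and storing default_name in a class-level instance variable is an assumption, not necessarily how the library stores it. The fallback expression itself mirrors the one in the #after_initialize listing.

```ruby
# Hypothetical plain-Ruby stand-in for Pipeline::Stage::Base (no ActiveRecord).
class NamedStageSketch
  class << self
    # Class-level setting (assumption: kept in a class-level instance variable).
    attr_accessor :default_name
  end

  attr_reader :name

  def initialize(name = nil)
    # Same order as the listed after_initialize:
    # explicit name, then default_name, then the class name.
    @name = name || (self.class.default_name || self.class).to_s
  end
end

class PrepareIngredientsSketch < NamedStageSketch
  self.default_name = "Prepare Ingredients for Cooking"
end

# No default_name set, so the class name is used.
class CookSketch < NamedStageSketch; end

puts PrepareIngredientsSketch.new.name
puts CookSketch.new.name
puts CookSketch.new("Sunday roast").name
```

In this sketch a name passed to the constructor wins over default_name, which in turn wins over the class name.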

Error Handling

In case of failure, a stage can raise special exceptions (RecoverableError or IrrecoverableError) to determine what happens to the pipeline. Please refer to Pipeline::Base for a description of the possible outcomes. Any failure persists the exception's message in the message attribute and moves the stage to the :failed state.
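
As a rough illustration of what a failure leaves behind on the stage, here is a plain-Ruby sketch. FailingStageSketch and BurnDinnerSketch are hypothetical stand-ins (the real class is an ActiveRecord model), a generic RuntimeError stands in for the library's own exception classes, and the sketch rescues StandardError where the listed #perform rescues Exception.

```ruby
# Hypothetical stand-in that mimics the failure path of the listed #perform.
class FailingStageSketch
  attr_reader :status, :message

  def initialize
    @status = :not_started
    @message = nil
  end

  def perform
    @status = :in_progress
    run
    @status = :completed
  rescue StandardError => e
    # Keep the error text, mark the stage as failed, then re-raise so the
    # caller (the pipeline, in the real library) sees the exception.
    @message = e.message
    @status = :failed
    raise
  end
end

class BurnDinnerSketch < FailingStageSketch
  def run
    raise "oven too hot" # stand-in for RecoverableError / IrrecoverableError
  end
end

stage = BurnDinnerSketch.new
begin
  stage.perform
rescue RuntimeError
  # The pipeline would decide here what happens next.
end

puts stage.status.inspect
puts stage.message
```

After the rescued call, the stage reports :failed as its status and "oven too hot" as its message.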

State Transitions

The following diagram represents the state transitions a stage instance can go through during its life-cycle:

:not_started ---> :in_progress ---> :completed
                    ^    |
                    |    v
                   :failed

:not_started

The stage was instantiated but not started yet.

:in_progress

Once started or retried, the stage remains in this state while executing.

:completed

After successfully executing, the stage is completed.

:failed

If an error occurs, the stage goes into this state.
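
The transitions above can also be written down as a lookup table. This is a sketch read off the diagram, not code from the library; STAGE_TRANSITIONS and transition_allowed? are hypothetical names.

```ruby
# Allowed transitions, read off the state diagram (illustrative only).
STAGE_TRANSITIONS = {
  not_started: [:in_progress],
  in_progress: [:completed, :failed],
  failed:      [:in_progress], # a failed stage can be retried
  completed:   []              # terminal state
}.freeze

# True if a stage in state +from+ may move to state +to+.
def transition_allowed?(from, to)
  STAGE_TRANSITIONS.fetch(from, []).include?(to)
end

puts transition_allowed?(:not_started, :in_progress)
puts transition_allowed?(:failed, :in_progress)
puts transition_allowed?(:completed, :in_progress)
```

The table matches the guard in the #perform listing, which only accepts stages that are :not_started or :failed.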

Callbacks

You can define custom callbacks to be called before (before_stage) and after (after_stage) executing a stage. Example:

class PrepareIngredients < Pipeline::Stage::Base
  before_stage :wash_ingredients

  def run
    puts "Slicing..."
  end

  protected
  def wash_ingredients
    puts "Washing..."
  end
end

class Cook < Pipeline::Stage::Base
  after_stage :serve

  def run
    puts "Cooking..."
  end

  protected
  def serve
    puts "bon appetit!"
  end
end

class MakeDinnerPipeline < Pipeline::Base
  define_stages PrepareIngredients >> Cook
end

Pipeline.start(MakeDinnerPipeline.new)

Outputs:

Washing...
Slicing...
Cooking...
bon appetit!

Callbacks can be defined as a symbol that names a private/protected method (as in the example above), as an inline block, or as a Callback object, just like regular ActiveRecord callbacks.
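
To show the difference between the symbol and inline-block forms, here is a minimal plain-Ruby sketch of a callback registry. It is not the library's implementation, which relies on ActiveRecord callbacks. CallbackStageSketch, before_hooks and PrepareSaladSketch are hypothetical names, and running the block in the context of the stage instance is an assumption made for this sketch.

```ruby
# Hypothetical mini callback registry (the real library uses ActiveRecord callbacks).
class CallbackStageSketch
  def self.before_hooks
    @before_hooks ||= []
  end

  # Accepts a method name (Symbol) or an inline block.
  def self.before_stage(method_name = nil, &block)
    before_hooks << (method_name || block)
  end

  def perform
    self.class.before_hooks.each do |hook|
      # Symbols name a method on the stage; blocks run in the stage's context.
      hook.is_a?(Symbol) ? send(hook) : instance_exec(&hook)
    end
    run
  end
end

class PrepareSaladSketch < CallbackStageSketch
  before_stage :wash                   # symbol form
  before_stage { log << "Drying..." }  # inline block form

  def log
    @log ||= []
  end

  def run
    log << "Slicing..."
  end

  protected

  def wash
    log << "Washing..."
  end
end

stage = PrepareSaladSketch.new
stage.perform
puts stage.log
```

Both hooks run in registration order before run, so the log reads Washing..., Drying..., Slicing....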

Constant Summary

@@chain = []

Class Method Details

.>>(next_stage) ⇒ Object

Method used for chaining stages on a pipeline sequence. Please refer to Pipeline::Base for example usages.



# File 'lib/pipeline/stage/base.rb', line 125

def self.>>(next_stage)
  @@chain << self
  next_stage
end

.build_chain ⇒ Object

Method used by Pipeline::Base to construct its chain of stages. Please refer to Pipeline::Base for details.



# File 'lib/pipeline/stage/base.rb', line 132

def self.build_chain
  chain = @@chain + [self]
  @@chain = []
  chain
end
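
The interplay between .>> and .build_chain can be tried in isolation. The sketch below copies the two listed class methods into a hypothetical stand-in class, ChainStageSketch, with no ActiveRecord involved. The three stage classes are made up for illustration.

```ruby
# Stand-in base class reproducing the two listed class methods.
class ChainStageSketch
  @@chain = []

  def self.>>(next_stage)
    @@chain << self   # remember the left-hand stage
    next_stage        # hand the right-hand stage to the next >>
  end

  def self.build_chain
    chain = @@chain + [self]  # accumulated stages plus the last one
    @@chain = []              # reset so the next chain starts empty
    chain
  end
end

class PrepareSketch < ChainStageSketch; end
class CookMealSketch < ChainStageSketch; end
class ServeSketch < ChainStageSketch; end

# >> is left-associative, so the expression evaluates to its last stage.
last_stage = PrepareSketch >> CookMealSketch >> ServeSketch
stages = last_stage.build_chain

puts stages.inspect
```

Because @@chain is shared by the whole class hierarchy and only reset in build_chain, a chain expression that is never followed by build_chain would leak its stages into the next chain that is built.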

Instance Method Details

#after_initialize ⇒ Object

Standard ActiveRecord callback that sets up the initial name and status when a new stage is instantiated. If you override this callback, make sure to call super:

class SampleStage < Pipeline::Stage::Base
  def after_initialize
    super
    self[:special_attribute] ||= "standard value"
  end
end


# File 'lib/pipeline/stage/base.rb', line 148

def after_initialize
  if new_record?
    self[:status] = :not_started
    self.name ||= (default_name || self.class).to_s
  end
end

#completed? ⇒ Boolean

Returns true if the stage is in a :completed state, false otherwise.

Returns:

  • (Boolean)


# File 'lib/pipeline/stage/base.rb', line 157

def completed?
  status == :completed
end

#perform ⇒ Object

Standard method called when executing this stage. Raises InvalidStatusError if the stage is in an invalid state for execution (e.g. already completed, or in progress).

NOTE: Do not override this method to determine the behaviour of a stage. This method will be called by the executing pipeline. Please override #run instead.

Raises:

  • (InvalidStatusError)



# File 'lib/pipeline/stage/base.rb', line 168

def perform
  reload unless new_record?
  raise InvalidStatusError.new(status) unless [:not_started, :failed].include?(status)
  begin
    _setup
    run
    self.status = :completed
  rescue Exception => e
    logger.info("Error on stage #{default_name}: #{e.message}")
    logger.info(e.backtrace.join("\n"))
    self.message = e.message
    self.status = :failed
    raise e
  ensure
    run_callbacks(:after_stage)
  end
end

#run ⇒ Object

Abstract method that represents the action performed by this stage. It must be implemented by every subclass; the base implementation only raises an error.



# File 'lib/pipeline/stage/base.rb', line 188

def run
  raise "This method must be implemented by any subclass of Pipeline::Stage::Base"
end