Class: Chicago::ETL::RowTransformationStage

Inherits:
Stage
  • Object
Defined in:
lib/chicago/etl/row_transformation_stage.rb

Overview

A Stage that passes source rows through a transformation chain.

All rows are read into Ruby and then written to sinks after passing through 0 or more Transformations.
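The chain described above can be sketched in plain Ruby. This is only an illustration: the lambdas, the `run_chain` helper, and the sample rows are hypothetical stand-ins and not part of Chicago::ETL, whose own transformation objects may expose a different interface.

```ruby
# Hypothetical helper: pass each row through every transformation in order.
# With an empty list of transformations, rows come out unchanged.
def run_chain(rows, transformations)
  rows.map do |row|
    transformations.reduce(row) { |current, t| t.call(current) }
  end
end

# Stand-in transformations: each takes a row (a Hash) and returns a new row.
upcase_name = ->(row) { row.merge(name: row[:name].upcase) }
add_flag    = ->(row) { row.merge(loaded: true) }

source_rows = [{ name: "alice" }, { name: "bob" }]

transformed = run_chain(source_rows, [upcase_name, add_flag])
untouched   = run_chain(source_rows, [])
```

The "0 or more" case falls out of `reduce`: with no transformations, each row is returned as-is.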

Instance Attribute Summary

Attributes inherited from Stage

#name

Instance Method Summary

Methods inherited from Stage

#executable?, #execute, #task_name

Constructor Details

#initialize(options = {}) ⇒ RowTransformationStage

Returns a new instance of RowTransformationStage.



# File 'lib/chicago/etl/row_transformation_stage.rb', line 11

def initialize(options={})
  super
  @source = options[:source]
  @sinks = options[:sinks]
  @transformations = options[:transformations] || []
  @filter_strategy = options[:filter_strategy] || lambda {|s, _| s }

  validate_arguments
end
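To make the option defaults concrete, here is a minimal sketch that copies only the option handling from the constructor above. The `stage_options` helper is hypothetical; it skips `super` and `validate_arguments`, so it says nothing about which options the real class requires.

```ruby
# Hypothetical helper mirroring the constructor's option handling.
def stage_options(options = {})
  {
    source: options[:source],
    sinks: options[:sinks],
    # No transformations given: fall back to an empty chain.
    transformations: options[:transformations] || [],
    # No filter strategy given: fall back to one that returns the source as-is.
    filter_strategy: options[:filter_strategy] || lambda { |s, _| s }
  }
end

# Plain arrays and hashes stand in for real sources and sinks.
opts = stage_options(source: [1, 2, 3], sinks: { default: [] })

default_chain   = opts[:transformations]
filtered_source = opts[:filter_strategy].call(opts[:source], nil)
```

With the defaults in place, the chain is empty and the filter strategy ignores its second argument and hands back the source untouched.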

Instance Attribute Details

#source ⇒ Object (readonly)

Returns the source for this stage.



# File 'lib/chicago/etl/row_transformation_stage.rb', line 9

def source
  @source
end

Instance Method Details

#filtered_source(etl_batch) ⇒ Object

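This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Returns the stage's source wrapped in a DatasetSource. The filter strategy is applied to the source unless the batch is re-extracting, in which case the source is used unfiltered.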



# File 'lib/chicago/etl/row_transformation_stage.rb', line 36

def filtered_source(etl_batch)
  filtered_dataset = etl_batch.reextracting? ? source : 
    @filter_strategy.call(source, etl_batch)

  DatasetSource.new(filtered_dataset)
end
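The branch in `filtered_source` can be illustrated in isolation. In this sketch the batch struct, the `choose_dataset` helper, the `last_id` field, and the `only_new` strategy are all hypothetical; plain arrays stand in for whatever dataset object the real source is, and the `DatasetSource` wrapper is left out.

```ruby
# Hypothetical stand-in for the batch: only reextracting? is modelled,
# plus an illustrative last_id field for the sample strategy to use.
fake_batch = Struct.new(:reextracting, :last_id) do
  def reextracting?
    reextracting
  end
end

# Mirrors the ternary in filtered_source, without the DatasetSource wrapper.
def choose_dataset(source, batch, filter_strategy)
  batch.reextracting? ? source : filter_strategy.call(source, batch)
end

rows = [{ id: 1 }, { id: 2 }, { id: 3 }]

# Illustrative strategy: keep only rows with an id above the batch's last_id.
only_new = ->(source, batch) { source.select { |row| row[:id] > batch.last_id } }

incremental = choose_dataset(rows, fake_batch.new(false, 2), only_new)
full        = choose_dataset(rows, fake_batch.new(true, 2), only_new)
```

A filter strategy receives the source and the batch, in that order, and returns the narrowed source. When the batch is re-extracting, the strategy is never called.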

#perform_execution(etl_batch) ⇒ Object

Executes this stage in the context of an ETL::Batch.



# File 'lib/chicago/etl/row_transformation_stage.rb', line 22

def perform_execution(etl_batch)
  transform_and_load filtered_source(etl_batch)
end

#sink(name) ⇒ Object

Returns the named sink if it exists, or nil otherwise.



# File 'lib/chicago/etl/row_transformation_stage.rb', line 27

def sink(name)
  @sinks[name.to_sym]
end
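Because the name is converted with `to_sym`, a sink can be looked up by string or by symbol, provided the sinks hash is keyed by symbols. A small sketch, using a hypothetical hash of placeholder values in place of real sink objects:

```ruby
# Hypothetical sinks hash keyed by symbols; the values are placeholders.
sinks = { default: :default_sink, errors: :error_sink }

# Same lookup as the method body above.
lookup = ->(name) { sinks[name.to_sym] }

by_string = lookup.call("default")
by_symbol = lookup.call(:errors)
missing   = lookup.call("unknown")
```

A name with no matching key yields `nil` rather than raising, since this is an ordinary Hash lookup.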

#sinks ⇒ Object

Returns all of this stage's sinks.



# File 'lib/chicago/etl/row_transformation_stage.rb', line 31

def sinks
  @sinks.values
end