Class: Chicago::ETL::RowTransformationStage
Defined in: lib/chicago/etl/row_transformation_stage.rb
Overview
A Stage that passes source rows through a transformation chain.
All rows are read into Ruby, passed through zero or more Transformations, and then written to the sinks.
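The row-by-row flow described above can be illustrated with a minimal, self-contained sketch. It assumes, purely for illustration, that each transformation is a callable taking one row (a Hash) and returning an Array of zero or more rows, and that a sink is anything responding to `<<`. `run_chain` is a hypothetical helper, not part of Chicago, and the real Transformation interface may differ.

```ruby
# Hypothetical sketch: each transformation is a callable that takes one row
# (a Hash) and returns an Array of zero or more output rows. This is NOT the
# Chicago::ETL::Transformation API, only an illustration of the idea.
def run_chain(rows, transformations, sink)
  rows.each do |row|
    # Feed the row through every transformation in order. A transformation
    # may drop a row (return []) or emit several rows. With no
    # transformations, reduce returns [row] and the row passes through as-is.
    outputs = transformations.reduce([row]) do |current, transform|
      current.flat_map { |r| transform.call(r) }
    end
    # Write whatever survived the chain to the sink.
    outputs.each { |r| sink << r }
  end
  sink
end

upcase_name = ->(row) { [row.merge(name: row[:name].upcase)] }
drop_blank  = ->(row) { row[:name].empty? ? [] : [row] }

rows = [{ name: "alice" }, { name: "" }, { name: "bob" }]
result = run_chain(rows, [drop_blank, upcase_name], [])
# result is [{ name: "ALICE" }, { name: "BOB" }]
```

Modelling transformations as "one row in, many rows out" lets the same chain express filtering, mapping and row expansion.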
Instance Attribute Summary
- #source ⇒ Object (readonly)
  Returns the source for this stage.
Attributes inherited from Stage
Instance Method Summary
- #filtered_source(etl_batch) ⇒ Object (private)
- #initialize(options = {}) ⇒ RowTransformationStage (constructor)
  Returns a new instance of RowTransformationStage.
- #perform_execution(etl_batch) ⇒ Object
  Executes this stage in the context of an ETL::Batch.
- #sink(name) ⇒ Object
  Returns the named sink, if it exists.
- #sinks ⇒ Object
  Returns all sinks for this stage.
Methods inherited from Stage
#executable?, #execute, #task_name
Constructor Details
#initialize(options = {}) ⇒ RowTransformationStage
Returns a new instance of RowTransformationStage.
# File 'lib/chicago/etl/row_transformation_stage.rb', line 11
def initialize(options = {})
  super
  @source = options[:source]
  @sinks = options[:sinks]
  @transformations = options[:transformations] || []
  @filter_strategy = options[:filter_strategy] || lambda {|s, _| s }
  validate_arguments
end
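The constructor falls back to an empty transformation list and an identity filter strategy when those options are omitted. The sketch below mirrors only those defaults in a hypothetical `StageOptionsSketch` class (it does not call Chicago, skips `super` and `validate_arguments`, and uses plain Arrays in place of real datasets) to show what a default and a custom `filter_strategy` receive and return.

```ruby
# Hypothetical stand-in that mirrors the option defaults applied by
# RowTransformationStage#initialize; it does not call Chicago itself.
class StageOptionsSketch
  attr_reader :source, :sinks, :transformations, :filter_strategy

  def initialize(options = {})
    @source = options[:source]
    @sinks = options[:sinks]
    # No transformations given: rows pass through unchanged.
    @transformations = options[:transformations] || []
    # Default filter strategy is the identity: it ignores the batch
    # argument and returns the source unchanged.
    @filter_strategy = options[:filter_strategy] || lambda { |s, _| s }
  end
end

defaults = StageOptionsSketch.new(source: [1, 2, 3], sinks: {})
defaults.transformations                         # => []
defaults.filter_strategy.call([1, 2, 3], :batch) # => [1, 2, 3]

# A custom strategy is called with the source and the batch.
evens_only = StageOptionsSketch.new(
  source: [1, 2, 3, 4],
  sinks: {},
  filter_strategy: lambda { |s, _batch| s.select(&:even?) }
)
evens_only.filter_strategy.call(evens_only.source, :batch) # => [2, 4]
```

Because the default is a two-argument lambda, any custom strategy should also accept both the source and the batch.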
Instance Attribute Details
#source ⇒ Object (readonly)
Returns the source for this stage.
# File 'lib/chicago/etl/row_transformation_stage.rb', line 9
def source
  @source
end
Instance Method Details
#filtered_source(etl_batch) ⇒ Object
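This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a DatasetSource wrapping the source. Unless the batch is re-extracting, the source is first passed through the filter strategy.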
# File 'lib/chicago/etl/row_transformation_stage.rb', line 36
def filtered_source(etl_batch)
  filtered_dataset = etl_batch.reextracting? ? source : @filter_strategy.call(source, etl_batch)
  DatasetSource.new(filtered_dataset)
end
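The branch in `filtered_source` can be exercised in isolation. In this sketch, `FakeBatch` is a hypothetical stand-in for `ETL::Batch` exposing only `reextracting?`, an Array stands in for the dataset, and the `DatasetSource` wrapping step is left out; `choose_dataset` is likewise a hypothetical helper.

```ruby
# Hypothetical stand-ins: FakeBatch replaces ETL::Batch, and a plain Array
# replaces the dataset. The real method wraps the result in a DatasetSource.
FakeBatch = Struct.new(:reextracting) do
  def reextracting?
    reextracting
  end
end

def choose_dataset(source, filter_strategy, etl_batch)
  # When the batch is re-extracting, the filter is bypassed and the
  # full source is used; otherwise the filter strategy decides.
  etl_batch.reextracting? ? source : filter_strategy.call(source, etl_batch)
end

source = [1, 2, 3, 4]
recent_only = lambda { |s, _batch| s.last(2) }

choose_dataset(source, recent_only, FakeBatch.new(false)) # => [3, 4]
choose_dataset(source, recent_only, FakeBatch.new(true))  # => [1, 2, 3, 4]
```

In other words, a re-extracting batch always sees the whole source, whatever filter strategy was configured.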
#perform_execution(etl_batch) ⇒ Object
Executes this stage in the context of an ETL::Batch: the source returned by #filtered_source is passed to transform_and_load.
# File 'lib/chicago/etl/row_transformation_stage.rb', line 22
def perform_execution(etl_batch)
  transform_and_load filtered_source(etl_batch)
end
#sink(name) ⇒ Object
Returns the sink registered under name (converted to a Symbol), or nil if no such sink exists.
# File 'lib/chicago/etl/row_transformation_stage.rb', line 27
def sink(name)
  @sinks[name.to_sym]
end
#sinks ⇒ Object
Returns all sinks for this stage, as an Array.
# File 'lib/chicago/etl/row_transformation_stage.rb', line 31
def sinks
  @sinks.values
end
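Both sink accessors are thin wrappers over the Hash passed as the :sinks option. The hypothetical `SinkRegistrySketch` below copies the two method bodies, with plain Arrays standing in for sink objects, to show that String and Symbol names resolve to the same sink and that unknown names return nil.

```ruby
# Hypothetical sketch of the sink lookup, with plain Arrays standing in for
# sink objects. It mirrors #sink (Hash lookup after to_sym) and #sinks
# (all Hash values).
class SinkRegistrySketch
  def initialize(sinks)
    @sinks = sinks
  end

  # Accepts a String or Symbol; returns nil when no sink has that name.
  def sink(name)
    @sinks[name.to_sym]
  end

  # Returns every sink, in the Hash's insertion order.
  def sinks
    @sinks.values
  end
end

registry = SinkRegistrySketch.new({ default: [], errors: [] })
registry.sink("errors").equal?(registry.sink(:errors)) # => true
registry.sink(:missing)                                # => nil
registry.sinks.size                                    # => 2
```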