Class: ArcFurnace::Pipeline
- Inherits: Object (hierarchy: Object, ArcFurnace::Pipeline)
- Defined in: lib/arc-furnace/pipeline.rb
Defined Under Namespace
Classes: PipelineInstance
Class Method Summary
- .filter(node_id, type: BlockFilter, params: {}, &block) ⇒ Object
  Define a node that filters rows.
- .hash_node(node_id, type: ArcFurnace::Hash, params:) ⇒ Object
  Define a hash node, processing all rows from its source and caching them in-memory.
- .inherited(subclass) ⇒ Object
  Ensure that subclasses don’t overwrite the parent’s transform node definitions.
- .inner_join(node_id, type: ArcFurnace::InnerJoin, params:) ⇒ Object
  Define an inner join node where rows from the source are dropped if an associated entity is not found in the hash for the join key.
- .instance(params = {}) ⇒ Object
  Create an instance to run a transformation, passing the parameters to instantiate the transform instance with.
- .outer_join(node_id, type: ArcFurnace::OuterJoin, params:) ⇒ Object
  Define an outer join node where rows from the source are kept even if an associated entity is not found in the hash for the join key.
- .sink(type:, source:, params:) ⇒ Object
  Define the sink for this transformation.
- .source(node_id, type:, params:) ⇒ Object
  Define a source that has row semantics, delivering a hash per row (or per entity).
- .transform(node_id, type: BlockTransform, params: {}, &block) ⇒ Object
  Define a node that transforms rows.
- .unfold(node_id, type: BlockUnfold, params: {}, &block) ⇒ Object
  Define a node that unfolds rows.
Class Method Details
.filter(node_id, type: BlockFilter, params: {}, &block) ⇒ Object
Define a node that filters rows. By default the node is a BlockFilter: when this metaprogramming method is passed a block, that block is called with a hash for each row, and its result determines whether the row flows to a downstream node.
    # File 'lib/arc-furnace/pipeline.rb', line 84
    def self.filter(node_id, type: BlockFilter, params: {}, &block)
      if block_given? && type <= BlockFilter
        params[:block] = block
      end
      raise "Filter #{type} is not a Filter!" unless type <= Filter
      define_intermediate(node_id, type: type, params: params)
    end
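The guard in this method relies on Ruby's `Module#<=`, which is true for a class compared with itself or an ancestor and `nil` for unrelated classes. The following is a minimal, self-contained sketch of that pattern; the `*Sketch` classes and `define_filter_sketch` are hypothetical stand-ins, not part of ArcFurnace, and the returned hash only stands in for whatever `define_intermediate` records.

```ruby
# Hypothetical stand-ins for the library's Filter hierarchy.
class FilterSketch; end
class BlockFilterSketch < FilterSketch; end
class NotAFilterSketch; end

# Mirrors the shape of the method above: capture the block for
# block-based types, then reject any type outside the hierarchy.
def define_filter_sketch(type: BlockFilterSketch, params: {}, &block)
  params[:block] = block if block_given? && type <= BlockFilterSketch
  raise "Filter #{type} is not a Filter!" unless type <= FilterSketch

  # Assumption: a plain hash stands in for the stored node definition.
  { type: type, params: params }
end

# The block is stored under params[:block] and can be called per row.
node = define_filter_sketch { |row| !row['email'].nil? }
node[:params][:block].call({ 'email' => 'ada@example.com' })
```

Passing `type: NotAFilterSketch` to the sketch raises a `RuntimeError`, because `NotAFilterSketch <= FilterSketch` evaluates to `nil`.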
.hash_node(node_id, type: ArcFurnace::Hash, params:) ⇒ Object
Define a hash node, processing all rows from its source and caching them in-memory.
    # File 'lib/arc-furnace/pipeline.rb', line 33
    def self.hash_node(node_id, type: ArcFurnace::Hash, params:)
      define_intermediate(node_id, type: type, params: params)
    end
.inherited(subclass) ⇒ Object
Ensure that subclasses don’t overwrite the parent’s transform node definitions: each subclass receives its own copy of the parent’s intermediates map.
    # File 'lib/arc-furnace/pipeline.rb', line 13
    def self.inherited(subclass)
      subclass.intermediates_map = intermediates_map.dup
    end
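The effect of copying the map in the `inherited` hook can be seen in a small standalone sketch. `PipelineSketch`, `ParentSketch`, `ChildSketch`, and the two-argument `define_intermediate` below are hypothetical and only imitate the pattern; they are not the library's classes or signatures.

```ruby
# Hypothetical miniature of the pattern, not the ArcFurnace class.
class PipelineSketch
  class << self
    attr_accessor :intermediates_map

    # Ruby calls this hook on the parent whenever a subclass is created,
    # before the subclass body runs.
    def inherited(subclass)
      super
      subclass.intermediates_map = intermediates_map.dup
    end

    # Simplified stand-in: records a value under the node id.
    def define_intermediate(node_id, value)
      intermediates_map[node_id] = value
    end
  end

  self.intermediates_map = {}
end

class ParentSketch < PipelineSketch
  define_intermediate :shared, 'defined on parent'
end

class ChildSketch < ParentSketch
  define_intermediate :extra, 'defined on child'
end

ParentSketch.intermediates_map.keys # => [:shared]
ChildSketch.intermediates_map.keys  # => [:shared, :extra]
```

Note that `dup` makes a shallow copy: the child gets its own hash, so adding keys does not affect the parent, but the values stored in both hashes are the same objects.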
.inner_join(node_id, type: ArcFurnace::InnerJoin, params:) ⇒ Object
Define an inner join node where rows from the source are dropped if an associated entity is not found in the hash for the join key.
    # File 'lib/arc-furnace/pipeline.rb', line 46
    def self.inner_join(node_id, type: ArcFurnace::InnerJoin, params:)
      define_intermediate(node_id, type: type, params: params)
    end
.instance(params = {}) ⇒ Object
Create an instance to run a transformation, passing the parameters to instantiate the transform instance with. The resulting instance has a single public method, #execute, which performs the transformation.
    # File 'lib/arc-furnace/pipeline.rb', line 96
    def self.instance(params = {})
      PipelineInstance.new(self, params)
    end
.outer_join(node_id, type: ArcFurnace::OuterJoin, params:) ⇒ Object
Define an outer join node where rows from the source are kept even if an associated entity is not found in the hash for the join key.
    # File 'lib/arc-furnace/pipeline.rb', line 52
    def self.outer_join(node_id, type: ArcFurnace::OuterJoin, params:)
      define_intermediate(node_id, type: type, params: params)
    end
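The difference between the inner and outer join descriptions can be illustrated with plain Ruby hashes. This sketch does not use ArcFurnace at all: the `lookup` hash stands in for a hash node, the `'id'` join key and the sample rows are invented, and combining rows with `Hash#merge` is an assumption about how matched entities are attached.

```ruby
# In-memory lookup keyed by the join key, standing in for a hash node.
lookup = {
  1 => { 'email' => 'ada@example.com' },
  2 => { 'email' => 'grace@example.com' }
}

# Rows arriving from the source; id 3 has no entry in the lookup.
source_rows = [
  { 'id' => 1, 'name' => 'Ada' },
  { 'id' => 3, 'name' => 'Linus' }
]

# Inner join: a row with no associated entity is dropped.
inner = source_rows.each_with_object([]) do |row, out|
  match = lookup[row['id']]
  out << row.merge(match) if match
end

# Outer join: every source row is kept; unmatched rows pass through
# without the extra fields (assumption: merged with nothing).
outer = source_rows.map do |row|
  row.merge(lookup.fetch(row['id'], {}))
end
```

Here `inner` holds only the row for Ada, while `outer` holds both rows, with the row for Linus carrying no `'email'` key.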
.sink(type:, source:, params:) ⇒ Object
Define the sink for this transformation. Only a single sink may be specified per transformation. The sink receives a hash per row or entity, fed to it from the graph of nodes above it.
    # File 'lib/arc-furnace/pipeline.rb', line 20
    def self.sink(type:, source:, params:)
      if sink_node
        raise 'Sink already defined!'
      end

      @sink_node = -> do
        type.new(resolve_parameters(:sink, params))
      end
      @sink_source = source
    end
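Two things happen in this method: a second definition is rejected, and the sink object is wrapped in a lambda so it is only constructed when the lambda is called. The sketch below imitates both behaviors with hypothetical classes (`SinkHolderSketch`, `RecordingSinkSketch`); it passes `params` straight to the constructor, whereas the real method first runs them through `resolve_parameters`, which is not reproduced here.

```ruby
# Hypothetical holder imitating the single-sink guard and lazy construction.
class SinkHolderSketch
  class << self
    attr_reader :sink_node, :sink_source

    def sink(type:, source:, params:)
      raise 'Sink already defined!' if sink_node

      # The lambda defers construction until it is explicitly called.
      @sink_node = -> { type.new(params) }
      @sink_source = source
    end
  end
end

# Hypothetical sink that just remembers what it was built with.
class RecordingSinkSketch
  attr_reader :params

  def initialize(params)
    @params = params
  end
end

SinkHolderSketch.sink(
  type: RecordingSinkSketch,
  source: :joined_rows,
  params: { filename: 'out.csv' }
)

# Nothing has been instantiated yet; calling the lambda builds the sink.
built_sink = SinkHolderSketch.sink_node.call
```

Calling `SinkHolderSketch.sink` a second time raises `RuntimeError` with the message `Sink already defined!`.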
.source(node_id, type:, params:) ⇒ Object
Define a source that has row semantics, delivering a hash per row (or per entity).
    # File 'lib/arc-furnace/pipeline.rb', line 39
    def self.source(node_id, type:, params:)
      raise "Source #{type} is not a Source!" unless type <= Source
      define_intermediate(node_id, type: type, params: params)
    end
.transform(node_id, type: BlockTransform, params: {}, &block) ⇒ Object
Define a node that transforms rows. By default the node is a BlockTransform: when this metaprogramming method is passed a block, that block is called with a hash for each row, and its result becomes the row for the next downstream node.
    # File 'lib/arc-furnace/pipeline.rb', line 60
    def self.transform(node_id, type: BlockTransform, params: {}, &block)
      if block_given? && type <= BlockTransform
        params[:block] = block
      end
      raise "Transform #{type} is not a Transform!" unless type <= Transform
      define_intermediate(node_id, type: type, params: params)
    end
.unfold(node_id, type: BlockUnfold, params: {}, &block) ⇒ Object
Define a node that unfolds rows. By default the node is a BlockUnfold: when this metaprogramming method is passed a block, that block is called with a hash for each row, and its result becomes the set of rows for the next downstream node.
    # File 'lib/arc-furnace/pipeline.rb', line 72
    def self.unfold(node_id, type: BlockUnfold, params: {}, &block)
      if block_given? && type <= BlockUnfold
        params[:block] = block
      end
      raise "Unfold #{type} is not an Unfold!" unless type <= Unfold
      define_intermediate(node_id, type: type, params: params)
    end
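As a rough mental model, the three block-based node kinds described on this page behave like `select`, `map`, and `flat_map` over a stream of row hashes. The sketch below uses only core Ruby; the lambdas and sample rows are invented, and it assumes that a truthy filter result keeps the row and that an unfold block returns an array of rows.

```ruby
rows = [
  { 'name' => 'Ada',   'tags' => 'math,code' },
  { 'name' => 'Grace', 'tags' => '' }
]

# Filter-style block: the result decides whether the row continues
# (assumption: truthy means keep).
keep_tagged = ->(row) { !row['tags'].empty? }

# Transform-style block: the result becomes the downstream row.
upcase_name = ->(row) { row.merge('name' => row['name'].upcase) }

# Unfold-style block: the result is a set of rows, one per tag here.
one_row_per_tag = lambda do |row|
  row['tags'].split(',').map { |tag| row.merge('tag' => tag) }
end

filtered    = rows.select(&keep_tagged)
transformed = filtered.map(&upcase_name)
unfolded    = transformed.flat_map(&one_row_per_tag)
```

After these steps `unfolded` contains two rows for Ada, one with `'tag' => 'math'` and one with `'tag' => 'code'`; the row for Grace was removed by the filter step.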