Class: ArcFurnace::Pipeline

Inherits:
Object
Defined in:
lib/arc-furnace/pipeline.rb

Defined Under Namespace

Classes: PipelineInstance

Class Method Details

.filter(node_id, type: BlockFilter, params: {}, &block) ⇒ Object

Define a node that filters rows. By default the node is a BlockFilter; when this method is given a block, that block is called with a hash for each row. The result of the block determines whether a given row flows to a downstream node.



# File 'lib/arc-furnace/pipeline.rb', line 84

def self.filter(node_id, type: BlockFilter, params: {}, &block)
  if block_given? && type <= BlockFilter
    params[:block] = block
  end
  raise "Filter #{type} is not a Filter!" unless type <= Filter
  define_intermediate(node_id, type: type, params: params)
end
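The row-level behavior described above can be illustrated without the library. This is a minimal sketch in plain Ruby, not ArcFurnace's implementation: `keep_rows` is a hypothetical helper, and it assumes that a truthy block result lets the row through.

```ruby
# Hypothetical stand-in for a block filter; not part of ArcFurnace.
# Assumption: a truthy block result lets the row flow downstream.
def keep_rows(rows, &block)
  rows.select { |row| block.call(row) }
end

rows = [
  { "id" => "1", "status" => "active" },
  { "id" => "2", "status" => "deleted" }
]

# Only rows for which the block returns a truthy value are kept.
active = keep_rows(rows) { |row| row["status"] == "active" }
```

In a pipeline definition, the same block would be passed to `filter` together with a node id.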

.hash_node(node_id, type: ArcFurnace::Hash, params:) ⇒ Object

Define a hash node, processing all rows from its source and caching them in memory.



# File 'lib/arc-furnace/pipeline.rb', line 33

def self.hash_node(node_id, type: ArcFurnace::Hash, params:)
  define_intermediate(node_id, type: type, params: params)
end
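To make "caching them in memory" concrete, here is a rough sketch of indexing rows by one column. `index_rows` and the column names are hypothetical, and the sketch assumes that the last row wins when a key repeats; it does not describe how ArcFurnace::Hash handles duplicates.

```ruby
# Hypothetical helper, not ArcFurnace::Hash: read every row once and
# keep it in a Ruby Hash keyed by the value of one column.
# Assumption: a later row with the same key replaces the earlier one.
def index_rows(rows, key_column)
  rows.each_with_object({}) do |row, cache|
    cache[row[key_column]] = row
  end
end

teams = [
  { "team_id" => "10", "team" => "Ops" },
  { "team_id" => "20", "team" => "Data" }
]

cache = index_rows(teams, "team_id")
```

Once built, such a cache supports constant-time lookups by key, which is what a join node needs.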

.inherited(subclass) ⇒ Object

Ensure that subclasses don’t overwrite the parent’s transform node definitions: each subclass receives its own copy of the parent’s intermediates map.



# File 'lib/arc-furnace/pipeline.rb', line 13

def self.inherited(subclass)
  subclass.intermediates_map = intermediates_map.dup
end
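The effect of duplicating the map in the `inherited` hook can be seen in a small standalone sketch. `BasePipeline`, `nodes`, and `define` are hypothetical names used only for illustration; they are not the ArcFurnace API.

```ruby
# Illustrative base class; `nodes` and `define` are hypothetical names.
class BasePipeline
  class << self
    attr_writer :nodes

    def nodes
      @nodes ||= {}
    end

    def define(node_id, value)
      nodes[node_id] = value
    end

    # Ruby calls this hook when a subclass is created. Each subclass
    # gets its own copy of the definitions its parent has at that time.
    def inherited(subclass)
      super
      subclass.nodes = nodes.dup
    end
  end
end

class Parent < BasePipeline
  define :source, "parent source"
end

class Child < Parent
  define :extra, "child only"
end
```

Because the copy is taken when the subclass is created, `Child` sees `:source` but `Parent` never sees `:extra`.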

.inner_join(node_id, type: ArcFurnace::InnerJoin, params:) ⇒ Object

Define an inner join node where rows from the source are dropped if an associated entity is not found in the hash for the join key



# File 'lib/arc-furnace/pipeline.rb', line 46

def self.inner_join(node_id, type: ArcFurnace::InnerJoin, params:)
  define_intermediate(node_id, type: type, params: params)
end

.instance(params = {}) ⇒ Object

Create an instance to run a transformation, passing the parameters used to instantiate the transform instance. The resulting instance has a single public method, #execute, which performs the transformation.



# File 'lib/arc-furnace/pipeline.rb', line 96

def self.instance(params = {})
  PipelineInstance.new(self, params)
end

.outer_join(node_id, type: ArcFurnace::OuterJoin, params:) ⇒ Object

Define an outer join node where rows from the source are kept even if an associated entity is not found in the hash for the join key.



# File 'lib/arc-furnace/pipeline.rb', line 52

def self.outer_join(node_id, type: ArcFurnace::OuterJoin, params:)
  define_intermediate(node_id, type: type, params: params)
end
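The difference between the inner and outer join nodes comes down to what happens to a source row with no match in the hash. The sketch below shows that difference with plain Ruby hashes. `join_rows` and its keyword arguments are hypothetical, and merging the matched row into the source row is an assumption about the output shape, not a statement of how the ArcFurnace join classes combine rows.

```ruby
# Hypothetical helper, not the ArcFurnace join classes.
# `lookup` plays the role of the hash node: join-key value => cached row.
def join_rows(rows, lookup, key:, keep_unmatched:)
  rows.each_with_object([]) do |row, out|
    match = lookup[row[key]]
    if match
      out << row.merge(match)   # assumption: matched fields are merged in
    elsif keep_unmatched
      out << row                # outer join: pass the row through as-is
    end
    # inner join: an unmatched row is dropped
  end
end

lookup = { "10" => { "team_id" => "10", "team" => "Ops" } }
rows = [
  { "name" => "Ada", "team_id" => "10" },
  { "name" => "Bob", "team_id" => "99" }
]

inner = join_rows(rows, lookup, key: "team_id", keep_unmatched: false)
outer = join_rows(rows, lookup, key: "team_id", keep_unmatched: true)
```

Here `inner` contains only Ada's row, while `outer` also keeps Bob's row without team fields.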

.sink(type:, source:, params:) ⇒ Object

Define the sink for this transformation. Only a single sink may be specified per transformation. The sink is delivered a hash per row or entity, fed to it from the graph of nodes above it.



# File 'lib/arc-furnace/pipeline.rb', line 20

def self.sink(type:, source:, params:)
  if sink_node
    raise 'Sink already defined!'
  end

  @sink_node = -> do
    type.new(resolve_parameters(:sink, params))
  end
  @sink_source = source
end
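Two details of this method are worth seeing in isolation: the guard that rejects a second sink, and the lambda that postpones building the sink. The following is a simplified sketch under stated assumptions. `TinyPipeline` and `ListSink` are hypothetical, and the sketch passes `params` straight to the constructor instead of resolving them as the real method does.

```ruby
# Illustrative classes only; TinyPipeline and ListSink are hypothetical.
class ListSink
  attr_reader :label

  def initialize(label:)
    @label = label
  end
end

class TinyPipeline
  class << self
    attr_reader :sink_node, :sink_source

    def sink(type:, source:, params:)
      raise 'Sink already defined!' if sink_node

      # The lambda defers building the sink until it is called.
      @sink_node = -> { type.new(**params) }
      @sink_source = source
    end
  end
end

TinyPipeline.sink(type: ListSink, source: :rows, params: { label: "out" })
built = TinyPipeline.sink_node.call

# A second definition on the same class is rejected.
second_attempt = begin
  TinyPipeline.sink(type: ListSink, source: :rows, params: { label: "again" })
  :accepted
rescue RuntimeError => e
  e.message
end
```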

.source(node_id, type:, params:) ⇒ Object

A source that has row semantics, delivering a hash per row (or per entity) for the source.



# File 'lib/arc-furnace/pipeline.rb', line 39

def self.source(node_id, type:, params:)
  raise "Source #{type} is not a Source!" unless type <= Source
  define_intermediate(node_id, type: type, params: params)
end
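The `type <= Source` guard relies on Ruby's `Module#<=`, which returns `true` for the same class or a subclass, `false` for a superclass, and `nil` for unrelated classes. A small sketch with stand-in classes (none of them are the ArcFurnace classes, and `check_source!` is a hypothetical helper):

```ruby
# Stand-in classes for illustration only.
class Source; end
class CsvLikeSource < Source; end
class Unrelated; end

# Mirrors the guard in the method above: anything that is not
# Source or a subclass of it is rejected.
def check_source!(type)
  raise "Source #{type} is not a Source!" unless type <= Source
  type
end

accepted = [check_source!(CsvLikeSource), check_source!(Source)]

error = begin
  check_source!(Unrelated)
  nil
rescue RuntimeError => e
  e.message
end
```

The same pattern guards `filter`, `transform`, and `unfold` against mismatched node types.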

.transform(node_id, type: BlockTransform, params: {}, &block) ⇒ Object

Define a node that transforms rows. By default the node is a BlockTransform; when this method is given a block, that block is called with a hash for each row. The result of the block becomes the row for the next downstream node.



# File 'lib/arc-furnace/pipeline.rb', line 60

def self.transform(node_id, type: BlockTransform, params: {}, &block)
  if block_given? && type <= BlockTransform
    params[:block] = block
  end
  raise "Transform #{type} is not a Transform!" unless type <= Transform
  define_intermediate(node_id, type: type, params: params)
end
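As a rough illustration of the one-row-in, one-row-out behavior, the sketch below applies a block to each row hash and passes the result on. `transform_rows` is a hypothetical helper written in plain Ruby, not the BlockTransform implementation.

```ruby
# Hypothetical stand-in for a block transform; not part of ArcFurnace.
# Each block result replaces the row it was computed from.
def transform_rows(rows, &block)
  rows.map { |row| block.call(row) }
end

rows = [{ "name" => "ada" }, { "name" => "bob" }]

# Return a new hash rather than mutating the incoming row.
result = transform_rows(rows) do |row|
  row.merge("name" => row["name"].capitalize)
end
```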

.unfold(node_id, type: BlockUnfold, params: {}, &block) ⇒ Object

Define a node that unfolds rows. By default the node is a BlockUnfold; when this method is given a block, that block is called with a hash for each row. The result of the block becomes the set of rows for the next downstream node.



# File 'lib/arc-furnace/pipeline.rb', line 72

def self.unfold(node_id, type: BlockUnfold, params: {}, &block)
  if block_given? && type <= BlockUnfold
    params[:block] = block
  end
  raise "Unfold #{type} is not an Unfold!" unless type <= Unfold
  define_intermediate(node_id, type: type, params: params)
end
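Unfolding differs from transforming in that one input row may yield zero, one, or many output rows. A minimal sketch, assuming the block returns an array of row hashes (`unfold_rows` and the column names are hypothetical and not part of ArcFurnace):

```ruby
# Hypothetical stand-in for a block unfold; not part of ArcFurnace.
# Assumption: the block returns an array of rows for each input row.
def unfold_rows(rows, &block)
  rows.flat_map { |row| block.call(row) }
end

rows = [
  { "order" => "A1", "skus" => "x,y" },
  { "order" => "B2", "skus" => "" }
]

# One output row per SKU; an order with no SKUs produces no rows.
result = unfold_rows(rows) do |row|
  row["skus"].split(",").map { |sku| { "order" => row["order"], "sku" => sku } }
end
```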