Class: Mementus::Pipeline::Step
- Inherits:
- Object
- Defined in:
- lib/mementus/pipeline/step.rb
Overview
Represents a step in a pipeline chain.
New steps are constructed from a `source` enumerable (usually the previous step in the chain) and an optional `pipe`, which provides the strategy for generating output values.
Each step has an internal `Fiber` context. On each value in the iteration, the fiber yields control to the next step in the chain, rather than cycling through the entire list of values before forwarding control.
This avoids the problem of iterating over a huge set of nodes and edges which are then discarded by a later step.
The approach here is roughly similar to the way that Ruby chains together `Enumerator::Lazy` objects.
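The difference between eager and lazy chaining can be seen with plain Ruby enumerables. This is a minimal sketch that does not use Mementus at all; the helper name `first_three_doubled` is hypothetical and exists only to count how many source values each chain visits.

```ruby
# Returns [values, visited], where visited is the number of source
# values the chain touched before producing its first three results.
def first_three_doubled(numbers)
  visited = 0
  values = numbers.map { |n| visited += 1; n * 2 }.first(3)
  [values, visited]
end

# Eager: map walks all 1000 values before first(3) discards most of them.
eager_values, eager_visited = first_three_doubled(1..1000)
puts "eager: #{eager_values.inspect} after #{eager_visited} visits"

# Lazy: each value passes through the whole chain before the next is pulled.
lazy_values, lazy_visited = first_three_doubled((1..1000).lazy)
puts "lazy:  #{lazy_values.inspect} after #{lazy_visited} visits"
```

Both chains return the same three values, but the lazy chain stops pulling from the source as soon as the third value has been produced.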
Instance Method Summary
- #all ⇒ Object
  Returns all values in the sequence.
- #each ⇒ Object
  Loop through each value in the sequence, yielding control to the next step if necessary.
- #first ⇒ Object
  Returns the first value in the sequence.
- #id ⇒ Object
  Dereference ids from the source elements.
- #in ⇒ Object
  Traverse to the incoming nodes pointing to the source elements.
- #in_e ⇒ Object
  Traverse to the incoming edges pointing to the source elements.
- #initialize(source, pipe = nil, graph = nil) ⇒ Step (constructor)
  Initialize a pipeline step from the given source.
- #out ⇒ Object
  Traverse to the outgoing nodes adjacent to the source elements.
- #out_e ⇒ Object
  Traverse to the outgoing edges from the source elements.
- #take(num) ⇒ Object
  Returns the given number of values from the sequence.
Constructor Details
Instance Method Details
#all ⇒ Object
Returns all values in the sequence.
    # File 'lib/mementus/pipeline/step.rb', line 117
    def all
      to_enum.to_a
    end
#each ⇒ Object
Loop through each value in the sequence, yielding control to the next step if necessary.
If a block is provided, it is called with the value. Otherwise, a lazy enumerator representing the wrapped source is returned.
    # File 'lib/mementus/pipeline/step.rb', line 37
    def each
      return to_enum unless block_given?

      context = Fiber.new do
        source.each do |element|
          pipe.call(element)
        end

        raise StopIteration
      end

      loop do
        yield context.resume
      end
    end
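The fiber hand-off can be illustrated with a self-contained sketch. `SketchStep` below is a hypothetical stand-in, not the Mementus class: it assumes the pipe's job is to transform an element and hand it back with `Fiber.yield`, and it signals the end of the source with a sentinel object instead of raising `StopIteration`.

```ruby
# Hypothetical stand-in for a pipeline step; not the Mementus implementation.
class SketchStep
  include Enumerable

  # source:    any object responding to #each
  # transform: callable applied to each element before it is handed on
  def initialize(source, transform = ->(x) { x })
    @source = source
    @transform = transform
  end

  def each
    return to_enum unless block_given?

    done = Object.new # unique sentinel marking the end of the source
    context = Fiber.new do
      # Suspend after every element so the caller decides when to continue.
      @source.each { |element| Fiber.yield(@transform.call(element)) }
      done
    end

    loop do
      value = context.resume
      break if value.equal?(done)
      yield value
    end
  end
end

# Record which values are actually pulled from an unbounded source.
pulled = []
numbers = Enumerator.new do |y|
  (1..Float::INFINITY).each { |n| pulled << n; y << n }
end

doubled = SketchStep.new(numbers, ->(n) { n * 2 })
plus_one = SketchStep.new(doubled, ->(n) { n + 1 })

p plus_one.first(3) # => [3, 5, 7]
p pulled            # => [1, 2, 3]
```

Because each step suspends after a single value, chaining two steps over an unbounded source still terminates once the consumer has what it needs.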
#first ⇒ Object
Returns the first value in the sequence.
    # File 'lib/mementus/pipeline/step.rb', line 112
    def first
      to_enum.first
    end
#id ⇒ Object
Dereference ids from the source elements.
    # File 'lib/mementus/pipeline/step.rb', line 54
    def id
      ids = to_enum.map { |element| element.id }
      return ids.first if ids.length == 1
      ids
    end
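Note that the return shape depends on how many elements the step holds: a single id for one element, an array otherwise. The sketch below reproduces that rule outside Mementus; `SketchElement` and `dereference_ids` are hypothetical names, and the only assumption is that elements respond to `#id`.

```ruby
# Hypothetical element type standing in for a node or edge.
SketchElement = Struct.new(:id)

# Same rule as the listing: unwrap the array when it holds exactly one id.
def dereference_ids(elements)
  ids = elements.map(&:id)
  ids.length == 1 ? ids.first : ids
end

one  = dereference_ids([SketchElement.new(7)])
many = dereference_ids([SketchElement.new(7), SketchElement.new(9)])
none = dereference_ids([])

p one   # => 7
p many  # => [7, 9]
p none  # => []

# Kernel#Array normalizes both shapes when the caller always wants a list.
p Array(one) # => [7]
```

Callers that cannot know the element count in advance may prefer to wrap the result with `Array(...)` as shown.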
#in ⇒ Object
Traverse to the incoming nodes pointing to the source elements.
    # File 'lib/mementus/pipeline/step.rb', line 70
    def in
      incoming_nodes = []

      source.each do |node|
        graph.each_node do |graph_node|
          graph.each_adjacent(graph_node.id) do |adj_node|
            incoming_nodes << graph_node if adj_node.id == node.id
          end
        end
      end

      Step.new(incoming_nodes)
    end
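The incoming traversal has no reverse index to consult, so it scans every node's adjacency list for each source element. A minimal sketch of the same scan over a plain hash follows; the adjacency hash and the `incoming_ids` helper are hypothetical and stand in for the graph's `each_node` and `each_adjacent` iteration.

```ruby
# Hypothetical adjacency list: node id => ids of the nodes it points to.
SKETCH_ADJACENCY = { 1 => [2, 3], 2 => [3], 3 => [] }.freeze

# Returns the ids of nodes that have an edge pointing at any target id.
# Like the listing, it scans the whole graph once per target and does
# not remove duplicates.
def incoming_ids(adjacency, target_ids)
  incoming = []
  target_ids.each do |target|
    adjacency.each do |from, neighbours|
      incoming << from if neighbours.include?(target)
    end
  end
  incoming
end

p incoming_ids(SKETCH_ADJACENCY, [3])    # => [1, 2]
p incoming_ids(SKETCH_ADJACENCY, [2, 3]) # => [1, 1, 2]
p incoming_ids(SKETCH_ADJACENCY, [1])    # => []
```

The cost grows with the number of source elements multiplied by the number of edges in the graph, and a node that points at several targets appears once per target.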
#in_e ⇒ Object
Traverse to the incoming edges pointing to the source elements.
    # File 'lib/mementus/pipeline/step.rb', line 98
    def in_e
      ids = source.map(&:id)
      incoming_edges = []

      graph.each_node do |graph_node|
        graph.each_adjacent(graph_node.id) do |adj_node|
          incoming_edges << Mementus::Edge.new(from: graph_node, to: adj_node) if ids.include?(adj_node.id)
        end
      end

      Step.new(incoming_edges)
    end
#out ⇒ Object
Traverse to the outgoing nodes adjacent to the source elements.
    # File 'lib/mementus/pipeline/step.rb', line 61
    def out
      outgoing_nodes = source.inject([]) do |result, node|
        result.concat(node.adjacent)
      end

      Step.new(outgoing_nodes)
    end
#out_e ⇒ Object
Traverse to the outgoing edges from the source elements.
    # File 'lib/mementus/pipeline/step.rb', line 85
    def out_e
      outgoing_edges = []

      source.each do |node|
        outgoing_edges = graph.each_adjacent(node.id).map do |id|
          Mementus::Edge.new(from: node, to: id)
        end
      end

      Step.new(outgoing_edges)
    end
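One point worth noting about the listing: `outgoing_edges` is assigned afresh on each pass through `source.each`, so with several source nodes only the edges of the last node appear to survive. The sketch below shows one way to accumulate edges across all source nodes with `flat_map`. It is an illustration over a plain hash, not a patch to Mementus; `SketchEdge` and `outgoing_edges_for` are hypothetical names.

```ruby
# Hypothetical edge type holding the ids at both ends.
SketchEdge = Struct.new(:from, :to)

# Builds one edge per (source, neighbour) pair. flat_map concatenates
# the per-node results, so edges from every source node are kept.
def outgoing_edges_for(adjacency, source_ids)
  source_ids.flat_map do |from|
    adjacency.fetch(from, []).map { |to| SketchEdge.new(from, to) }
  end
end

adjacency = { 1 => [2, 3], 2 => [3], 3 => [] }
edges = outgoing_edges_for(adjacency, [1, 2])

p edges.map(&:to_a) # => [[1, 2], [1, 3], [2, 3]]
```

Using `fetch` with an empty default means an unknown source id contributes no edges rather than raising.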
#take(num) ⇒ Object
Returns the given number of values from the sequence.
    # File 'lib/mementus/pipeline/step.rb', line 122
    def take(num)
      to_enum.take(num)
    end