Class: Mementus::Pipeline::Step

Inherits:
Object
Defined in:
lib/mementus/pipeline/step.rb

Overview

Represents a step in a pipeline chain.

New steps are constructed from a `source` enumerable (usually the previous step in the chain) and an optional `pipe`, which provides the strategy for generating output values.

Each step has an internal `Fiber` context, which yields control to the next step in the chain after each value in the iteration rather than cycling through the entire list of values before forwarding control.

This avoids the problem of iterating over a huge set of nodes and edges which are then discarded by a later step.

The approach here is roughly similar to the way Ruby chains `Enumerator::Lazy` objects together.
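To illustrate the comparison with `Enumerator::Lazy`, the following sketch uses only core Ruby (nothing from Mementus) and counts how many source values each strategy visits. The variable names are illustrative assumptions, not part of the library.

```ruby
# Count how many source values are actually visited by each strategy.
eager_visits = 0
lazy_visits = 0

# Eager chain: every value passes through `map` before `select` and `first` run.
eager_result = (1..1000)
  .map { |n| eager_visits += 1; n * 2 }
  .select { |n| (n % 3).zero? }
  .first(2)

# Lazy chain: each value flows through the whole chain before the next is read,
# so iteration stops as soon as two matches have been found.
lazy_result = (1..1000)
  .lazy
  .map { |n| lazy_visits += 1; n * 2 }
  .select { |n| (n % 3).zero? }
  .first(2)

puts "eager: #{eager_result.inspect} after #{eager_visits} visits"
puts "lazy:  #{lazy_result.inspect} after #{lazy_visits} visits"
```

Both chains produce the same result, but the lazy chain stops reading from the source once it has enough values, which is the behaviour the `Fiber` context in each step aims for.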

Constructor Details

#initialize(source, pipe = nil, graph = nil) ⇒ Step

Initialize a pipeline step from the given source.

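Parameters:

  • source: the enumerable to draw values from (usually the previous step in the chain)
  • pipe: optional strategy for generating output values; defaults to `Pipe.new(graph)`
  • graph: optional graph, used by the traversal methods
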
# File 'lib/mementus/pipeline/step.rb', line 23

def initialize(source, pipe=nil, graph=nil)
  @source = source
  @pipe = pipe || Pipe.new(graph)
  @graph = graph
end

Instance Method Details

#all ⇒ Object

Returns all values in the sequence.



# File 'lib/mementus/pipeline/step.rb', line 117

def all
  to_enum.to_a
end

#each ⇒ Object

Loop through each value in the sequence, yielding control to the next step if necessary.

If a block is provided, it is called with each value. Otherwise, an enumerator over the step is returned, which produces values on demand.



# File 'lib/mementus/pipeline/step.rb', line 37

def each
  return to_enum unless block_given?

  context = Fiber.new do
    source.each do |element|
      pipe.call(element)
    end

    raise StopIteration
  end

  loop do
    yield context.resume
  end
end
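The `Fiber` hand-off can be seen in isolation with a self-contained sketch. `DoublingPipe` and `SketchStep` are hypothetical stand-ins, and the sketch assumes that the pipe returns each value to the caller through `Fiber.yield`; the real `Pipe` class is not shown on this page.

```ruby
# Hypothetical stand-in for a pipe: hands each processed value back to
# whoever resumed the current fiber.
class DoublingPipe
  def call(element)
    Fiber.yield(element * 2)
  end
end

# Hypothetical stand-in for a step, following the same shape as #each.
class SketchStep
  def initialize(source, pipe)
    @source = source
    @pipe = pipe
  end

  def each
    return to_enum unless block_given?

    context = Fiber.new do
      @source.each { |element| @pipe.call(element) }
      raise StopIteration # rescued by Kernel#loop below, which ends the iteration
    end

    loop { yield context.resume }
  end
end

# A source that records which values have been read from it.
visited = []
source = Enumerator.new do |yielder|
  [1, 2, 3].each do |n|
    visited << n
    yielder << n
  end
end

first_value = SketchStep.new(source, DoublingPipe.new).to_enum.first
all_values = SketchStep.new([1, 2, 3], DoublingPipe.new).each.to_a

puts "first: #{first_value}, source values read: #{visited.inspect}"
puts "all:   #{all_values.inspect}"
```

Asking for only the first value reads a single element from the source, because the fiber is suspended after each `Fiber.yield` and is never resumed again.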

#first ⇒ Object

Returns the first value in the sequence.



# File 'lib/mementus/pipeline/step.rb', line 112

def first
  to_enum.first
end

#id ⇒ Object

Dereference ids from the source elements. Returns a single id when the sequence contains exactly one element, otherwise an array of ids.



# File 'lib/mementus/pipeline/step.rb', line 54

def id
  ids = to_enum.map { |element| element.id }
  return ids.first if ids.length == 1
  ids
end
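Because the return type depends on the number of elements, callers may want to normalise the result. The sketch below mirrors the shape of `#id` with plain Ruby; `SketchElement` and `dereference_ids` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for a graph element that exposes an id.
SketchElement = Struct.new(:id)

# Mirrors the shape of #id: unwrap the result when there is exactly one id.
def dereference_ids(elements)
  ids = elements.map(&:id)
  ids.length == 1 ? ids.first : ids
end

single = dereference_ids([SketchElement.new(1)])
many = dereference_ids([SketchElement.new(1), SketchElement.new(2)])
none = dereference_ids([])

# Kernel#Array wraps a bare value and leaves an array untouched, so it
# gives callers a uniform array regardless of how many ids came back.
normalised = Array(single)

puts single.inspect
puts many.inspect
puts none.inspect
puts normalised.inspect
```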

#in ⇒ Object

Traverse to the incoming nodes pointing to the source elements.



# File 'lib/mementus/pipeline/step.rb', line 70

def in
  incoming_nodes = []

  source.each do |node|
    graph.each_node do |graph_node|
      graph.each_adjacent(graph_node.id) do |adj_node|
        incoming_nodes << graph_node if adj_node.id == node.id
      end
    end
  end

  Step.new(incoming_nodes)
end
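The incoming traversal can be sketched against a plain adjacency hash. `SKETCH_ADJACENCY` and `incoming` are hypothetical and do not use the Mementus graph API. Unlike the nested loops in `#in`, which append a node once per matching edge, this sketch lists each incoming node once.

```ruby
# Hypothetical adjacency list: each key points to the node ids it links to.
SKETCH_ADJACENCY = {
  1 => [2, 3],
  2 => [3],
  3 => []
}.freeze

# Collect every node that has an edge pointing at one of the targets.
# The whole graph is scanned on each call, as in the listing for #in.
def incoming(adjacency, targets)
  adjacency.each_with_object([]) do |(node, neighbours), result|
    result << node if neighbours.any? { |n| targets.include?(n) }
  end
end

into_three = incoming(SKETCH_ADJACENCY, [3])
into_two = incoming(SKETCH_ADJACENCY, [2])
into_one = incoming(SKETCH_ADJACENCY, [1])

puts into_three.inspect
puts into_two.inspect
puts into_one.inspect
```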

#in_e ⇒ Object

Traverse to the incoming edges pointing to the source elements.



# File 'lib/mementus/pipeline/step.rb', line 98

def in_e
  ids = source.map(&:id)
  incoming_edges = []

  graph.each_node do |graph_node|
    graph.each_adjacent(graph_node.id) do |adj_node|
      incoming_edges << Mementus::Edge.new(from: graph_node, to: adj_node) if ids.include?(adj_node.id)
    end
  end

  Step.new(incoming_edges)
end

#out ⇒ Object

Traverse to the outgoing nodes adjacent to the source elements.



# File 'lib/mementus/pipeline/step.rb', line 61

def out
  outgoing_nodes = source.inject([]) do |result, node|
    result.concat(node.adjacent)
  end

  Step.new(outgoing_nodes)
end

#out_e ⇒ Object

Traverse to the outgoing edges from the source elements.



# File 'lib/mementus/pipeline/step.rb', line 85

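def out_e
  outgoing_edges = []

  # Accumulate edges across every source node rather than keeping only
  # those of the last node visited.
  source.each do |node|
    graph.each_adjacent(node.id) do |adj_node|
      outgoing_edges << Mementus::Edge.new(from: node, to: adj_node)
    end
  end

  Step.new(outgoing_edges)
end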
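When there are several source nodes, the outgoing edges of each one need to be accumulated into a single list rather than reassigned per node. A minimal sketch of that accumulation using `flat_map` follows; `SketchEdge`, `OUT_ADJACENCY` and `outgoing_edges` are hypothetical stand-ins, not the Mementus classes.

```ruby
# Hypothetical stand-in for an edge with keyword arguments.
SketchEdge = Struct.new(:from, :to, keyword_init: true)

# Hypothetical adjacency list keyed by node.
OUT_ADJACENCY = { a: [:b, :c], b: [:c], c: [] }.freeze

# Build one edge per (source node, adjacent node) pair and flatten the
# per-node lists into a single array.
def outgoing_edges(adjacency, sources)
  sources.flat_map do |node|
    adjacency.fetch(node, []).map { |adjacent| SketchEdge.new(from: node, to: adjacent) }
  end
end

edges = outgoing_edges(OUT_ADJACENCY, [:a, :b])
pairs = edges.map { |edge| [edge.from, edge.to] }

puts pairs.inspect
```

With two source nodes, the result contains the edges of both, which a per-node reassignment would lose.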

#take(num) ⇒ Object

Returns the given number of values from the sequence.



# File 'lib/mementus/pipeline/step.rb', line 122

def take(num)
  to_enum.take(num)
end
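The `first`, `take` and `all` helpers all delegate to `to_enum`, so how much of the sequence is consumed depends on the helper called. The sketch below shows this with a hypothetical `CountingSequence` class that records how many values it has produced; it is an illustration in core Ruby, not the library's implementation.

```ruby
# Hypothetical sequence that records how many values it has produced.
class CountingSequence
  attr_reader :produced

  def initialize(limit)
    @limit = limit
    @produced = 0
  end

  def each
    return to_enum unless block_given?

    1.upto(@limit) do |n|
      @produced += 1
      yield n
    end
  end

  # Same delegation pattern as the helpers documented here.
  def first
    to_enum.first
  end

  def take(num)
    to_enum.take(num)
  end

  def all
    to_enum.to_a
  end
end

seq = CountingSequence.new(1_000)

first_value = seq.first
produced_after_first = seq.produced

taken = seq.take(3)
produced_after_take = seq.produced

puts "first: #{first_value} (#{produced_after_first} produced)"
puts "take:  #{taken.inspect} (#{produced_after_take} produced in total)"
```

Only `all` walks the full sequence; `first` and `take` stop iterating once they have the values they need.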