Class: Turbine::Pipeline::DSL

Inherits:
Object
Extended by:
Forwardable
Includes:
Enumerable
Defined in:
lib/turbine/pipeline/dsl.rb

Overview

Provides the chaining DSL used throughout Turbine, for example when calling Node#in or Node#descendants.

Constructor Details

#initialize(source) ⇒ DSL

Public: Creates a new DSL instance.

source - A Segment which acts as the head of the pipeline. Normally an instance of Pump.

Returns a DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 31

def initialize(source)
  @source = source
end

Instance Attribute Details

#source ⇒ Object (readonly)

The final segment in the pipeline.



# File 'lib/turbine/pipeline/dsl.rb', line 23

def source
  @source
end

Instance Method Details

#also(*branches, &block) ⇒ Object

Public: Like split, but also yields the input value before running each branch.

branches - One or more blocks which will be given the DSL.

For example

# Yields each node, and their outwards nodes connected with a
# :spouse edge.
nodes.also(->(node) { node.out(:spouse) })

If you only want to supply a single branch, you can pass a block instead.

nodes.also { |node| node.out(:child) }

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 176

def also(*branches, &block)
  append(Also.new(*branches, &block))
end
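
The ordering described above (input first, then each branch's results) can be sketched without Turbine. This is only an illustration: toy_also and the spouse_of data are hypothetical, and the branches here receive plain values, whereas the real branches are given the DSL.

```ruby
# Made-up sample data: node => nodes reached over a :spouse edge.
spouse_of = { jay: [:gloria], claire: [:phil] }

# Hypothetical helper: emit each input, then the results of every branch.
# A trailing block is treated as one more branch, as DSL#also allows.
def toy_also(inputs, *branches, &block)
  branches += [block] if block
  inputs.flat_map do |input|
    [input] + branches.flat_map { |branch| Array(branch.call(input)) }
  end
end

result = toy_also([:jay, :claire]) { |node| spouse_of.fetch(node, []) }
# => [:jay, :gloria, :claire, :phil]
```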

#ancestors(label = nil) ⇒ Object

Public: Using the breadth-first traversal strategy, fetches all of a node’s adjacent “in” nodes, and their adjacent “in” nodes, and so on.

label - An optional label which is used to restrict the edges traversed to those with the label.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 96

def ancestors(label = nil)
  append(Traverse.new(:in, label))
end

#append(downstream) ⇒ Object

Public: Creates a new DSL by appending the given downstream segment to the current source.

downstream - The segment to be added in the new DSL.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 264

def append(downstream)
  DSL.new(@source.append(downstream))
end
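
Because append wraps the new segment in a fresh DSL, the receiver is left untouched and chains can be built up step by step. A minimal sketch of that shape follows; ToySegment and ToyDSL are hypothetical stand-ins written for this example, not Turbine's own classes.

```ruby
# Hypothetical stand-in for a pipeline segment; it only records its upstream.
class ToySegment
  attr_reader :name, :upstream

  def initialize(name, upstream = nil)
    @name = name
    @upstream = upstream
  end

  # Links `downstream` after this segment and returns the new tail.
  def append(downstream)
    ToySegment.new(downstream.name, self)
  end

  # Names of every segment from the head to this one.
  def path
    (upstream ? upstream.path : []) + [name]
  end
end

# Hypothetical stand-in for the DSL wrapper.
class ToyDSL
  attr_reader :source

  def initialize(source)
    @source = source
  end

  # Same shape as DSL#append: wrap the new tail in a new DSL.
  def append(downstream)
    ToyDSL.new(@source.append(downstream))
  end
end

base     = ToyDSL.new(ToySegment.new(:pump))
filtered = base.append(ToySegment.new(:filter))

base_path     = base.source.path      # => [:pump]
filtered_path = filtered.source.path  # => [:pump, :filter]
```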

#as(name) ⇒ Object

Public: Captures all of the values emitted by the previous segment so that a later segment (e.g. “only” or “except”) can use them.

name - A name assigned to the captured values.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 186

def as(name)
  append(Journal.new(name))
end

#descendants(label = nil) ⇒ Object

Public: Using the breadth-first traversal strategy, fetches all of a node’s adjacent “out” nodes, and their adjacent “out” nodes, and so on.

label - An optional label which is used to restrict the edges traversed to those with the label.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 108

def descendants(label = nil)
  append(Traverse.new(:out, label))
end
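
The breadth-first order used by descendants (and, in the opposite direction, ancestors) can be illustrated on a plain hash. This is a sketch under assumptions: toy_descendants and the sample graph are made up, and it excludes the start node and visits each node once, which may differ from what the real Traverse segment does.

```ruby
# Made-up adjacency list: each key maps to its adjacent "out" nodes.
out_nodes = {
  jay:      [:claire, :mitchell],
  claire:   [:haley, :alex],
  mitchell: [:lily]
}

# Hypothetical breadth-first walk: nearest nodes first, then their neighbours.
def toy_descendants(start, graph)
  seen  = [start]
  queue = [start]
  found = []

  until queue.empty?
    node = queue.shift
    graph.fetch(node, []).each do |adjacent|
      next if seen.include?(adjacent)

      seen  << adjacent
      queue << adjacent
      found << adjacent
    end
  end

  found
end

order = toy_descendants(:jay, out_nodes)
# => [:claire, :mitchell, :haley, :alex, :lily]
```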

#except(journal_name) ⇒ Object

Public: Creates a filter so that only values which were not present in a named journal (created using “as”) are emitted.

journal_name - The name of the “as” journal.

For example

# Who are your uncles and aunts?
node.in(:child).as(:parents).in(:child).out(:child).except(:parents)

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 214

def except(journal_name)
  append(JournalFilter.new(:except, journal_name))
end

#get(key) ⇒ Object

Public: Queries each input for its key property. Expects the input to include Turbine::Properties.

key - The property key to be queried.

For example

pipe.get(:age) # => [11, 15, 18, 44, 46]

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 45

def get(key)
  append(Sender.new(:get, key))
end
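
get, like in, out, in_edges and out_edges, is built on a Sender segment. Assuming a Sender simply calls the named method on every input, the idea can be sketched as below; ToyNode and toy_sender are hypothetical names, and the real Sender may handle results differently.

```ruby
# Hypothetical node exposing `get`, as inputs including Turbine::Properties
# are expected to.
ToyNode = Struct.new(:properties) do
  def get(key)
    properties[key]
  end
end

# Assumed behaviour of a Sender-style step: call `message` on each input.
def toy_sender(inputs, message, *args)
  inputs.map { |input| input.public_send(message, *args) }
end

nodes = [11, 15, 18].map { |age| ToyNode.new({ age: age }) }
ages  = toy_sender(nodes, :get, :age)
# => [11, 15, 18]
```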

#in(label = nil) ⇒ Object

Public: Retrieves the inbound nodes on the input node or edge.

label - An optional label; only nodes connected to the input via an edge with this label will be emitted by the pipeline.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 75

def in(label = nil)
  append(Sender.new(:nodes, :in, label))
end

#in_edges(label = nil) ⇒ Object

Public: Retrieves the in_edges from the input nodes.

label - An optional label; only edges with a matching label will be emitted by the pipeline.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 55

def in_edges(label = nil)
  append(Sender.new(:edges, :in, label))
end

#inspect ⇒ Object

Public: A human-readable version of the DSL. Shows the final result.

Returns a String.



# File 'lib/turbine/pipeline/dsl.rb', line 271

def inspect
  to_a.inspect
end

#map(&block) ⇒ Object

Public: Given a block, transforms each input value to the result of running the block with the input.

block - A block used to transform each input value.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 138

def map(&block)
  append(Transform.new(&block))
end

#only(journal_name) ⇒ Object

Public: Creates a filter so that only values which were present in a named journal (created using “as”) are emitted.

journal_name - The name of the “as” journal.

For example

# Did your grandparents "friend" your parents?
node.in(:child).as(:parents).in(:child).out(:friend).only(:parents)

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 201

def only(journal_name)
  append(JournalFilter.new(:only, journal_name))
end
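
The interplay of as, only and except can be sketched with plain collections: as remembers a set of values under a name, and the two filters later test membership in that set. Everything below is illustrative; the family data is made up and the variable names are not part of Turbine.

```ruby
require 'set'

# Made-up family data: child => parents.
parents_of = {
  haley:    [:claire, :phil],
  claire:   [:jay, :dede],
  mitchell: [:jay, :dede],
  phil:     [:frank]
}

# Inverse lookup: parent => children.
children_of = Hash.new { |hash, key| hash[key] = [] }
parents_of.each do |child, parents|
  parents.each { |parent| children_of[parent] << child }
end

# "as(:parents)": remember every value emitted at this point.
parents = parents_of.fetch(:haley)
journal = { parents: parents.to_set }

# Two more hops: grandparents, then all of their children.
grandparents = parents.flat_map { |parent| parents_of.fetch(parent, []) }.uniq
candidates   = grandparents.flat_map { |gp| children_of[gp] }.uniq

# "except(:parents)" drops journal members; "only(:parents)" keeps them.
uncles_and_aunts = candidates.reject { |node| journal[:parents].include?(node) }
parents_again    = candidates.select { |node| journal[:parents].include?(node) }
# uncles_and_aunts => [:mitchell]
# parents_again    => [:claire, :phil]
```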

#out(label = nil) ⇒ Object

Public: Retrieves the outbound nodes on the input node or edge.

label - An optional label; only nodes connected to the input via an edge with this label will be emitted by the pipeline.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 85

def out(label = nil)
  append(Sender.new(:nodes, :out, label))
end

#out_edges(label = nil) ⇒ Object

Public: Retrieves the out_edges from the input nodes.

label - An optional label; only edges with a matching label will be emitted by the pipeline.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 65

def out_edges(label = nil)
  append(Sender.new(:edges, :out, label))
end

#reject(&block) ⇒ Object

Public: Given a block, emits input elements for which the block evaluates to false.

block - A block used to determine which elements are emitted.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 128

def reject(&block)
  append(Filter.new { |value| ! block.call(value) })
end

#select(&block) ⇒ Object

Public: Given a block, emits input elements for which the block evaluates to true.

block - A block used to determine which elements are emitted.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 118

def select(&block)
  append(Filter.new(&block))
end
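
select and reject share one Filter segment; reject just negates the block. The same relationship can be sketched with Ruby's lazy enumerators. toy_filter and toy_reject are hypothetical helpers, and the ages are borrowed from the get example.

```ruby
# Hypothetical filter step: passes through values for which the block is truthy.
def toy_filter(values, &block)
  values.lazy.select(&block)
end

# reject expressed as a negated filter, matching the shape of DSL#reject.
def toy_reject(values, &block)
  toy_filter(values) { |value| !block.call(value) }
end

ages = [11, 15, 18, 44, 46]

adults = toy_filter(ages) { |age| age >= 18 }.to_a  # => [18, 44, 46]
minors = toy_reject(ages) { |age| age >= 18 }.to_a  # => [11, 15]
```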

#split(*branches) ⇒ Object

Public: Splits the pipeline into separate branches, computes the values from each branch in turn, then combines the results.

branches - One or more blocks which will be given the DSL.

For example

nodes.split(->(x) { x.get(:gender) },
            ->(x) { x.in_edges.length },
            ->(x) { x.out_edges.length }).to_a
# => [ :male, 2, 3, :female, 1, 6, :female, 2, 2, ... ]

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 155

def split(*branches)
  append(Split.new(*branches))
end
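
The interleaved output in the example above (each input's branch results, in branch order, before moving on to the next input) can be reproduced with a small sketch. toy_split and the sample records are hypothetical; the real branches receive the DSL rather than a plain hash.

```ruby
# Made-up records standing in for nodes.
people = [
  { gender: :male,   in_edges: 2, out_edges: 3 },
  { gender: :female, in_edges: 1, out_edges: 6 }
]

# Hypothetical helper: for each input, run every branch in order and
# concatenate the results.
def toy_split(inputs, *branches)
  inputs.flat_map do |input|
    branches.flat_map { |branch| Array(branch.call(input)) }
  end
end

combined = toy_split(
  people,
  ->(person) { person[:gender] },
  ->(person) { person[:in_edges] },
  ->(person) { person[:out_edges] }
)
# => [:male, 2, 3, :female, 1, 6]
```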

#trace ⇒ Object

Public: Mutates the pipeline so that instead of returning a single value, it returns an array where each element is the result returned by each segment in the pipeline.

Does not work correctly with pipelines where descendants or ancestors is used before trace.

For example

jay.out(:child).out(:child).trace.to_a
# => [ [ #<Node key=:jay>, #<Node key=:claire>, #<Node key=:haley> ],
#      [ #<Node key=:jay>, #<Node key=:claire>, #<Node key=:alex> ],
#      ... ]

This can be especially useful if you explicitly include edges in your pipeline:

jay.out_edges(:child).in.out_edges(:child).in.trace.next
# => [ [ #<Node key=:jay>,
#        #<Edge :jay -:child-> :claire>,
#        #<Node key=:claire>,
#        #<Edge :claire -:child-> :haley>,
#        #<Node key=:haley> ],
#      ... ]

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 244

def trace
  DSL.new(@source.append(Trace.new))
end
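
One way to picture what trace returns is to carry the whole path along at every hop instead of only the latest value. The sketch below does this over a plain hash; toy_hop and the children_of data are hypothetical and say nothing about how the Trace segment is implemented.

```ruby
# Made-up data: node => nodes reached over a :child edge.
children_of = { jay: [:claire], claire: [:haley, :alex] }

# Hypothetical helper: extend every path by one hop, keeping its history.
def toy_hop(paths, graph)
  paths.flat_map do |path|
    graph.fetch(path.last, []).map { |child| path + [child] }
  end
end

# Two hops from :jay, analogous to jay.out(:child).out(:child).trace
traces = toy_hop(toy_hop([[:jay]], children_of), children_of)
# => [[:jay, :claire, :haley], [:jay, :claire, :alex]]
```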

#uniq(&block) ⇒ Object

Public: Filters each value so that only unique elements are emitted.

block - An optional block used when determining if the value is unique. See Pipeline::Unique#initialize.

Returns a new DSL.



# File 'lib/turbine/pipeline/dsl.rb', line 254

def uniq(&block)
  append(Unique.new(&block))
end
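
How the optional block affects uniqueness is documented in Pipeline::Unique#initialize rather than here. A plausible reading, sketched below as an assumption, is that the block computes a key and a value is emitted only the first time its key appears. toy_uniq is a hypothetical helper.

```ruby
require 'set'

# Hypothetical helper: emit a value only the first time its key is seen.
# Without a block, the value itself is the key.
def toy_uniq(values, &block)
  seen = Set.new
  values.select do |value|
    key = block ? block.call(value) : value
    seen.add?(key) # nil (falsy) when the key was already present
  end
end

plain     = toy_uniq([11, 15, 11, 18, 15])                       # => [11, 15, 18]
by_decade = toy_uniq([11, 15, 18, 44, 46]) { |age| age / 10 }    # => [11, 44]
```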