Class: Turbine::Pipeline::Split

Inherits:
Expander show all
Defined in:
lib/turbine/pipeline/split.rb

Overview

Splits the upstream source into multiple pipelines which are evaluated in turn on the source, with the combined results being emitted.

For example

pump  = Pump.new([node1, node2, node3])
split = Split.new(->(x) { x.get(:age) },
                  ->(x) { x.get(:gender) })

(pump | split).to_a # => [ 18, :male, 27, :female, 25, :male ]

You may supply as many separate branches as you wish.

Direct Known Subclasses

Also

Defined Under Namespace

Classes: Branch

Instance Attribute Summary

Attributes inherited from Segment

#source

Instance Method Summary collapse

Methods inherited from Expander

#next

Methods inherited from Segment

#append, #each, #inspect, #next, #rewind, #to_s

Constructor Details

#initialize(*branches) ⇒ Split

Public: Creates a new Split segment.

branches - One or more procs; each proc is given a new pipeline DSL so

that you may transform / filter the inputs before the
results are merged back into the output.

Returns a new Split.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/turbine/pipeline/split.rb', line 25

def initialize(*branches)
  if branches.none?
    raise ArgumentError, 'Split requires at least one proc'
  end

  super()

  # Each DSL is evaluated once, and +handle_result+ changes the source
  # for each value being processed. This is more efficient than creating
  # and evaluating a new DSL for every input.
  @branches = branches.map do |branch|
    dsl  = Pipeline.dsl([])
    pump = dsl.source

    Branch.new(pump, branch.call(dsl))
  end

  # JRuby doesn't support calling +next+ on enum.cycle.with_index.
  @branches_cycle = @branches.zip((0...@branches.length).to_a).cycle
end

Instance Method Details

#traceObject

Public: Returns the trace containing the most recently emitted values for all source segments. The trace for the current branch pipeline is merged into the trace.

See Segment#trace.

Returns an array.



53
54
55
# File 'lib/turbine/pipeline/split.rb', line 53

def trace
  super { |trace| trace.push(*@previous_trace) }
end

#tracing=(use_tracing) ⇒ Object

Public: Enables or disables tracing on the segment. Passes the boolean through to the internal branch pipelines also, so that their traces may be combined with the output.

Returns the tracing setting.



62
63
64
65
66
67
68
# File 'lib/turbine/pipeline/split.rb', line 62

def tracing=(use_tracing)
  super

  @branches.each do |branch|
    branch.pipe.source.tracing = use_tracing
  end
end