Class: Turbine::Pipeline::Split
- Defined in:
- lib/turbine/pipeline/split.rb
Overview
Splits the upstream source into multiple branch pipelines. Each branch is evaluated in turn on every source value, and the combined results are emitted.
For example:
  pump  = Pump.new([node1, node2, node3])
  split = Split.new(->(x) { x.get(:age) },
                    ->(x) { x.get(:gender) })
  (pump | split).to_a # => [ 18, :male, 27, :female, 25, :male ]
You may supply as many separate branches as you wish.
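The emission order described above can be sketched in plain Ruby without Turbine itself. This is only an illustration of the interleaving, not the library's implementation: the hashes are hypothetical stand-ins for nodes, and `people`, `branches`, and `split_results` are names invented for this sketch.

```ruby
# Hypothetical stand-ins for graph nodes; real Turbine nodes are read
# with #get rather than Hash#[].
people = [
  { age: 18, gender: :male },
  { age: 27, gender: :female },
  { age: 25, gender: :male }
]

# One lambda per branch, in the order the results should be emitted.
branches = [
  ->(person) { person[:age] },
  ->(person) { person[:gender] }
]

# For every input, run each branch in order and flatten the results
# into a single output sequence.
split_results = people.flat_map do |person|
  branches.map { |branch| branch.call(person) }
end

p split_results # => [18, :male, 27, :female, 25, :male]
```

Adding a third lambda to `branches` would yield three values per input, which mirrors the note that any number of branches may be supplied.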
Direct Known Subclasses
Defined Under Namespace
Classes: Branch
Instance Attribute Summary
Attributes inherited from Segment
Instance Method Summary
- #initialize(*branches) ⇒ Split (constructor)
  Public: Creates a new Split segment.
- #trace ⇒ Object
  Public: Returns the trace containing the most recently emitted values for all source segments.
- #tracing=(use_tracing) ⇒ Object
  Public: Enables or disables tracing on the segment.
Methods inherited from Expander
Methods inherited from Segment
#append, #each, #inspect, #next, #rewind, #to_s
Constructor Details
#initialize(*branches) ⇒ Split
Public: Creates a new Split segment.
branches - One or more procs; each proc is given a new pipeline DSL so
that you may transform / filter the inputs before the
results are merged back into the output.
Returns a new Split.
# File 'lib/turbine/pipeline/split.rb', line 25

def initialize(*branches)
  if branches.none?
    raise ArgumentError, 'Split requires at least one proc'
  end

  super()

  # Each DSL is evaluated once, and +handle_result+ changes the source
  # for each value being processed. This is more efficient than creating
  # and evaluating a new DSL for every input.
  @branches = branches.map do |branch|
    dsl  = Pipeline.dsl([])
    pump = dsl.source

    Branch.new(pump, branch.call(dsl))
  end

  # JRuby doesn't support calling +next+ on enum.cycle.with_index.
  @branches_cycle = @branches.zip((0...@branches.length).to_a).cycle
end
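The last line of the constructor pairs each branch with its index by hand and then cycles over the pairs. A minimal sketch of that idiom in isolation, using symbols as hypothetical stand-ins for Branch objects (the names `branches`, `branches_cycle`, and `first_five` are illustrative only):

```ruby
# Symbols stand in for the Branch objects built in the constructor.
branches = %i[age gender name]

# Pair every branch with its index up front, then cycle over the pairs.
# Array#cycle without a block returns an Enumerator, so #next can be
# called on it repeatedly and it wraps around after the last pair.
branches_cycle = branches.zip((0...branches.length).to_a).cycle

first_five = Array.new(5) { branches_cycle.next }

p first_five
# => [[:age, 0], [:gender, 1], [:name, 2], [:age, 0], [:gender, 1]]
```

Because the index travels with the branch, a consumer can tell when the cycle has wrapped back to the first branch without relying on `with_index`.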
Instance Method Details
#trace ⇒ Object
Public: Returns the trace containing the most recently emitted values for all source segments. The trace for the current branch pipeline is merged into the trace.
See Segment#trace.
Returns an array.
# File 'lib/turbine/pipeline/split.rb', line 53

def trace
  super { |trace| trace.push(*@previous_trace) }
end
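The override above passes a block to `super` so that the parent's trace can be extended before it is returned. The sketch below shows that pattern in isolation. It assumes the parent yields its trace array to an optional block; that is an assumption about Segment#trace, not something this page confirms. `BaseTraceable` and `Merging` are hypothetical classes written for the illustration.

```ruby
# Hypothetical parent: builds its own trace and, as an assumption, lets
# the caller amend it through an optional block before returning it.
class BaseTraceable
  def initialize(name)
    @name = name
  end

  def trace
    own = [@name]
    yield own if block_given?
    own
  end
end

# Hypothetical child: appends a previously stored trace to the parent's.
class Merging < BaseTraceable
  def initialize(name, previous_trace)
    super(name)
    @previous_trace = previous_trace
  end

  def trace
    super { |trace| trace.push(*@previous_trace) }
  end
end

merged = Merging.new(:split, %i[pump get_age]).trace
p merged # => [:split, :pump, :get_age]
```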
#tracing=(use_tracing) ⇒ Object
Public: Enables or disables tracing on the segment. Passes the boolean through to the internal branch pipelines also, so that their traces may be combined with the output.
Returns the tracing setting.
# File 'lib/turbine/pipeline/split.rb', line 62

def tracing=(use_tracing)
  super

  @branches.each do |branch|
    branch.pipe.source.tracing = use_tracing
  end
end
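The setter above applies the flag to the segment itself and then forwards it to each branch. A minimal sketch of that propagation pattern, using hypothetical `BaseSegment` and `FanOut` classes rather than Turbine's own:

```ruby
# Hypothetical parent with a plain tracing flag.
class BaseSegment
  attr_reader :tracing

  def tracing=(use_tracing)
    @tracing = use_tracing
  end
end

# Hypothetical child that forwards the flag to the segments it owns.
class FanOut < BaseSegment
  def initialize(children)
    @children = children
  end

  def tracing=(use_tracing)
    super # bare super forwards use_tracing to BaseSegment#tracing=
    @children.each { |child| child.tracing = use_tracing }
  end
end

children = [BaseSegment.new, BaseSegment.new]
fan_out  = FanOut.new(children)

# An assignment expression evaluates to its right-hand side, whatever
# the setter method itself returns.
result = (fan_out.tracing = true)

p result                    # => true
p fan_out.tracing           # => true
p children.map(&:tracing)   # => [true, true]
```

This also shows why the caller sees the tracing setting as the result even though the method body ends with an `each` call: Ruby assignment syntax always evaluates to the assigned value.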