Class: Turbine::Pipeline::Expander

Inherits:
Segment
  • Object
show all
Defined in:
lib/turbine/pipeline/expander.rb

Overview

A Pipeline segment which expands arrays, sets, and enumerators such that, instead of sending them to the next segment, each member of the collection is sent separately.

pump   = Pump.new(%w( abc def ), %w( hik klm ))
expand = Expander.new

pipeline = pump | expand

pipeline.next # => 'abc'
pipeline.next # => 'def'
pipeline.next # => 'hij'
pipeline.next # => 'klm'

Using Expander on a pipeline has the same effect as calling flatten(1) on an Array.

Direct Known Subclasses

Sender, Split, Traverse

Instance Attribute Summary

Attributes inherited from Segment

#source

Instance Method Summary collapse

Methods inherited from Segment

#append, #each, #inspect, #rewind, #to_s, #trace, #tracing=

Constructor Details

#initializeExpander

Public: Creates a new Expander segment.

Returns an Expander.



24
25
26
27
# File 'lib/turbine/pipeline/expander.rb', line 24

def initialize
  super
  @buffer = nil
end

Instance Method Details

#nextObject

Public: Runs the pipeline once returning the next value.

If a recent call to next returned an array, set or enumerator, the next value from that collection will be emitted instead. Only once all of the values in the buffer have been emitted will the upstream segment be called again.

Returns an object.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/turbine/pipeline/expander.rb', line 37

def next
  if from_buffer = input_from_buffer
    handle_value(from_buffer)
  else
    case value = super
    when Array, Set, Enumerator
      # Recurse into arrays, as the input may return multiple results (as
      # is commonly the case when calling Node#in, Node#descendants, etc).
      @buffer = value.to_a.compact
      @buffer.any? ? handle_value(@buffer.shift) : self.next
    else
      handle_value(value)
    end
  end
end