Class: Pacer::Transform::Join::JoinPipe

Inherits:
Pipes::RubyPipe show all
Defined in:
lib/pacer/transform/join.rb

Instance Attribute Summary collapse

Attributes inherited from Pipes::RubyPipe

#starts

Instance Method Summary collapse

Methods inherited from Pipes::RubyPipe

#enablePath, #setStarts

Constructor Details

#initialize(back, key_block, unique) ⇒ JoinPipe

Returns a new instance of JoinPipe.



39
40
41
42
43
# File 'lib/pacer/transform/join.rb', line 39

# Builds a JoinPipe.
#
# back      - passed through, together with key_block, to
#             Pacer::Wrappers::WrappingPipeFunction.new.
# key_block - callable wrapped into the function stored in @block.
# unique    - flag stored in @unique and exposed through the unique reader.
def initialize(back, key_block, unique)
  super()
  @unique = unique
  @block = Pacer::Wrappers::WrappingPipeFunction.new(back, key_block)
end

Instance Attribute Details

#block ⇒ Object (readonly)

Returns the value of attribute block.



36
37
38
# File 'lib/pacer/transform/join.rb', line 36

# Reader for @block, which initialize sets to a
# Pacer::Wrappers::WrappingPipeFunction built from `back` and `key_block`.
def block
  @block
end

#groups ⇒ Object (readonly)

Returns the value of attribute groups.



36
37
38
# File 'lib/pacer/transform/join.rb', line 36

# Reader for @groups.
# NOTE(review): nothing in the visible code assigns @groups —
# processNextStart builds its hash in a local variable that is also named
# `groups` — so this presumably returns nil; confirm before relying on it.
def groups
  @groups
end

#to_emit ⇒ Object

Returns the value of attribute to_emit.



37
38
39
# File 'lib/pacer/transform/join.rb', line 37

# Reader for @to_emit: the array of grouped results that processNextStart
# assigns on its first call and then shifts one entry from per call.
# reset assigns nil to it.
def to_emit
  @to_emit
end

#unique ⇒ Object (readonly)

Returns the value of attribute unique.



36
37
38
# File 'lib/pacer/transform/join.rb', line 36

# Reader for @unique, the flag passed to initialize. processNextStart
# collects each group's members in a Set when it is truthy and in an Array
# otherwise.
def unique
  @unique
end

Instance Method Details

#processNextStart ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/pacer/transform/join.rb', line 45

# Returns the next group, building all groups on first use.
#
# While to_emit is unset, every element remaining in `starts` is consumed and
# filed under the key produced by block.call(element). Each key gets one
# Pacer::GroupVertex built from the key, block.graph, block.wrapper and a
# fresh Set (when `unique` is truthy) or Array. The resulting vertices are
# stored in to_emit, and each call hands back the first one left.
#
# Raises Pacer::Pipes::EmptyPipe.instance once no groups remain.
def processNextStart
  self.to_emit ||= begin
    container = unique ? Set : Array
    by_key = Hash.new do |table, key|
      table[key] = Pacer::GroupVertex.new(key, block.graph, block.wrapper, container.new)
    end
    while starts.hasNext
      element = starts.next
      by_key[block.call(element)].add_component(element)
    end
    by_key.values
  end
  raise Pacer::Pipes::EmptyPipe.instance if to_emit.empty?

  to_emit.shift
end

#reset ⇒ Object



62
63
64
65
66
# File 'lib/pacer/transform/join.rb', line 62

# Resets the pipe so that the next processNextStart call rebuilds its groups.
#
# Calls the superclass reset, then sets to_emit to nil; processNextStart
# regroups whatever `starts` yields whenever to_emit is unset.
def reset
  super
  # No `loaded=` writer is declared for this class or listed among its
  # inherited attributes, so an unconditional `self.loaded = false` raises
  # NoMethodError. Clear the flag only when a writer (public or private)
  # actually exists.
  self.loaded = false if respond_to?(:loaded=, true)
  self.to_emit = nil
end