Class: Fairy::PGroupBy::DepqMergeSortBuffer::StSt

Inherits:
MergeSortBuffer::StSt show all
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Methods inherited from MergeSortBuffer::StSt

#read_line

Constructor Details

#initialize(buffers) ⇒ StSt

Returns a new instance of StSt.



609
610
611
612
613
614
615
616
617
618
619
620
621
# File 'lib/fairy/node/p-group-by.rb', line 609

# Builds the priority queue used to merge the given buffers.
#
# Every buffer is opened and its first record is read with +read_line+.
# Buffers that yield a record are inserted into a Depq as a
# [record, buffer] pair, prioritised by the record's first element (the
# key). Buffers that yield no record are closed immediately, the same way
# #each closes a buffer once it is exhausted, so they do not stay open.
#
# @param buffers [#each] collection of buffer objects; each must respond
#   to +open+, +io+ and +close!+
def initialize(buffers)
  # depq is required at construction time rather than at file load.
  require "depq"

  @buffers = Depq.new
  buffers.each{|buf|
    buf.open
    kv = read_line(buf.io)
    unless kv
      # Nothing to merge from this buffer: release it right away instead of
      # leaving it open with no remaining reference in the queue.
      buf.close!
      next
    end
    @buffers.insert [kv, buf], kv.first
  }

  # Set by #each; holds the fiber running the caller's block for the
  # current key group.
  @fiber = nil
end

Instance Method Details

#each(&block) ⇒ Object



623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
# File 'lib/fairy/node/p-group-by.rb', line 623

# Merges the queued buffers in key order and yields one KeyValueStream per
# run of consecutive equal keys.
#
# For each group a new Fiber is created that yields the group's stream to
# the caller's block. Records are repeatedly taken from the minimum of the
# queue, appended to the current stream with +concat+, and the fiber is
# resumed. When the key changes, the current stream receives +push_eos+ and
# the fiber is resumed once more before the next group starts.
# NOTE(review): this presumably relies on KeyValueStream handing control
# back (e.g. via Fiber.yield) while it waits for more values -- confirm
# against KeyValueStream.
#
# Returns without yielding when the queue is empty (no buffers, or every
# buffer had no record); previously this raised NoMethodError on nil.
#
# @yieldparam values [KeyValueStream] stream of values for one key
def each(&block)
  first_entry = @buffers.find_min
  # Empty queue: there are no groups to emit.
  return unless first_entry

  key = first_entry.first.first
  values = KeyValueStream.new(key, self)
  @fiber = Fiber.new{yield values}
  while buf_min = @buffers.delete_min
    kv, buf = buf_min
    unless key == kv[0]
      # Key changed: finish the current group, then start a new one.
      values.push_eos
      @fiber.resume
      key = kv[0]
      values = KeyValueStream.new(key, self)
      @fiber = Fiber.new{yield values}
    end
    values.concat kv[1]
    @fiber.resume

    # Refill the queue from the buffer just consumed, or close it once it
    # has no more records.
    unless line = read_line(buf.io)
      buf.close!
      next
    end
    @buffers.insert [line, buf], line[0]
  end
  # Finish the last group.
  values.push_eos
  @fiber.resume
end