Class: Fairy::PGroupBy::PQMergeSortBuffer::StSt

Inherits:
MergeSortBuffer::StSt show all
Defined in:
lib/fairy/node/p-group-by.rb

Defined Under Namespace

Classes: Pair

Instance Method Summary collapse

Methods inherited from MergeSortBuffer::StSt

#read_line

Constructor Details

#initialize(buffers) ⇒ StSt

Returns a new instance of StSt.



738
739
740
741
742
743
744
745
746
747
748
749
750
# File 'lib/fairy/node/p-group-by.rb', line 738

def initialize(buffers)
  require "priority_queue"

  @buffers = PriorityQueue.new
  buffers.each{|buf|
    buf.open
    kv = read_line(buf.io)
    next unless kv
    @buffers.push Pair.new(kv, buf) , kv.first
  }

  @fiber = nil
end

Instance Method Details

#each(&block) ⇒ Object



752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
# File 'lib/fairy/node/p-group-by.rb', line 752

def each(&block)
  key = @buffers.min_key.key
  values = KeyValueStream.new(key, self)
  @fiber = Fiber.new{yield values}
  while min_pair = @buffers.delete_min_return_key
#	    buf, kv = buf_min
    if key == min_pair.key
      values.concat min_pair.values
      @fiber.resume
    else
      values.push_eos
      @fiber.resume
      key = min_pair.key
      values = KeyValueStream.new(key, self)
      @fiber = Fiber.new{yield values}
      values.concat min_pair.values
      @fiber.resume
    end
    
    unless line = read_line(min_pair.buf.io)
      min_pair.buf.close!
      next
    end
    min_pair.key_values = line
    @buffers.push min_pair, line[0]
  end
  values.push_eos
  @fiber.resume
end