Class: Fairy::PGroupBy::PQMergeSortBuffer2::StSt

Inherits:
MergeSortBuffer::StSt show all
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Methods inherited from MergeSortBuffer::StSt

#read_line

Constructor Details

#initialize(buffers) ⇒ StSt

Returns a new instance of StSt.



799
800
801
802
803
804
805
806
807
808
809
810
811
# File 'lib/fairy/node/p-group-by.rb', line 799

# Builds the priority queue that drives the merge.
#
# Every buffer is opened and its first record is read with the inherited
# +read_line+. Buffers that yield a record are queued as <tt>[record, buffer]</tt>
# with the record's first element as the priority. Buffers that yield nothing
# are closed right away, the same way #each closes a buffer once it runs dry,
# so they are not left open for the lifetime of this object.
#
# @param buffers [#each] buffers responding to +open+, +io+ and +close!+
def initialize(buffers)
  # Loaded lazily so the dependency is only needed when this variant is used.
  require "priority_queue"

  @buffers = PriorityQueue.new
  buffers.each do |buf|
    buf.open
    kv = read_line(buf.io)
    if kv
      @buffers.push [kv, buf], kv.first
    else
      # Nothing to merge from this buffer; release it instead of leaking it.
      buf.close!
    end
  end

  @fiber = nil
end

Instance Method Details

#each(&block) ⇒ Object



813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
# File 'lib/fairy/node/p-group-by.rb', line 813

# Merges the queued buffers in priority (key) order and yields one
# KeyValueStream per run of records sharing the same key.
#
# Each stream is handed to the caller's block inside its own Fiber. The fiber
# is resumed once after every record appended to the stream and once more
# after +push_eos+.
# NOTE(review): this presumes the block suspends (e.g. via Fiber.yield inside
# KeyValueStream) while waiting for more values; a block that returns early
# would leave a dead fiber to be resumed -- confirm against KeyValueStream.
#
# When the queue is empty (no buffer produced a record) nothing is yielded
# and the method returns nil instead of failing on a nil +min_key+.
#
# @yieldparam values [KeyValueStream] stream of values for one key
# @return [Object, nil] result of the last fiber resume, or nil when empty
def each(&block)
  head = @buffers.min_key
  return unless head

  key = head.first.first
  values = KeyValueStream.new(key, self)
  @fiber = Fiber.new{yield values}
  while buf_min = @buffers.min_key
    kv, buf = buf_min
    unless key == kv[0]
      # Key changed: finish the current stream and let its consumer drain it
      # before starting a stream (and fiber) for the new key.
      values.push_eos
      @fiber.resume
      key = kv[0]
      values = KeyValueStream.new(key, self)
      @fiber = Fiber.new{yield values}
    end
    values.concat kv[1]
    @fiber.resume

    # Advance the buffer that supplied this record.
    unless line = read_line(buf.io)
      # Exhausted: release the buffer and drop its queue entry.
      buf.close!
      @buffers.delete_min
      next
    end
    # NOTE(review): buf_min is the object stored as the queue key and is
    # mutated in place here; verify the queue tolerates keys whose contents
    # change before change_priority is called.
    buf_min[0] = line
    @buffers.change_priority buf_min, line[0]
  end
  values.push_eos
  @fiber.resume
end