Class: Fairy::PGroupBy::DepqMergeSortBuffer::StSt
Instance Method Summary
collapse
#read_line
Constructor Details
#initialize(buffers) ⇒ StSt
615
616
617
618
619
620
621
622
623
624
625
626
627
|
# File 'lib/fairy/node/p-group-by.rb', line 615
def initialize(buffers)
require "depq"
@buffers = Depq.new
buffers.each{|buf|
buf.open
kv = read_line(buf.io)
next unless kv
@buffers.insert [kv, buf], kv.first
}
@fiber = nil
end
|
Instance Method Details
#each(&block) ⇒ Object
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
|
# File 'lib/fairy/node/p-group-by.rb', line 629
def each(&block)
key = @buffers.find_min.first.first
values = KeyValueStream.new(key, self)
@fiber = Fiber.new{yield values}
while buf_min = @buffers.delete_min
kv, buf = buf_min
if key == kv[0]
values.concat kv[1]
@fiber.resume
else
values.push_eos
@fiber.resume
key = kv[0]
values = KeyValueStream.new(key, self)
@fiber = Fiber.new{yield values}
values.concat kv[1]
@fiber.resume
end
unless line = read_line(buf.io)
buf.close!
next
end
@buffers.insert [line, buf], line[0]
end
values.push_eos
@fiber.resume
end
|