Class: Fairy::PGroupBy::PQMergeSortBuffer::StSt
- Inherits: MergeSortBuffer::StSt
  - Object
  - MergeSortBuffer::StSt
  - Fairy::PGroupBy::PQMergeSortBuffer::StSt
- Defined in: lib/fairy/node/p-group-by.rb
Defined Under Namespace
Classes: Pair
Instance Method Summary
- #each(&block) ⇒ Object
- #initialize(buffers) ⇒ StSt (constructor): A new instance of StSt.
Methods inherited from MergeSortBuffer::StSt
Constructor Details
#initialize(buffers) ⇒ StSt
Returns a new instance of StSt.
# File 'lib/fairy/node/p-group-by.rb', line 738
def initialize(buffers)
  require "priority_queue"

  @buffers = PriorityQueue.new
  buffers.each{|buf|
    buf.open
    kv = read_line(buf.io)
    next unless kv
    @buffers.push Pair.new(kv, buf), kv.first
  }
  @fiber = nil
end
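The constructor seeds the queue with the first record of every buffer and skips buffers that yield nothing. The following is a minimal sketch of that seeding step only, under stated assumptions: plain arrays stand in for the buffers, a sorted array stands in for `PriorityQueue`, and `Entry` and `seed_queue` are hypothetical names that do not exist in Fairy.

```ruby
# Hypothetical stand-in for Pair: the current record plus the input it came from.
Entry = Struct.new(:key_values, :source)

# Assumption: each source is an Array of [key, value, ...] records, already sorted by key.
def seed_queue(sources)
  queue = []
  sources.each do |source|
    kv = source.shift            # read the first record, if any (mutates the source)
    next unless kv               # empty inputs contribute nothing, as in the constructor
    queue << Entry.new(kv, source)
  end
  # A sorted array stands in for the priority queue, ordered by the record's key.
  queue.sort_by { |entry| entry.key_values.first }
end

sources = [[["a", 1], ["c", 3]], [], [["b", 2]]]
queue = seed_queue(sources)
puts queue.map { |entry| entry.key_values.first }.inspect
```

In the sketch the priority is the first element of the record, mirroring `kv.first` in the constructor.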
Instance Method Details
#each(&block) ⇒ Object
# File 'lib/fairy/node/p-group-by.rb', line 752
def each(&block)
  key = @buffers.min_key.key
  values = KeyValueStream.new(key, self)
  @fiber = Fiber.new{yield values}
  while min_pair = @buffers.delete_min_return_key
    # buf, kv = buf_min
    if key == min_pair.key
      values.concat min_pair.values
      @fiber.resume
    else
      values.push_eos
      @fiber.resume
      key = min_pair.key
      values = KeyValueStream.new(key, self)
      @fiber = Fiber.new{yield values}
      values.concat min_pair.values
      @fiber.resume
    end

    unless line = read_line(min_pair.buf.io)
      min_pair.buf.close!
      next
    end
    min_pair.key_values = line
    @buffers.push min_pair, line[0]
  end
  values.push_eos
  @fiber.resume
end
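The method above is a k-way merge over key-sorted buffers that emits one value stream per distinct key. The sketch below illustrates only that merge-and-group logic, under stated assumptions: inputs are in-memory arrays of `[key, value, ...]` records, a linear `min_by` scan replaces `delete_min_return_key`, and values are collected into an array instead of being streamed to the consumer through a `Fiber` and `KeyValueStream`. The name `each_group` is hypothetical.

```ruby
# Merge several key-sorted inputs and yield each key once with all of its values.
# Assumption: every source is an Array of [key, value, ...] records sorted by key.
def each_group(sources)
  return enum_for(:each_group, sources) unless block_given?

  # One cursor [record, source_index, position] per non-empty source.
  heads = []
  sources.each_with_index do |src, i|
    heads << [src[0], i, 0] unless src.empty?
  end
  return if heads.empty?

  current_key = nil
  values = nil
  until heads.empty?
    # Linear scan standing in for the priority queue's delete-min operation.
    head = heads.min_by { |rec, i, _pos| [rec.first, i] }
    heads.delete(head)
    rec, i, pos = head

    if values.nil? || rec.first != current_key
      yield current_key, values if values   # close the previous group
      current_key = rec.first
      values = []
    end
    values.concat(rec.drop(1))

    # Refill from the same source, like re-pushing the pair after read_line.
    nxt = sources[i][pos + 1]
    heads << [nxt, i, pos + 1] if nxt
  end
  yield current_key, values                 # flush the last group
end

a = [["a", 1], ["c", 3]]
b = [["a", 2], ["b", 5]]
each_group([a, b]) { |key, vals| puts "#{key}: #{vals.inspect}" }
```

Collecting values in memory keeps the sketch short. The original instead resumes a fiber after each `concat`, which presumably lets the consumer process a group's values before the whole group has been read.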