Class: Fairy::PGroupBy::PQMergeSortBuffer2::StSt
- Inherits:
-
MergeSortBuffer::StSt
- Object
- MergeSortBuffer::StSt
- Fairy::PGroupBy::PQMergeSortBuffer2::StSt
- Defined in:
- lib/fairy/node/p-group-by.rb
Instance Method Summary collapse
- #each(&block) ⇒ Object
-
#initialize(buffers) ⇒ StSt
constructor
A new instance of StSt.
Methods inherited from MergeSortBuffer::StSt
Constructor Details
#initialize(buffers) ⇒ StSt
Returns a new instance of StSt.
799 800 801 802 803 804 805 806 807 808 809 810 811 |
# File 'lib/fairy/node/p-group-by.rb', line 799 def initialize(buffers) require "priority_queue" @buffers = PriorityQueue.new buffers.each{|buf| buf.open kv = read_line(buf.io) next unless kv @buffers.push [kv, buf], kv.first } @fiber = nil end |
Instance Method Details
#each(&block) ⇒ Object
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 |
# File 'lib/fairy/node/p-group-by.rb', line 813 def each(&block) key = @buffers.min_key.first.first values = KeyValueStream.new(key, self) @fiber = Fiber.new{yield values} while buf_min = @buffers.min_key kv, buf = buf_min if key == kv[0] values.concat kv[1] @fiber.resume else values.push_eos @fiber.resume key = kv[0] values = KeyValueStream.new(key, self) @fiber = Fiber.new{yield values} values.concat kv[1] @fiber.resume end unless line = read_line(buf.io) buf.close! @buffers.delete_min next end buf_min[0] = line @buffers.change_priority buf_min, line[0] end values.push_eos @fiber.resume end |