Class: Fairy::PGroupBy::DepqMergeSortBuffer::StSt
- Inherits: MergeSortBuffer::StSt
  - Object
  - MergeSortBuffer::StSt
  - Fairy::PGroupBy::DepqMergeSortBuffer::StSt
- Defined in: lib/fairy/node/p-group-by.rb
Instance Method Summary
- #each(&block) ⇒ Object
- #initialize(buffers) ⇒ StSt (constructor): A new instance of StSt.
Methods inherited from MergeSortBuffer::StSt
Constructor Details
#initialize(buffers) ⇒ StSt
Returns a new instance of StSt.
    # File 'lib/fairy/node/p-group-by.rb', line 615

    def initialize(buffers)
      require "depq"

      @buffers = Depq.new
      buffers.each{|buf|
        buf.open
        kv = read_line(buf.io)
        next unless kv
        @buffers.insert [kv, buf], kv.first
      }

      @fiber = nil
    end
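The constructor seeds a priority queue with the first record of every buffer and skips buffers that yield no record. The following is a minimal sketch of that seeding step only, under stated assumptions: plain Arrays of `[key, value]` pairs stand in for the buffer objects, an ordinary Array stands in for `Depq`, and `seed_queue` is a hypothetical helper name that does not exist in Fairy.

```ruby
# Sketch only: each "buffer" is a plain Array of [key, value] pairs,
# assumed to be already sorted by key. seed_queue is a hypothetical name.
def seed_queue(buffers)
  queue = []
  buffers.each do |buf|
    enum = buf.each                 # external enumerator, stands in for buf.io
    kv = begin
      enum.next                     # read the first record, like read_line
    rescue StopIteration
      nil                           # an empty buffer has no first record
    end
    next unless kv                  # skip empty buffers, as the constructor does
    queue << [kv, enum]             # remember which source the record came from
  end
  queue
end

seeded = seed_queue([[["a", 1], ["c", 3]], [], [["b", 2]]])
puts seeded.map { |kv, _| kv.first }.inspect
```

Keeping the source next to each queued record is what later lets the merge loop refill the queue from the same buffer it just consumed.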
Instance Method Details
#each(&block) ⇒ Object
    # File 'lib/fairy/node/p-group-by.rb', line 629

    def each(&block)
      key = @buffers.find_min.first.first
      values = KeyValueStream.new(key, self)
      @fiber = Fiber.new{yield values}
      while buf_min = @buffers.delete_min
        kv, buf = buf_min
        if key == kv[0]
          values.concat kv[1]
          @fiber.resume
        else
          values.push_eos
          @fiber.resume
          key = kv[0]
          values = KeyValueStream.new(key, self)
          @fiber = Fiber.new{yield values}
          values.concat kv[1]
          @fiber.resume
        end

        unless line = read_line(buf.io)
          buf.close!
          next
        end
        @buffers.insert [line, buf], line[0]
      end
      values.push_eos
      @fiber.resume
    end
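The method above is a k-way merge that groups consecutive records sharing a key: it repeatedly removes the smallest record, appends its value to the current group, and refills the queue from the buffer that record came from. The sketch below illustrates the same control flow under simplifying assumptions. Sorted Arrays stand in for the buffers, `Array#min_by` stands in for `Depq#delete_min`, and each group is collected into an Array rather than streamed through `KeyValueStream` and a `Fiber`. The name `each_group` is hypothetical. Unlike the original, the sketch returns early when there is no input at all.

```ruby
# Sketch only: buffers are Arrays of [key, value] pairs sorted by key.
def each_group(buffers)
  return enum_for(:each_group, buffers) unless block_given?

  # Seed the queue with the first pair of every non-empty buffer.
  # Each entry is [pair, source buffer, index of the next pair to read].
  queue = buffers.reject(&:empty?).map { |buf| [buf.first, buf, 1] }
  return if queue.empty?

  key = queue.min_by { |kv, _, _| kv[0] }[0][0]
  values = []
  until queue.empty?
    # Remove the entry with the smallest key (a linear-time delete_min).
    entry = queue.min_by { |kv, _, _| kv[0] }
    queue.delete_at(queue.index { |e| e.equal?(entry) })
    kv, buf, pos = entry

    if kv[0] != key
      yield key, values             # the key changed: close the current group
      key = kv[0]
      values = []
    end
    values << kv[1]

    # Refill from the same buffer, if it still has pairs left.
    queue << [buf[pos], buf, pos + 1] if pos < buf.size
  end
  yield key, values                 # flush the final group
end

each_group([[["a", 1], ["c", 3]], [["a", 2], ["b", 5]]]) do |k, vs|
  puts "#{k}: #{vs.inspect}"
end
```

A real priority queue such as `Depq` makes each removal logarithmic in the number of buffers, whereas the linear scan here is only meant to keep the example dependency-free.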