Class: Fairy::PGroupBy::MergeSortBuffer::StSt

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Constructor Details

#initialize(buffers) ⇒ StSt

Returns a new instance of StSt.



424
425
426
427
428
429
430
431
432
# File 'lib/fairy/node/p-group-by.rb', line 424

# Opens every buffer, reads its first key/value record, and keeps the
# non-empty buffers ordered by that record's key.
#
# @buffers ends up as an array of +[kv, buf]+ pairs sorted by +kv[0]+, where
# +kv+ is the +[key, value]+ record most recently read from +buf+.
#
# @param buffers [Enumerable] objects responding to #open, #io and #close!
def initialize(buffers)
  heads = buffers.collect{|buf|
    buf.open
    [read_line(buf.io), buf]
  }

  # A buffer with no first record never enters the merge, so nothing else
  # would release it. Close it here the same way #each closes a buffer once
  # it runs out of records.
  live, drained = heads.partition{|kv, _buf| kv}
  drained.each{|_kv, buf| buf.close!}

  @buffers = live.sort_by{|kv, _buf| kv[0]}

  @fiber = nil
end

Instance Method Details

#each(&block) ⇒ Object



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# File 'lib/fairy/node/p-group-by.rb', line 434

# Merges the buffers in key order and yields one KeyValueStream per run of
# equal keys.
#
# Each pass takes the +[kv, buf]+ pair at the front of @buffers (the smallest
# current key), passes +kv[1]+ to the current stream via #concat, and resumes
# the fiber that runs the caller's block. When the key changes, the current
# stream is ended with #push_eos and a new stream and fiber are started for
# the new key. The buffer is then refilled with its next record and
# re-inserted at its sorted position, or closed once it has no more records.
#
# NOTE(review): the caller's block is presumably expected to suspend inside
# the stream (Fiber.yield) while waiting for more values; resuming a fiber
# that has already finished would raise FiberError — confirm against
# KeyValueStream.
#
# @yieldparam values [KeyValueStream] stream for one key
# @return [Object, nil] nil when there is nothing to merge
def each(&block)
  # Every buffer was empty (or none were given): there is no first key to
  # read, so there is nothing to yield.
  return if @buffers.empty?

  key = @buffers.first.first.first
  values = KeyValueStream.new(key, self)
  @fiber = Fiber.new{yield values}
  while buf_min = @buffers.shift
    kv, buf = buf_min
    unless key == kv[0]
      # Key changed: finish the previous group before starting a new one.
      values.push_eos
      @fiber.resume
      key = kv[0]
      values = KeyValueStream.new(key, self)
      @fiber = Fiber.new{yield values}
    end
    values.concat kv[1]
    @fiber.resume

    unless line = read_line(buf.io)
      # This buffer is exhausted; release it and continue with the rest.
      buf.close!
      next
    end

    # Re-insert after the last entry whose key is <= the new key, so that
    # @buffers stays sorted and equal keys keep their relative order.
    idx = @buffers.rindex{|head, _b| head[0] <= line[0]}
    # Reuse the existing pair instead of allocating a new one.
    buf_min[0] = line
    idx ? @buffers.insert(idx+1, buf_min) : @buffers.unshift(buf_min)
  end
  values.push_eos
  @fiber.resume
end

#read_line(io) ⇒ Object



467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
# File 'lib/fairy/node/p-group-by.rb', line 467

# Reads one key/value record from +io+.
#
# A record is two consecutive Marshal dumps: the key, then the value.
#
# NOTE(review): Marshal.load must only be used on trusted data; this
# presumably reads spill files written by fairy itself — confirm.
#
# @param io [IO] stream to read from; must also respond to #pos, #seek and
#   #read for the diagnostic dump taken when the data is corrupt
# @return [Array, nil] +[key, value]+, or nil when EOFError is raised while
#   loading either part
# @raise [ArgumentError] re-raised after logging when Marshal rejects the data
def read_line(io)
  k = Marshal.load(io)
  v = Marshal.load(io)
  [k, v]
rescue EOFError
  nil
rescue ArgumentError
  Log::debug(self, "MARSHAL ERROR OCCURED!!")
  # Dump the bytes around the failure point. Rewind by at most 1024 bytes but
  # never past the start of the stream: seeking to a negative offset raises
  # Errno::EINVAL, which would hide the marshal error being reported.
  back = [io.pos, 1024].min
  io.seek(-back, IO::SEEK_CUR)
  buf = io.read(2048)
  Log::debugf(self, "File Contents: %s", buf)

  raise
end