Class: Fairy::PGroupBy::DirectKBMergeSortBuffer::CachedBuffer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Constructor Details

#initialize(njob, io) ⇒ CachedBuffer

Returns a new instance of CachedBuffer.



1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
# File 'lib/fairy/node/p-group-by.rb', line 1371

def initialize(njob, io)
  @njob = njob
  @io = io
  io.open

  @cache = []

  @eof = false

  read_buffer
  @key = @njob.hash_key(@cache.first)
end

Instance Method Details

#each_by_same_key(&block) ⇒ Object



1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
# File 'lib/fairy/node/p-group-by.rb', line 1396

def each_by_same_key(&block)
  loop do
    @cache.each &block
    read_buffer
    return if @cache.empty?
    unless @njob.hash_key(@cache.first) == @key
      @key = @njob.hash_key(@cache.first)
      return
    end
  end
end

#eof?Boolean

Returns:

  • (Boolean)


1388
1389
1390
# File 'lib/fairy/node/p-group-by.rb', line 1388

def eof?
  @eof
end

#keyObject



1392
1393
1394
# File 'lib/fairy/node/p-group-by.rb', line 1392

def key
  @key
end

#read_bufferObject



1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
# File 'lib/fairy/node/p-group-by.rb', line 1408

def read_buffer
  io = @io.io
  begin
    @cache = Marshal.load(io)
  rescue EOFError
    @eof = true
    @cache = []
  rescue ArgumentError
    Log::debug(self, "MARSHAL ERROR OCCURED!!")
    io.seek(-1024, IO::SEEK_CUR)
    buf = io.read(2048)
    Log::debugf(self, "File Contents: %s", buf)
    raise
  end
end