Class: Fairy::PGroupBy::DirectKB2MergeSortBuffer

Inherits:
DirectKBMergeSortBuffer show all
Defined in:
lib/fairy/node/p-group-by.rb

Defined Under Namespace

Classes: CachedBuffer

Instance Attribute Summary

Attributes inherited from OnMemoryBuffer

#log_id

Instance Method Summary collapse

Methods inherited from CommandMergeSortBuffer

#each, #init_2ndmemory, #initialize, #open_buffer, #push

Methods inherited from OnMemoryBuffer

#each, #initialize, #push

Constructor Details

This class inherits a constructor from Fairy::PGroupBy::CommandMergeSortBuffer

Instance Method Details

#each_2ndmemory(&block) ⇒ Object



1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
# File 'lib/fairy/node/p-group-by.rb', line 1454

# Iterates over everything that has been spilled to secondary storage.
#
# Any key/values still held in @key_values are first written out with
# store_2ndmemory (after which @key_values is set to nil). A
# DirectMergeSortBuffer::Merger is then built over @buffers, using
# CachedBuffer as the buffer class, and the caller's block is forwarded
# to Merger#each.
#
# block:: forwarded unchanged to the merger's #each.
#
# Returns whatever Merger#each returns.
def each_2ndmemory(&block)
  has_pending = !@key_values.empty?
  if has_pending
    store_2ndmemory(@key_values)
    @key_values = nil
  end

  # Read @buffers only after the flush above, so the log lines and the
  # merger see the collection as store_2ndmemory left it.
  file_count = @buffers.size
  Log::info(self, "Merge Start: #{file_count} files")
  buffer_paths = @buffers.map { |buf| buf.path }
  Log::debug(self, buffer_paths.join(" "))

  merger = DirectMergeSortBuffer::Merger.new(@njob, @buffers, CachedBuffer)
  merger.each(&block)
end

#store_2ndmemory(key_values) ⇒ Object



1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
# File 'lib/fairy/node/p-group-by.rb', line 1427

# Writes key_values to a buffer obtained from open_buffer as a sequence
# of Marshal-dumped arrays ("chunks").
#
# The entries are ordered by their first element (the key), then every
# value group of every entry is appended to the current chunk. Before a
# group is appended, the chunk is dumped and restarted if the summed
# sizes of the groups it already holds have reached @CHUNK_SIZE. Once
# all groups are consumed, the last chunk is dumped only when its summed
# size is greater than zero.
#
# key_values:: enumerable of [key, groups] pairs, where groups is an
#              enumerable of objects responding to #size.
#
# Returns the result of the final Log::debug call.
def store_2ndmemory(key_values)
  Log::debug(self, "START STORE")
  ordered = key_values.sort_by { |entry| entry.first }

  open_buffer do |io|
    chunk = []
    chunk_weight = 0

    # Dumps the current chunk and starts a fresh, empty one.
    flush = lambda do
      Marshal.dump(chunk, io)
      chunk = []
      chunk_weight = 0
    end

    ordered.each do |_key, groups|
      groups.each do |group|
        flush.call if chunk_weight >= @CHUNK_SIZE
        chunk << group
        chunk_weight += group.size
      end
    end

    # The remainder is judged by accumulated size, not by entry count.
    flush.call if chunk_weight > 0
    nil # the block hands no value back to open_buffer
  end

  Log::debug(self, "FINISH STORE")
end