Class: Fairy::PGroupBy::DirectMergeSortBuffer

Inherits:
DirectOnMemoryBuffer
Defined in:
lib/fairy/node/p-group-by.rb

Defined Under Namespace

Classes: CachedBuffer, KeyValueStream, Merger

Instance Attribute Summary

Attributes inherited from DirectOnMemoryBuffer

#log_id

Instance Method Summary

Constructor Details

#initialize(njob, policy) ⇒ DirectMergeSortBuffer

Returns a new instance of DirectMergeSortBuffer.



889
890
891
892
893
894
895
896
# File 'lib/fairy/node/p-group-by.rb', line 889

def initialize(njob, policy)
	super

	# Maximum number of in-memory key/value entries before #push spills a
	# sorted run to a temporary file; falls back to the global config.
	@threshold = policy[:threshold] || CONF.GROUP_BY_CMSB_THRESHOLD

	# Spill files; stays nil until the first spill, which is how #each
	# decides between the in-memory path and the merge path.
	@buffers = nil
end

Instance Method Details

#each(&block) ⇒ Object



954
955
956
957
958
959
960
# File 'lib/fairy/node/p-group-by.rb', line 954

def each(&block)
	# Nothing was ever spilled to disk: the inherited in-memory
	# iteration is sufficient (zsuper forwards the block as well).
	return super unless @buffers

	# Otherwise merge the sorted runs that were written to temp files.
	each_2ndmemory(&block)
end

#each_2ndmemory(&block) ⇒ Object



962
963
964
965
966
967
968
969
970
971
972
# File 'lib/fairy/node/p-group-by.rb', line 962

# Iterate over all buffered entries by merging the spilled sort runs.
#
# Any entries still held in memory are first written out as one final
# run, then every run file in @buffers is handed to a Merger, which
# yields to +block+.
def each_2ndmemory(&block)
	# @key_values is set to nil after the final flush below, so guard
	# against nil; a repeated call would otherwise raise NoMethodError.
	if @key_values && !@key_values.empty?
	  store_2ndmemory(@key_values)
	  @key_values = nil
	end
	Log::info(self, "Merge Start: #{@buffers.size} files")
	Log::debug(self, @buffers.map(&:path).join(" "))

	Merger.new(@njob, @buffers).each(&block)
end

#init_2ndmemory ⇒ Object



898
899
900
901
902
903
904
905
# File 'lib/fairy/node/p-group-by.rb', line 898

# Prepare on-disk ("second memory") buffering: load the temp-file helper,
# choose the spill directory and start an empty list of spill files.
def init_2ndmemory
	# Required lazily: only needed once the buffer actually spills.
	require "fairy/share/fast-tempfile"

	# Policy-supplied directory wins; otherwise use the configured TMP_DIR.
	@buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
	@buffers = []
end

#open_buffer(&block) ⇒ Object



907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
# File 'lib/fairy/node/p-group-by.rb', line 907

# Create a new spill temp file and register it in @buffers.
#
# Without a block, returns the FastTempfile. With a block, yields the
# file's underlying IO, always closes the file afterwards, and returns
# the block's value.
def open_buffer(&block)
	init_2ndmemory unless @buffers

	buffer = FastTempfile.open("mod-group-by-buffer-#{@njob.no}-", @buffer_dir)
	@buffers.push(buffer)
	return buffer unless block_given?

	begin
	  # Workaround for Ruby bug #2390: yield the raw IO instead of the
	  # FastTempfile wrapper (previously `yield buffer`).
	  yield buffer.io
	ensure
	  buffer.close
	end
end

#push(value) ⇒ Object



926
927
928
929
930
931
932
933
934
935
936
937
938
939
# File 'lib/fairy/node/p-group-by.rb', line 926

# Add +value+ via the inherited in-memory push. If the in-memory list has
# grown beyond @threshold, swap in a fresh list and spill the old one to
# disk while still holding @key_values_mutex.
def push(value)
	super

	@key_values_mutex.synchronize do
	  next unless @key_values.size > @threshold

	  spilled, @key_values = @key_values, []
	  store_2ndmemory(spilled)
	end
end

#store_2ndmemory(key_values) ⇒ Object



941
942
943
944
945
946
947
948
949
950
951
952
# File 'lib/fairy/node/p-group-by.rb', line 941

# Sort +key_values+ by the job's hash key and write them to a new spill
# file (via #open_buffer) as a sequence of Marshal-dumped arrays of at
# most @CHUNK_SIZE elements each.
def store_2ndmemory(key_values)
	Log::debug(self, "START STORE")
	# Presumably sorted so the Merger used by #each_2ndmemory can merge
	# runs by key — confirm against Merger.
	sorted = key_values.sort_by{|e| @njob.hash_key(e)}

	open_buffer do |io|
	  # NOTE(review): @CHUNK_SIZE is not assigned in this class's visible
	  # code; presumably set elsewhere (e.g. the superclass) — confirm it
	  # is a positive Integer, since each_slice rejects nil.
	  sorted.each_slice(@CHUNK_SIZE) do |chunk|
	    Marshal.dump(chunk, io)
	  end
	end
	Log::debug(self, "FINISH STORE")
end