Class: Fairy::PGroupBy::DirectMergeSortBuffer

Inherits:
DirectOnMemoryBuffer
Defined in:
lib/fairy/node/p-group-by.rb

Defined Under Namespace

Classes: CachedBuffer, KeyValueStream, Merger

Instance Attribute Summary

Attributes inherited from DirectOnMemoryBuffer

#log_id

Instance Method Summary

Constructor Details

#initialize(njob, policy) ⇒ DirectMergeSortBuffer

Returns a new instance of DirectMergeSortBuffer.



895
896
897
898
899
900
901
902
# File 'lib/fairy/node/p-group-by.rb', line 895

# Creates a buffer that keeps key/value entries in memory until their count
# exceeds a threshold, after which #push spills them to temporary files.
#
# njob   - the owning job; forwarded to the superclass.
# policy - options Hash. :threshold overrides the spill threshold; when it is
#          nil/false, CONF.GROUP_BY_CMSB_THRESHOLD is used instead.
def initialize(njob, policy)
  super(njob, policy)

  @threshold = policy[:threshold] || CONF.GROUP_BY_CMSB_THRESHOLD
  # nil means "nothing spilled yet"; #each iterates in memory until
  # #open_buffer (via #init_2ndmemory) creates the list of spill files.
  @buffers = nil
end

Instance Method Details

#each(&block) ⇒ Object



960
961
962
963
964
965
966
# File 'lib/fairy/node/p-group-by.rb', line 960

# Yields every buffered entry.
#
# When nothing has spilled to disk (@buffers is nil), the superclass's
# in-memory iteration is used. Otherwise the spill files are read back
# through #each_2ndmemory.
def each(&block)
  return super unless @buffers

  each_2ndmemory(&block)
end

#each_2ndmemory(&block) ⇒ Object



968
969
970
971
972
973
974
975
976
977
978
# File 'lib/fairy/node/p-group-by.rb', line 968

# Iterates over everything that has been spilled to disk.
#
# Entries still held in memory are first flushed as one more sorted run via
# #store_2ndmemory. The in-memory list is then released (set to nil). Every
# spill file is handed to Merger, which yields entries to +block+; presumably
# it merges the runs sorted by #store_2ndmemory (see Merger to confirm).
#
# Because @key_values is set to nil after flushing and @buffers stays
# non-nil, #each routes later iterations back here. The nil check keeps
# those calls from raising NoMethodError on nil.empty?.
def each_2ndmemory(&block)
  if @key_values && !@key_values.empty?
    store_2ndmemory(@key_values)
    @key_values = nil
  end
  Log::info(self, "Merge Start: #{@buffers.size} files")
  Log::debug(self, @buffers.map(&:path).join(" "))

  Merger.new(@njob, @buffers).each(&block)
end

#init_2ndmemory ⇒ Object



904
905
906
907
908
909
910
911
# File 'lib/fairy/node/p-group-by.rb', line 904

# Prepares the secondary-memory (spill-to-file) mode.
#
# It loads FastTempfile and selects the directory for spill files:
# policy[:buffer_dir], or CONF.TMP_DIR when that is nil/false. It also starts
# an empty list of spill buffers.
def init_2ndmemory
  # Loaded on demand: #open_buffer only calls this once data first spills.
  require "fairy/share/fast-tempfile"

  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
  @buffers = []
end

#open_buffer(&block) ⇒ Object



913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
# File 'lib/fairy/node/p-group-by.rb', line 913

# Creates a new spill file and records it in @buffers. On first use it
# initializes secondary-memory mode via #init_2ndmemory.
#
# With a block: yields the file's underlying IO, always closes the buffer
# afterwards (even if the block raises), and returns the block's value.
# Without a block: returns the open FastTempfile buffer to the caller.
def open_buffer(&block)
  init_2ndmemory unless @buffers

  buffer = FastTempfile.open("mod-group-by-buffer-#{@njob.no}-", @buffer_dir)
  @buffers.push(buffer)
  return buffer unless block_given?

  begin
    # Workaround for ruby BUG#2390: yield the raw IO rather than the
    # buffer object itself (formerly `yield buffer`).
    yield buffer.io
  ensure
    buffer.close
  end
end

#push(value) ⇒ Object



932
933
934
935
936
937
938
939
940
941
942
943
944
945
# File 'lib/fairy/node/p-group-by.rb', line 932

# Adds +value+ through the superclass, which presumably appends it to
# @key_values (defined in DirectOnMemoryBuffer; confirm there).
#
# Once the in-memory list holds more than @threshold entries, it is swapped
# for a fresh empty list and written out as a sorted run by
# #store_2ndmemory. The spill runs while @key_values_mutex is held, so
# concurrent pushers wait for it to finish. This also serializes access to
# @buffers inside #open_buffer.
def push(value)
  super

  @key_values_mutex.synchronize do
    next unless @key_values.size > @threshold

    spilled, @key_values = @key_values, []
    store_2ndmemory(spilled)
  end
end

#store_2ndmemory(key_values) ⇒ Object



947
948
949
950
951
952
953
954
955
956
957
958
# File 'lib/fairy/node/p-group-by.rb', line 947

# Writes one sorted run of key/value entries to a new spill file.
#
# Entries are sorted by @njob.hash_key, then Marshal-dumped in slices of
# @CHUNK_SIZE entries, one Marshal record per slice. The IO comes from
# #open_buffer, which closes the buffer when the block returns.
# NOTE(review): @CHUNK_SIZE is not assigned in this class's visible code;
# presumably it is set by the superclass. Confirm it is never nil.
#
# key_values - the entries to spill. The caller's array is not mutated
#              (sort_by returns a new array).
def store_2ndmemory(key_values)
  Log::debug(self, "START STORE")
  sorted = key_values.sort_by { |e| @njob.hash_key(e) }

  open_buffer do |io|
    sorted.each_slice(@CHUNK_SIZE) { |slice| Marshal.dump(slice, io) }
  end
  Log::debug(self, "FINISH STORE")
end