Class: Fairy::PGroupBy::DirectOnMemoryBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/node/p-group-by.rb

Direct Known Subclasses

DirectMergeSortBuffer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(njob, policy) ⇒ DirectOnMemoryBuffer

Returns a new instance of DirectOnMemoryBuffer.



860
861
862
863
864
865
866
867
868
869
870
871
# File 'lib/fairy/node/p-group-by.rb', line 860

# Creates an empty buffer bound to +njob+.
#
# @param njob   job object; passed to nothing here except that its +id+ is
#               embedded in the log id (and +hash_key+ is used later by #each)
# @param policy hash-like options; +policy[:chunk_size]+, when truthy,
#               takes precedence over CONF.GROUP_BY_CMSB_CHUNK_SIZE
def initialize(njob, policy)
	@njob, @policy = njob, policy

	# Pushed values accumulate here; the mutex guards appends made by #push.
	@key_values = []
	@key_values_mutex = Mutex.new

	# Explicit policy value wins, otherwise fall back to the configured default.
	@CHUNK_SIZE = policy[:chunk_size] || CONF.GROUP_BY_CMSB_CHUNK_SIZE

	# Log id is "<class name without the Fairy:: prefix>[<job id>]".
	short_class_name = self.class.name.sub(/Fairy::/, '')
	@log_id = format("%s[%s]", short_class_name, @njob.id)
end

Instance Attribute Details

#log_id ⇒ Object

Returns the value of attribute log_id.



873
874
875
# File 'lib/fairy/node/p-group-by.rb', line 873

# Reader for the identifier stored in @log_id.
#
# @return the current value of @log_id (nil if it was never assigned)
def log_id
	@log_id
end

Instance Method Details

#each(&block) ⇒ Object



881
882
883
884
885
# File 'lib/fairy/node/p-group-by.rb', line 881

# Groups the buffered values by +@njob.hash_key+, orders the groups by key,
# wraps every group in a KeyValueStream and passes each stream to +block+.
#
# For each group a +KeyValueStream.new(key, nil)+ is created, the group's
# values are appended with +concat+, and +push_eos+ is called on it.
#
# NOTE(review): @key_values is overwritten with the resulting streams, so a
# second call would group those streams instead of the raw pushed values --
# confirm callers invoke this only once per buffer.
#
# @return whatever +Array#each+ returns for the stream array
def each(&block)
	groups = @key_values.group_by { |value| @njob.hash_key(value) }
	ordered_groups = groups.sort_by { |key, _values| key }

	@key_values = ordered_groups.map do |key, values|
		stream = KeyValueStream.new(key, nil)
		stream.concat(values)
		stream.push_eos
		stream
	end

	@key_values.each(&block)
end

#push(value) ⇒ Object



875
876
877
878
879
# File 'lib/fairy/node/p-group-by.rb', line 875

# Appends +value+ to the buffer while holding @key_values_mutex.
#
# @param value the element to store
# @return the result of +@key_values.push+ (the buffer array itself)
def push(value)
	@key_values_mutex.synchronize { @key_values.push(value) }
end