Class: Fairy::PGroupBy::OnMemoryBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/node/p-group-by.rb

Direct Known Subclasses

CommandMergeSortBuffer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(njob, policy) ⇒ OnMemoryBuffer

Returns a new instance of OnMemoryBuffer.



173
174
175
176
177
178
179
180
181
182
183
# File 'lib/fairy/node/p-group-by.rb', line 173

# Builds an empty in-memory grouping buffer.
#
# njob   - the owning job object; the code visible here calls #id on it
#          (below) and #hash_key on it (in #push).
# policy - stored in @policy as given; nothing visible here reads it.
def initialize(njob, policy)
	@njob, @policy = njob, policy

	# Pushed values grouped by key (key => list of chunks); every write in
	# #push happens while holding the mutex.
	@key_values_mutex = Mutex.new
	@key_values = {}

	# Chunk-size threshold taken from the global configuration.
	@CHUNK_SIZE = CONF.GROUP_BY_CMSB_CHUNK_SIZE

	# Log label: class name with "Fairy::" stripped, then the job id in brackets.
	short_name = self.class.name.sub(/Fairy::/, '')
	@log_id = format("%s[%s]", short_name, @njob.id)
end

Instance Attribute Details

#log_id ⇒ Object

Returns the value of attribute log_id.



185
186
187
# File 'lib/fairy/node/p-group-by.rb', line 185

# Returns the log label held in @log_id. The constructor builds it as
# "<class name without Fairy::>[<njob id>]".
def log_id
  @log_id
end

Instance Method Details

#each(&block) ⇒ Object



199
200
201
202
203
204
205
206
# File 'lib/fairy/node/p-group-by.rb', line 199

# Hands the buffered groups to +block+, one stream per key.
#
# For every key in @key_values a new KeyValueStream is created for that key,
# each stored chunk is concatenated onto it in insertion order, push_eos is
# called on it, and the stream is then passed to +block+.
#
# Returns whatever @key_values.each returns (the hash itself).
def each(&block)
	@key_values.each do |group_key, chunks|
	  stream = KeyValueStream.new(group_key, nil)
	  chunks.each do |chunk|
	    stream.concat(chunk)
	  end
	  stream.push_eos
	  block.call(stream)
	end
end

#push(value) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
# File 'lib/fairy/node/p-group-by.rb', line 187

# Files +value+ under the key computed by @njob.hash_key(value).
#
# Each key maps to a list of chunks (arrays). The value goes onto the last
# chunk; a fresh chunk is opened first when the last one already holds MORE
# than @CHUNK_SIZE items, so a chunk can reach @CHUNK_SIZE + 1 items before
# rollover. The hash update runs while holding @key_values_mutex.
#
# Returns the chunk the value was appended to.
def push(value)
	group_key = @njob.hash_key(value)

	@key_values_mutex.synchronize do
	  # First value for this key: start with a single empty chunk.
	  chunks = @key_values.fetch(group_key) { @key_values[group_key] = [[]] }
	  chunks.push([]) if @CHUNK_SIZE < chunks.last.size
	  chunks.last.push(value)
	end
end