Class: Fairy::PGroupBy::OnMemoryBuffer
- Inherits:
-
Object
- Object
- Fairy::PGroupBy::OnMemoryBuffer
- Defined in:
- lib/fairy/node/p-group-by.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#log_id ⇒ Object
Returns the value of attribute log_id.
Instance Method Summary collapse
- #each(&block) ⇒ Object
-
#initialize(njob, policy) ⇒ OnMemoryBuffer
constructor
A new instance of OnMemoryBuffer.
- #push(value) ⇒ Object
Constructor Details
#initialize(njob, policy) ⇒ OnMemoryBuffer
Returns a new instance of OnMemoryBuffer.
173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/fairy/node/p-group-by.rb', line 173
#
# Sets up an empty in-memory buffer of grouped values.
#
# @param njob   object that must respond to +id+ (used here for the log
#               identifier) and to +hash_key+ (used by #push)
# @param policy stored in @policy; not otherwise used by the methods
#               visible in this listing
def initialize(njob, policy)
  @njob = njob
  @policy = policy

  # key => array of chunks, each chunk being an array of pushed values.
  @key_values = {}
  # Guards mutation of @key_values in #push.
  @key_values_mutex = Mutex.new

  # Chunk size limit consulted by #push, read from configuration.
  @CHUNK_SIZE = CONF.GROUP_BY_CMSB_CHUNK_SIZE

  # Log identifier: class name without the "Fairy::" prefix, followed by
  # the job id in brackets.
  short_name = self.class.name.sub(/Fairy::/, '')
  @log_id = "#{short_name}[#{@njob.id}]"
end
Instance Attribute Details
#log_id ⇒ Object
Returns the value of attribute log_id.
185 186 187 |
# File 'lib/fairy/node/p-group-by.rb', line 185
#
# Reader for the identifier string kept in @log_id.
#
# @return [Object] the current value of @log_id
def log_id
  @log_id
end
Instance Method Details
#each(&block) ⇒ Object
199 200 201 202 203 204 205 206 |
# File 'lib/fairy/node/p-group-by.rb', line 199
#
# Yields one KeyValueStream per buffered key.
#
# For every key, a stream is created, every stored chunk for that key is
# concatenated into it in insertion order, the stream is terminated with
# +push_eos+, and the finished stream is handed to the block.
#
# When called without a block an Enumerator is returned instead of
# failing with NoMethodError on a nil block.
#
# @yieldparam kvs [KeyValueStream] completed stream for one key
# @return [Hash, Enumerator] the underlying key/chunks hash when a block
#   is given, otherwise an Enumerator over the streams
def each(&block)
  return enum_for(:each) unless block

  @key_values.each do |key, chunks|
    stream = KeyValueStream.new(key, nil)
    chunks.each { |chunk| stream.concat(chunk) }
    stream.push_eos
    block.call(stream)
  end
end
#push(value) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/fairy/node/p-group-by.rb', line 187
#
# Appends +value+ to the buffer under the key computed by
# +@njob.hash_key(value)+.
#
# Values for a key are stored as a list of chunks. A new chunk is started
# once the current one already holds @CHUNK_SIZE values, so no chunk ever
# exceeds @CHUNK_SIZE (the previous +<+ comparison let chunks grow to
# @CHUNK_SIZE + 1). The update runs under @key_values_mutex.
#
# @param value [Object] the value to buffer
# @return [Array] the chunk the value was appended to
def push(value)
  key = @njob.hash_key(value)
  @key_values_mutex.synchronize do
    chunks = (@key_values[key] ||= [[]])
    current = chunks.last
    # Never roll over from an empty chunk, so a non-positive chunk size
    # still stores one value per chunk instead of leaving empty chunks.
    if !current.empty? && current.size >= @CHUNK_SIZE
      current = []
      chunks.push(current)
    end
    current.push(value)
  end
end