Class: Wukong::Streamer::SummingReducer

Inherits:
AccumulatingReducer show all
Defined in:
lib/wukong/streamer/summing_reducer.rb

Overview

Emit each unique key and the count of its occurrences

Instance Attribute Summary collapse

Attributes inherited from AccumulatingReducer

#key

Attributes inherited from Base

#own_options

Instance Method Summary collapse

Methods inherited from AccumulatingReducer

#after_stream, #before_stream, #get_key, #process

Methods inherited from Base

#after_stream, #bad_record!, #before_stream, #each_record, #emit, #initialize, #mapper, mapper, #monitor, #options, #process, #recordize, #run, run, #stream, #track

Constructor Details

This class inherits a constructor from Wukong::Streamer::Base

Instance Attribute Details

#summing_elementsObject

Returns the value of attribute summing_elements.



7
8
9
# File 'lib/wukong/streamer/summing_reducer.rb', line 7

def summing_elements
  @summing_elements
end

#sumsObject

Returns the value of attribute sums.



8
9
10
# File 'lib/wukong/streamer/summing_reducer.rb', line 8

def sums
  @sums
end

Instance Method Details

#accumulate(*fields) ⇒ Object

record one more for this key



16
17
18
19
# File 'lib/wukong/streamer/summing_reducer.rb', line 16

def accumulate *fields
  vals = fields.values_at( *summing_elements )
  vals.each_with_index{|val,idx| self.sums[idx] += val.to_i }
end

#finalize {|[key, sums].flatten| ... } ⇒ Object

emit each key field and the count, tab-separated.

Yields:



22
23
24
# File 'lib/wukong/streamer/summing_reducer.rb', line 22

def finalize
  yield [key, sums].flatten
end

#start!(*args) ⇒ Object

reset the counter to zero



11
12
13
# File 'lib/wukong/streamer/summing_reducer.rb', line 11

def start! *args
  self.sums = summing_elements.map{ 0 }
end