Class: Wukong::Streamer::AccumulatingReducer
- Defined in:
- lib/wukong/streamer/accumulating_reducer.rb
Overview
AccumulatingReducer makes it easy to apply one operation across all occurrences of each key.
AccumulatingReducer calls accumulate for every record of a given key. Once the last record for that key has been seen (the key changes or the stream ends), it calls finalize.
See ListAccumulatingReducer and KeyCountingReducer for examples.
Make sure you don't accumulate more data for one key than your box can hold in memory before finalize is called.
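The accumulate/finalize cycle described above can be sketched without loading Wukong. The block below is an illustration only: SketchAccumulatingReducer is a hypothetical stand-in that copies the process, before_stream and after_stream logic listed further down this page, and it hands results to a block where the real class calls emit. SketchCounter and run_sketch are likewise names invented for this example.

```ruby
# Hypothetical stand-in for Wukong::Streamer::AccumulatingReducer so the
# sketch runs without the gem. Results go to a block instead of emit.
class SketchAccumulatingReducer
  attr_accessor :key

  def get_key(*record)
    record.first
  end

  def start!(*args); end
  def accumulate(*args); end
  def finalize; end

  def before_stream
    self.key = :__first_pass__
  end

  def process(*args, &block)
    this_key = get_key(*args)
    if this_key != key
      finalize(&block) unless key == :__first_pass__
      self.key = this_key
      start!(*args)
    end
    accumulate(*args, &block)
  end

  def after_stream(&block)
    finalize(&block) unless key == :__first_pass__
  end
end

# Counts the records seen for each key; only one integer is held per key.
class SketchCounter < SketchAccumulatingReducer
  def start!(*_args)
    @count = 0
  end

  def accumulate(*_args)
    @count += 1
  end

  def finalize
    yield [key, @count]
  end
end

# Drives a reducer over an in-memory list of records and collects its output.
def run_sketch(reducer, records)
  out = []
  reducer.before_stream
  records.each { |rec| reducer.process(*rec) { |result| out << result } }
  reducer.after_stream { |result| out << result }
  out
end

p run_sketch(SketchCounter.new, [%w[a x], %w[a y], %w[b z]])
```

Because the counter keeps a single number per key rather than the records themselves, it stays within the memory warning above no matter how many records share a key.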
Direct Known Subclasses
CountingReducer, ListReducer, SummingReducer, UniqByLastReducer
Instance Attribute Summary
-
#key ⇒ Object
Returns the value of attribute key.
Attributes inherited from Base
Instance Method Summary
-
#accumulate(*args, &block) ⇒ Object
Override this to accumulate each record for the given key in turn.
-
#after_stream(*args) ⇒ Object
Finalize the last-seen group.
-
#before_stream ⇒ Object
Sets key to a sentinel value (:__first_pass__) before the first record is read.
-
#finalize ⇒ Object
You must override this method.
-
#get_key(*record) ⇒ Object
Override for multiple-field keys, etc.
-
#process(*args, &block) ⇒ Object
Accumulate all records for a given key.
-
#start!(*args) ⇒ Object
start! is called on the first record of the new key.
Methods inherited from Base
#bad_record!, #each_record, #emit, #initialize, #mapper, mapper, #monitor, #options, #recordize, #run, run, #stream, #track
Constructor Details
This class inherits a constructor from Wukong::Streamer::Base
Instance Attribute Details
#key ⇒ Object
Returns the value of attribute key.
    # File 'lib/wukong/streamer/accumulating_reducer.rb', line 17
    def key
      @key
    end
Instance Method Details
#accumulate(*args, &block) ⇒ Object
Override this to accumulate each record for the given key in turn.
    # File 'lib/wukong/streamer/accumulating_reducer.rb', line 61
    def accumulate *args, &block
    end
#after_stream(*args) ⇒ Object
Finalize the last-seen group.
    # File 'lib/wukong/streamer/accumulating_reducer.rb', line 77
    def after_stream *args
      finalize(){|record| emit record } unless (self.key == :__first_pass__)
      super *args
    end
#before_stream ⇒ Object
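Sets key to a sentinel value (:__first_pass__) before the first record is read, so that process and after_stream can tell that no group has been started yet.
    # File 'lib/wukong/streamer/accumulating_reducer.rb', line 72
    def before_stream
      self.key = :__first_pass__
    end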
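As an illustration of what the sentinel buys, the sketch below copies only the key-tracking logic from this page into a hypothetical SentinelSketch class (emitted_keys is also an invented helper). It suggests two consequences: an empty stream never reaches finalize, and a nil key is still treated as a real group. The second point is an inference from the code, offered as a plausible reason for using a symbol rather than nil, not a documented design goal.

```ruby
# Hypothetical stand-in that keeps only the sentinel handling shown on this page.
class SentinelSketch
  attr_accessor :key
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  def before_stream
    self.key = :__first_pass__ # no real key has been seen yet
  end

  def process(record_key)
    if record_key != key
      finalize unless key == :__first_pass__
      self.key = record_key
    end
  end

  # Records which key was finalized, in place of emitting a result.
  def finalize
    @emitted << key
  end

  def after_stream
    finalize unless key == :__first_pass__
  end
end

# Runs the sketch over a list of keys and returns the finalized keys in order.
def emitted_keys(keys)
  sketch = SentinelSketch.new
  sketch.before_stream
  keys.each { |k| sketch.process(k) }
  sketch.after_stream
  sketch.emitted
end

p emitted_keys([])             # nothing is finalized for an empty stream
p emitted_keys([nil, nil, "a"]) # the nil group is finalized like any other
```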
#finalize ⇒ Object
You must override this method.
    # File 'lib/wukong/streamer/accumulating_reducer.rb', line 68
    def finalize
    end
#get_key(*record) ⇒ Object
Override for multiple-field keys, etc.
Note that get_key is called by process, so the arguments have already been recordized. In particular, if you are using StructRecordizer (or StructStreamer), you can write this as
    def get_key(thing) thing.id.to_i ; end
or use any other expression that derives the key from the record.
    # File 'lib/wukong/streamer/accumulating_reducer.rb', line 29
    def get_key *record
      record.first
    end
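A multiple-field key might look like the sketch below, which groups records by their first two fields and sums the third. Everything here is hypothetical: CompositeKeySketch is a stand-in that repeats the process logic from this page so the example runs without Wukong, and DailySumSketch, daily_totals and the [user, day, amount] record layout are made up for illustration.

```ruby
# Hypothetical stand-in repeating the key-change logic documented on this page.
class CompositeKeySketch
  attr_accessor :key

  def before_stream
    self.key = :__first_pass__
  end

  def process(*args, &block)
    this_key = get_key(*args)
    if this_key != key
      finalize(&block) unless key == :__first_pass__
      self.key = this_key
      start!(*args)
    end
    accumulate(*args, &block)
  end

  def after_stream(&block)
    finalize(&block) unless key == :__first_pass__
  end
end

# Groups on (user, day) and sums the amount field.
class DailySumSketch < CompositeKeySketch
  # Arrays compare by value, so an array works as a composite key.
  def get_key(user, day, _amount)
    [user, day]
  end

  def start!(*_args)
    @total = 0
  end

  def accumulate(_user, _day, amount)
    @total += amount.to_i
  end

  def finalize
    yield [*key, @total]
  end
end

# Feeds records through the reducer and collects the finalized rows.
def daily_totals(records)
  out = []
  reducer = DailySumSketch.new
  reducer.before_stream
  records.each { |rec| reducer.process(*rec) { |row| out << row } }
  reducer.after_stream { |row| out << row }
  out
end

p daily_totals([%w[ann mon 3], %w[ann mon 4], %w[ann tue 1], %w[bob tue 2]])
```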
#process(*args, &block) ⇒ Object
Accumulate all records for a given key.
When a record with a different key arrives, the last record for the previous key has been seen: finalize that group and adopt the new key.
    # File 'lib/wukong/streamer/accumulating_reducer.rb', line 39
    def process *args, &block
      this_key = get_key(*args)
      if this_key != self.key           # if this is a new key,
        unless self.key == :__first_pass__
          finalize(&block)              # process what we've collected so far
        end
        self.key = this_key             # adopt the new key
        start! *args                    # and set up for the next accumulation
      end
      # collect the current record
      accumulate *args, &block
    end
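To make the call order concrete, the sketch below logs each hook as it fires. TracingSketch is a hypothetical stand-in that repeats the process logic above, and trace_for is an invented helper. Since process only compares each key with the one before it, the sketch also suggests that records for a key need to be adjacent in the input (for example, sorted by key). Otherwise the same key is finalized more than once.

```ruby
# Hypothetical stand-in that logs every hook call as [hook_name, current_key].
class TracingSketch
  attr_accessor :key
  attr_reader :trace

  def initialize
    @trace = []
  end

  def get_key(*record)
    record.first
  end

  def start!(*_args)
    @trace << [:start!, key]
  end

  def accumulate(*_args)
    @trace << [:accumulate, key]
  end

  def finalize
    @trace << [:finalize, key]
  end

  def before_stream
    self.key = :__first_pass__
  end

  def process(*args, &block)
    this_key = get_key(*args)
    if this_key != key
      finalize(&block) unless key == :__first_pass__
      self.key = this_key
      start!(*args)
    end
    accumulate(*args, &block)
  end

  def after_stream
    finalize unless key == :__first_pass__
  end
end

# Runs the tracer over a list of keys and returns the recorded hook calls.
def trace_for(keys)
  tracer = TracingSketch.new
  tracer.before_stream
  keys.each { |k| tracer.process(k) }
  tracer.after_stream
  tracer.trace
end

trace_for(%w[a a b]).each { |hook, key| puts "#{hook} #{key}" }
puts trace_for(%w[a b a]).count { |hook, _| hook == :finalize }
```

With the keys a, a, b the hooks fire as start!, accumulate, accumulate, finalize for a, then start!, accumulate, finalize for b. With the ungrouped input a, b, a, finalize fires three times, once for each run of equal keys.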
#start!(*args) ⇒ Object
start! is called on the first record of the new key.
    # File 'lib/wukong/streamer/accumulating_reducer.rb', line 55
    def start! *args
    end