Class: Wukong::Streamer::AccumulatingReducer

Inherits:
Base
  • Object
Defined in:
lib/wukong/streamer/accumulating_reducer.rb

Overview

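AccumulatingReducer makes it easy to apply one operation across all occurrences of each key.

For each record with a given key, AccumulatingReducer calls accumulate. Once the last record for that key has been seen (that is, when a record with a different key arrives or the stream ends), it calls finalize. Records must therefore arrive grouped by key.

See ListAccumulatingReducer and KeyCountingReducer for examples.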

Make sure you don’t have the bad luck, bad judgement or bad approach to accumulate more data than your box can hold before finalizing.
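
As a rough illustration of the accumulate/finalize cycle described above, the sketch below counts records per key while holding only one integer in memory. It does not load the wukong gem: SketchAccumulatingReducer is a hypothetical stand-in whose process, before_stream and after_stream mirror the source listings on this page, and its emit and run methods are simplified assumptions rather than the behavior of Wukong::Streamer::Base.

```ruby
# Stand-in for the reducer documented on this page, so the sketch runs
# without the wukong gem. emit and run are simplified assumptions.
class SketchAccumulatingReducer
  attr_accessor :key
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  def emit(record)
    @emitted << record
  end

  def get_key(*record)
    record.first
  end

  # Hooks that subclasses override.
  def start!(*_args); end
  def accumulate(*_args); end
  def finalize; end

  def before_stream
    self.key = :__first_pass__
  end

  def process(*args, &block)
    this_key = get_key(*args)
    if this_key != key
      finalize(&block) unless key == :__first_pass__
      self.key = this_key
      start!(*args)
    end
    accumulate(*args, &block)
  end

  def after_stream
    finalize { |record| emit record } unless key == :__first_pass__
  end

  # Hypothetical driver: feeds records that are already grouped by key.
  def run(records)
    before_stream
    records.each { |rec| process(*rec) { |out| emit out } }
    after_stream
    emitted
  end
end

# Hypothetical subclass: one counter per key, reset in start!.
class SketchKeyCounter < SketchAccumulatingReducer
  def start!(*_args)
    @count = 0
  end

  def accumulate(*_args)
    @count += 1
  end

  def finalize
    yield [key, @count]
  end
end

counts = SketchKeyCounter.new.run([%w[apple x], %w[apple y], %w[banana z]])
p counts # prints [["apple", 2], ["banana", 1]]
```

Keeping a running count instead of a list of records is one way to stay within the memory limit mentioned above; a reducer that collects every record for a key needs room for the largest group.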

Instance Attribute Summary

Attributes inherited from Base

#own_options

Instance Method Summary

Methods inherited from Base

#bad_record!, #each_record, #emit, #initialize, #mapper, mapper, #monitor, #options, #recordize, #run, run, #stream, #track

Constructor Details

This class inherits a constructor from Wukong::Streamer::Base

Instance Attribute Details

#key ⇒ Object

Returns the value of attribute key.



# File 'lib/wukong/streamer/accumulating_reducer.rb', line 17

def key
  @key
end

Instance Method Details

#accumulate(*args, &block) ⇒ Object

Override this to accumulate each record for the given key in turn.



# File 'lib/wukong/streamer/accumulating_reducer.rb', line 61

def accumulate *args, &block
end

#after_stream(*args) ⇒ Object

Finalize the last-seen group.



# File 'lib/wukong/streamer/accumulating_reducer.rb', line 77

def after_stream *args
  finalize(){|record| emit record } unless (self.key == :__first_pass__)
  super *args
end

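#before_stream ⇒ Object

Sets key to the sentinel :__first_pass__ so that process can tell the first record of the stream apart from a change of key.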



# File 'lib/wukong/streamer/accumulating_reducer.rb', line 72

def before_stream
  self.key = :__first_pass__
end

#finalize ⇒ Object

You must override this method. It is called once for each key, after the last record for that key has been accumulated.



# File 'lib/wukong/streamer/accumulating_reducer.rb', line 68

def finalize
end

#get_key(*record) ⇒ Object

Override this for multiple-field keys and similar cases. By default the key is the first field of the record.

Note that get_key is called by process, so the arguments have already been recordized. In particular, if you are using StructRecordizer (or StructStreamer), you can write this as

def get_key(thing) thing.id.to_i ; end

or any other expression that derives the key from the record.



# File 'lib/wukong/streamer/accumulating_reducer.rb', line 29

def get_key *record
  record.first
end
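
To make the multiple-field case concrete, here is a small sketch in which the key is built from the first two fields of each record. It runs without the wukong gem: SketchReducer and its group helper are hypothetical stand-ins that only reproduce the key-change test from process, and the (user, day, value) record layout is an assumption chosen for illustration.

```ruby
# Minimal stand-in for the grouping logic in process, so this runs
# without the wukong gem. Only get_key is the point of the sketch.
class SketchReducer
  attr_accessor :key

  # Default shown in the listing above: the first field is the key.
  def get_key(*record)
    record.first
  end

  # Hypothetical helper: groups consecutive records that share a key.
  def group(records)
    self.key = :__first_pass__
    groups = []
    records.each do |rec|
      this_key = get_key(*rec)
      if this_key != key
        self.key = this_key
        groups << [this_key, []]
      end
      groups.last[1] << rec
    end
    groups
  end
end

# Hypothetical subclass keyed on the first two fields (user, day).
class SketchUserDayReducer < SketchReducer
  def get_key(user, day, *_rest)
    [user, day]
  end
end

records = [
  ["ann", "mon", 3],
  ["ann", "mon", 5],
  ["ann", "tue", 1],
  ["bob", "tue", 2]
]

by_first_field = SketchReducer.new.group(records).map { |k, recs| [k, recs.size] }
by_user_day    = SketchUserDayReducer.new.group(records).map { |k, recs| [k, recs.size] }

p by_first_field # prints [["ann", 3], ["bob", 1]]
p by_user_day    # prints [[["ann", "mon"], 2], [["ann", "tue"], 1], [["bob", "tue"], 1]]
```

Returning an array works as a key here because process only compares keys with !=, and Ruby arrays compare element by element.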

#process(*args, &block) ⇒ Object

Accumulate all records for a given key.

When a record with a different key arrives, finalize the previous key, adopt the new key and call start!, then accumulate the record as usual.



# File 'lib/wukong/streamer/accumulating_reducer.rb', line 39

def process *args, &block
  this_key = get_key(*args)
  if this_key != self.key      # if this is a new key,
    unless self.key == :__first_pass__
      finalize(&block)         # process what we've collected so far
    end
    self.key = this_key        # adopt the new key
    start! *args               # and set up for the next accumulation
  end
  # collect the current record
  accumulate *args, &block
end
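
The order in which process invokes the hooks can be traced with a small sketch. SketchTracingReducer below is a hypothetical stand-in (it does not load the wukong gem) whose process, before_stream and after_stream follow the listings on this page; the hooks simply append to a log so the call sequence is visible.

```ruby
# Stand-in that logs each hook call; not part of Wukong.
class SketchTracingReducer
  attr_accessor :key
  attr_reader :log

  def initialize
    @log = []
  end

  def get_key(*record)
    record.first
  end

  def start!(*_args)
    log << "start! #{key}"
  end

  def accumulate(*args)
    log << "accumulate #{args.last}"
  end

  def finalize
    log << "finalize #{key}"
  end

  def before_stream
    self.key = :__first_pass__
  end

  # Same control flow as the process listing above.
  def process(*args, &block)
    this_key = get_key(*args)
    if this_key != key
      finalize(&block) unless key == :__first_pass__
      self.key = this_key
      start!(*args)
    end
    accumulate(*args, &block)
  end

  def after_stream
    finalize unless key == :__first_pass__
  end
end

reducer = SketchTracingReducer.new
reducer.before_stream
[%w[a 1], %w[a 2], %w[b 3]].each { |rec| reducer.process(*rec) }
reducer.after_stream
puts reducer.log
```

Under these assumptions the log reads: start! a, accumulate 1, accumulate 2, finalize a, start! b, accumulate 3, finalize b. Note that finalize still sees the old key, because process adopts the new key only after finalizing. If the same key appeared in two separate runs of the input, it would be finalized twice, which is why the input needs to be grouped by key.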

#start!(*args) ⇒ Object

start! is called on the first record of each new key, before that record is accumulated. Override it to reset any per-key state.



# File 'lib/wukong/streamer/accumulating_reducer.rb', line 55

def start! *args
end