Class: Wukong::Streamer::UniqByLastReducer

Inherits:
AccumulatingReducer
Defined in:
lib/wukong/streamer/uniq_by_last_reducer.rb

Overview

UniqByLastReducer accepts all records for a given key and emits only the last one seen.

It acts like an insecure high-school kid: for each record of a given key it discards whatever record it’s holding and adopts this new value. When a new key comes on the scene it emits the last record, like an older brother handing off his Depeche Mode collection.

For example, to extract the latest value for each property, emit your records as

[resource_type, key, timestamp, ... fields ...]

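then set :sort_fields to 3 and :partition_fields to 2, so that records are partitioned on the first two fields and sorted on the first three. The last record in each group is then the one with the latest timestamp.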
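The record layout above can be sketched in plain Ruby, without loading Wukong or Hadoop. This is only an illustration: it assumes that sorting on three fields means records arrive ordered by [resource_type, key, timestamp], and that partitioning on two fields keeps all records for a [resource_type, key] pair together. The sample records and the latest_per_key helper are hypothetical, and the helper uses a Hash for brevity where the reducer itself holds one record at a time.

```ruby
# Hypothetical sample records: [resource_type, key, timestamp, value]
records = [
  ["user", "alice", 20100102, "blue"],
  ["user", "bob",   20100101, "red"],
  ["user", "alice", 20100101, "green"],
  ["page", "home",  20100103, "v2"],
  ["page", "home",  20100101, "v1"],
]

# Stand-in for the sort step: order by the first three fields
sorted = records.sort_by { |rec| rec[0..2] }

# Hypothetical helper: keep the last record seen for each [resource_type, key]
def latest_per_key(sorted_records)
  last_seen = {}
  sorted_records.each { |rec| last_seen[rec[0..1]] = rec }
  last_seen.values
end

latest = latest_per_key(sorted)
latest.each { |rec| puts rec.join("\t") }
```

Because the timestamp is the third sort field, the record that overwrites all earlier ones within a group is the most recent.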

Instance Attribute Summary

Attributes inherited from AccumulatingReducer

#key

Attributes inherited from Base

#own_options

Instance Method Summary

Methods inherited from AccumulatingReducer

#after_stream, #before_stream, #process

Methods inherited from Base

#after_stream, #bad_record!, #before_stream, #each_record, #emit, #initialize, #mapper, mapper, #monitor, #options, #process, #recordize, #run, run, #stream, #track

Constructor Details

This class inherits a constructor from Wukong::Streamer::Base

Instance Attribute Details

#final_value ⇒ Object

Returns the value of attribute final_value.



# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 20

def final_value
  @final_value
end

Instance Method Details

#accumulate(*vals) ⇒ Object

Adopt each value in turn: the last one’s the one you want.



# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 32

def accumulate *vals
  self.final_value = vals
end

#finalize {|final_value| ... } ⇒ Object

Emit the last-seen value, if one was accumulated.

Yields:

final_value (the last record accumulated for the current key)



# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 39

def finalize
  yield final_value if final_value
end

#get_key(*vals) ⇒ Object

Use the first two fields as the key by default.



# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 25

def get_key *vals
  vals[0..1]
end

#start!(*args) ⇒ Object

Clear state on reset



# File 'lib/wukong/streamer/uniq_by_last_reducer.rb', line 46

def start! *args
  self.final_value = nil
end