Class: Wukong::Streamer::RankAndBinReducer
Defined in: lib/wukong/streamer/rank_and_bin_reducer.rb
Overview
Bin and order a partitioned subset of keys.
For each record, appends a
- numbering, from 0..(n-1). Each element gets a distinct numbering based on the order seen at the reducer; elements with identical keys might have different numbering on different runs.
- rank, a number within 1..n giving the “place” of each value. Each element receives a successive (and thus unique) numbering, but all elements with the same key share the same rank. The first element for a given rank has (rank == numbering + 1).
- bin, a number assigning keys by rank into a smaller number of groups. You must supply command line arguments --bins=[number] --total_count=[number] giving the number of groups and predicting in advance the total number of records. (Or override the bin assignment method to use your own damn strategy.)
If your data looked (in order) as follows, and 4 bins were requested:
data:       1   1   1   2.3   7   69   79   79   80   81   81
numbering:  0   1   2   3     4   5    6    7    8    9    10
rank:       1   1   1   4     5   6    7    7    9    10   10
4-bin:      1   1   1   2     2   3    3    3    4    4    4
If instead 100 bins were requested,
data:       1   1   1   2.3   7   69   79   79   80   81   81
numbering:  0   1   2   3     4   5    6    7    8    9    10
rank:       1   1   1   4     5   6    7    7    9    10   10
100-bin:    1   1   1   28    37  46   55   55   73   82   82
Note that most of the bins are empty.
Note that in this implementation each reducer numbers its own subset of elements from 1..total_count. If you want to number your whole dataset, you’ll have to set :reduce_tasks => 1 in your Script’s Script#default_options.
You might feel a bit better about yourself if you can bin several fields (or subsets) at once. The :partition_fields option to Wukong::Script (which requests a KeyFieldBasedPartitioner) can be used to route different subsets to (possibly) distinct reducers.
See the [examples/rank_and_bin_fields.rb] example script for an implementation of this. (And note the thing you have to do in case one reducer sees multiple partitions).
It would surely be best to use a total sort and supply each reducer with the initial rank of its run.
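The 4-bin example above can be reproduced outside Hadoop with a small standalone sketch. The class name RankAndBinSketch and its call method are hypothetical stand-ins and not part of Wukong; the arithmetic follows the get_rank and get_bin listings in the method details on this page, and the input is assumed to arrive already sorted by key.

```ruby
# Hypothetical stand-in for the reducer's bookkeeping; not part of Wukong.
class RankAndBinSketch
  def initialize(bins, total_count)
    @bin_size  = total_count.to_f / bins
    @numbering = 0
    @rank      = 1
    @last_key  = nil
  end

  # Returns [numbering, rank, bin] for the next key, in the order seen.
  def call(key)
    numbering = @numbering
    @rank = numbering + 1 if key != @last_key  # a new key starts a new rank
    @last_key = key
    @numbering += 1
    bin = ((@rank - 0.5) / @bin_size).floor + 1
    [numbering, @rank, bin]
  end
end

data   = [1, 1, 1, 2.3, 7, 69, 79, 79, 80, 81, 81]
sketch = RankAndBinSketch.new(4, data.size)
rows   = data.map { |key| sketch.call(key) }

# Print one tab-separated line per record: key, numbering, rank, bin.
rows.each_with_index do |(numbering, rank, bin), i|
  puts [data[i], numbering, rank, bin].join("\t")
end
```

Each printed line corresponds to one column of the 4-bin table: the key followed by its numbering, rank and bin.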
Instance Attribute Summary
- #bin_size ⇒ Object
  Returns the value of attribute bin_size.
Attributes inherited from Base
Instance Method Summary
- #configure_bins!(options) ⇒ Object
- #get_bin(rank) ⇒ Object
  Returns the bin for the given rank; elements with identical keys land in identical bins.
- #get_key(*args) ⇒ Object
  Key used to assign ranking – elements with identical keys have identical rank.
- #get_order_params(key) ⇒ Object
  Return the numbering, rank and bin for the given key.
- #get_rank(key) ⇒ Object
  The ranking is the “place” of each value: each element receives a successive (and thus unique) numbering, but all elements with the same key share the same rank.
- #initialize(options) ⇒ RankAndBinReducer (constructor)
  A new instance of RankAndBinReducer.
- #process(*fields) {|fields.to_flat + [numbering, rank, bin]| ... } ⇒ Object
- #reset_order_params! ⇒ Object
Methods inherited from Base
#after_stream, #bad_record!, #before_stream, #each_record, #emit, #mapper, mapper, #monitor, #options, #recordize, run, #run, #stream, #track
Constructor Details
#initialize(options) ⇒ RankAndBinReducer
Returns a new instance of RankAndBinReducer.
    # File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 65
    def initialize options
      super options
      configure_bins! options
      reset_order_params!
    end
Instance Attribute Details
#bin_size ⇒ Object
Returns the value of attribute bin_size.
    # File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 64
    def bin_size
      @bin_size
    end
Instance Method Details
#configure_bins!(options) ⇒ Object
    # File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 124
    def configure_bins! options
      case
      when options[:bins]
        total_count = options[:total_count].to_f
        bins = options[:bins].to_i
        unless total_count && (total_count != 0) then raise "To set the bin (%ile) size using --bins, we need to know the total count in advance. Please supply the total_count option." end
        self.bin_size = (total_count / bins)
        # $stderr.puts "Splitting %s records into %s bins of size %f. First element gets bin %d, last gets bin %d, median gets bin %d/%d" %
        #   [total_count, bins, bin_size, get_bin(1), get_bin(total_count), get_bin(((total_count+1)/2.0).floor), get_bin(((total_count+1)/2.0).ceil)]
      else
        raise "Please specify a number of --bins= and a --total_count= or your own strategy to bin the ranked items."
      end
    end
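As a rough illustration of the guard above, the following sketch computes a bin size from an options hash without loading Wukong. The helper name bin_size_for and the ArgumentError messages are hypothetical, and passing the values as strings is only an assumption about how command line arguments might arrive.

```ruby
# Hypothetical helper; mirrors the checks in configure_bins! without Wukong.
def bin_size_for(options)
  raise ArgumentError, "specify :bins" unless options[:bins]
  # nil.to_f is 0.0, so a missing :total_count fails the check below.
  total_count = options[:total_count].to_f
  bins        = options[:bins].to_i
  raise ArgumentError, "supply a nonzero :total_count" if total_count == 0
  total_count / bins
end

size_4   = bin_size_for(:bins => "4", :total_count => "11")  # string inputs
size_100 = bin_size_for(:bins => 100, :total_count => 11)    # numeric inputs

# Omitting :total_count is rejected rather than yielding a zero-width bin.
missing = begin
  bin_size_for(:bins => 4)
rescue ArgumentError => e
  e.message
end
```

The bin size is a Float and need not be a whole number: 11 records in 4 bins gives bins 2.75 ranks wide.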
#get_bin(rank) ⇒ Object
Returns the bin for the given rank; elements with identical keys land in identical bins.
    # File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 109
    def get_bin rank
      ((rank-0.5) / bin_size ).floor + 1
    end
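To see where the bin boundaries fall, here is a minimal sketch that applies the same arithmetic as get_bin to 100 records split into 10 bins. The free function bin_for is a hypothetical name used only for this illustration.

```ruby
# Same arithmetic as get_bin, written as a free function for illustration.
def bin_for(rank, bin_size)
  ((rank - 0.5) / bin_size).floor + 1
end

bin_size = 100.0 / 10  # 100 records into 10 bins, so each bin spans 10 ranks

# Pair a few ranks around the bin edges with the bin each one receives.
edges = [1, 10, 11, 50, 51, 100].map { |rank| [rank, bin_for(rank, bin_size)] }
edges.each { |rank, bin| puts "rank #{rank} -> bin #{bin}" }
```

Ranks 1 through 10 land in bin 1, rank 11 opens bin 2, and rank 100 lands in bin 10. Dividing the raw rank instead would give 10 / 10.0 = 1.0 for rank 10 and push it into bin 2; the 0.5 offset keeps it in bin 1.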
#get_key(*args) ⇒ Object
Key used to assign ranking – elements with identical keys have identical rank.
80 81 82 |
# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 80 def get_key *args args.first end |
#get_order_params(key) ⇒ Object
Return the numbering, rank and bin for the given key.
    # File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 116
    def get_order_params key
      numbering = @numbering # use un-incremented value
      rank = get_rank key
      bin = get_bin rank
      @numbering += 1
      [numbering, rank, bin]
    end
#get_rank(key) ⇒ Object
The ranking is the “place” of each value: each element receives a successive (and thus unique) numbering, but all elements with the same key share the same rank. The first element for a given rank has (rank == numbering + 1).
    # File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 97
    def get_rank key
      if @last_key != key
        @rank = @numbering + 1
        @last_key = key
      end
      @rank
    end
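Because the rank only changes when a key differs from the one immediately before it, the method appears to rely on keys arriving in sorted order. The sketch below, built around a hypothetical free function ranks_for, shows the tie handling on sorted input and what happens when equal keys are not adjacent.

```ruby
# Hypothetical free-standing version of the get_rank bookkeeping.
def ranks_for(keys)
  last_key = nil
  rank     = 1
  keys.each_with_index.map do |key, numbering|
    if key != last_key
      rank     = numbering + 1  # first element of a run: rank == numbering + 1
      last_key = key
    end
    rank
  end
end

sorted   = ranks_for([7, 7, 9, 9, 9, 12])  # ties share a rank, gaps follow
unsorted = ranks_for([7, 9, 7])            # equal keys that are not adjacent

puts sorted.inspect
puts unsorted.inspect
```

On the sorted input the ranks are 1, 1, 3, 3, 3, 6. On the unsorted input the two 7s receive ranks 1 and 3, so ties are only detected between neighbouring records.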
#process(*fields) {|fields.to_flat + [numbering, rank, bin]| ... } ⇒ Object
    # File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 138
    def process *fields
      numbering, rank, bin = get_order_params(get_key(*fields))
      yield fields.to_flat + [numbering, rank, bin]
    end
#reset_order_params! ⇒ Object
    # File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 84
    def reset_order_params!
      @last_key = nil
      @numbering = 0
      @rank = 1
    end