Class: Wukong::Streamer::RankAndBinReducer

Inherits:
Base
Defined in:
lib/wukong/streamer/rank_and_bin_reducer.rb

Overview

Bin and order a partitioned subset of keys

For each record, the reducer appends three fields:

  • numbering, from 0..(n-1). Each element gets a distinct numbering based on the order seen at the reducer; elements with identical keys might have different numbering on different runs.

  • rank, a number within 1..n giving the “place” of each value. Each element receives a successive (and thus unique) numbering, but all elements with the same key share the same rank. The first element for a given rank has

    (rank == numbering + 1)
    
  • bin, a number assigning keys by rank into a smaller number of groups. You must supply command line arguments

    --bins=[number] --total_count=[number]
    

    giving the number of groups and predicting in advance the total number of records. (Or override the bin assignment methods, configure_bins! and get_bin, to use your own damn strategy.)

If your data looked (in order) as follows, and 4 bins were requested:

data:       1   1   1 2.3   7  69  79  79  80  81  81
numbering:  0   1   2   3   4   5   6   7   8   9  10
rank:       1   1   1   4   5   6   7   7   9  10  10
4-bin:      1   1   1   2   2   3   3   3   4   4   4

If instead 100 bins were requested,

data:       1   1   1 2.3   7  69  79  79  80  81  81
numbering:  0   1   2   3   4   5   6   7   8   9  10
rank:       1   1   1   4   5   6   7   7   9  10  10
100-bin:    5   5   5  32  41  51  60  60  78  87  87
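The example rows can be recomputed from the formulas in get_rank, get_order_params and get_bin. This is a minimal standalone Ruby sketch, not the Wukong class: the function name rank_and_bin and its keyword arguments are illustrative assumptions, and total_count is taken to be the number of records shown.

```ruby
# Standalone sketch of the numbering/rank/bin formulas from get_rank,
# get_order_params and get_bin. Not the Wukong class; names are illustrative.
def rank_and_bin(keys, bins:, total_count:)
  bin_size  = total_count.to_f / bins   # as in configure_bins!
  numbering = 0
  rank      = 1
  last_key  = nil
  keys.map do |key|
    if last_key != key                  # a new key starts a new rank (get_rank)
      rank     = numbering + 1
      last_key = key
    end
    bin = ((rank - 0.5) / bin_size).floor + 1   # get_bin
    row = [numbering, rank, bin]        # un-incremented numbering, as in get_order_params
    numbering += 1
    row
  end
end

data    = [1, 1, 1, 2.3, 7, 69, 79, 79, 80, 81, 81]
four    = rank_and_bin(data, bins: 4,   total_count: data.size)
hundred = rank_and_bin(data, bins: 100, total_count: data.size)
puts "numbering: #{four.map { |r| r[0] }.join(' ')}"
puts "rank:      #{four.map { |r| r[1] }.join(' ')}"
puts "4-bin:     #{four.map { |r| r[2] }.join(' ')}"
puts "100-bin:   #{hundred.map { |r| r[2] }.join(' ')}"
```

With bins: 4 this prints the same numbering, rank and 4-bin rows as the first example.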

Note most of the bins are empty, and that the


Note that in this implementation each reducer numbers and ranks only its own subset of elements, starting again from numbering 0 and rank 1. If you want to number your whole dataset, you’ll have to set :reduce_tasks => 1 in your Script’s Script#default_options.

You might feel a bit better about yourself if you can bin several fields (or subsets) at once. The :partition_fields option to Wukong::Script (which requests a KeyFieldBasedPartitioner) can be used to route different subsets to (possibly) distinct reducers.

See the [examples/rank_and_bin_fields.rb] example script for an implementation of this. (And note the thing you have to do in case one reducer sees multiple partitions.)
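The source does not show what examples/rank_and_bin_fields.rb does when one reducer sees several partitions. One plausible approach, assumed here, is to reset the counters whenever the partition field changes, the way reset_order_params! sets them up in the constructor. In this hypothetical sketch each record is a [partition, key] pair arriving sorted by partition and then by key; rank_within_partitions is an illustrative name, not part of Wukong.

```ruby
# Hypothetical sketch: one reducer sees several partitions, each sorted by key.
# Counters restart at every partition boundary (like reset_order_params!).
def rank_within_partitions(records)
  started           = false
  current_partition = nil
  last_key          = nil
  numbering         = 0
  rank              = 1
  records.map do |partition, key|
    if !started || partition != current_partition
      # Equivalent of reset_order_params! when a new partition begins
      started, current_partition = true, partition
      last_key, numbering, rank  = nil, 0, 1
    end
    if last_key != key                 # same rule as get_rank
      rank     = numbering + 1
      last_key = key
    end
    row = [partition, key, numbering, rank]
    numbering += 1
    row
  end
end

records = [["a", 3], ["a", 3], ["a", 5], ["b", 1], ["b", 2], ["b", 2]]
rows = rank_within_partitions(records)
rows.each { |row| p row }
```

Without the reset, partition "b" would continue from numbering 3 and receive ranks 4, 5, 5 instead of 1, 2, 2.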

It would surely be best to use a total sort and supply each reducer with the initial rank of its run.
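The total-sort idea comes down to a prefix sum: if reducer i received the i-th contiguous run of a globally sorted input, the number of records on all earlier reducers is the offset to add to its local ranks (and numbering), and the resulting global rank can go through get_bin unchanged. This is a sketch under that assumption with made-up per-reducer counts; run_offsets is a hypothetical helper, not part of Wukong. It also assumes the partitioner routes every copy of a key to the same reducer, so ties never straddle a reducer boundary.

```ruby
# Hypothetical sketch: turn per-reducer record counts (in global sort order)
# into the starting offset of each reducer's run.
def run_offsets(counts_per_reducer)
  offsets = []
  running = 0
  counts_per_reducer.each do |count|
    offsets << running   # records held by all earlier reducers
    running += count
  end
  offsets
end

counts      = [4, 3, 5]               # records per reducer, in sort order (made up)
offsets     = run_offsets(counts)
local_rank  = 2                       # a rank computed on the second reducer (index 1)
global_rank = offsets[1] + local_rank # shift it past the 4 records on reducer 0
p offsets
p global_rank
```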

Instance Attribute Summary

Attributes inherited from Base

#own_options

Instance Method Summary

Methods inherited from Base

#after_stream, #bad_record!, #before_stream, #each_record, #emit, #mapper, mapper, #monitor, #options, #recordize, run, #run, #stream, #track

Constructor Details

#initialize(options) ⇒ RankAndBinReducer

Returns a new instance of RankAndBinReducer.

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 65

def initialize options
  super options
  configure_bins! options
  reset_order_params!
end

Instance Attribute Details

#bin_size ⇒ Object

Returns the value of attribute bin_size.

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 64

def bin_size
  @bin_size
end

Instance Method Details

#configure_bins!(options) ⇒ Object

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 124

def configure_bins! options
  case
  when options[:bins]
    total_count = options[:total_count].to_f
    bins        = options[:bins].to_i
    unless total_count && (total_count != 0) then raise "To set the bin (%ile) size using --bins, we need to know the total count in advance. Please supply the total_count option." end
    self.bin_size = (total_count / bins)
    # $stderr.puts "Splitting %s records into %s bins of size %f. First element gets bin %d, last gets bin %d, median gets bin %d/%d" %
    #   [total_count, bins, bin_size, get_bin(1), get_bin(total_count), get_bin(((total_count+1)/2.0).floor), get_bin(((total_count+1)/2.0).ceil)]
  else
    raise "Please specify a number of --bins= and a --total_count= or your own strategy to bin the ranked items."
  end
end
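configure_bins! reduces the two options to a single bin_size = total_count / bins, and its commented-out $stderr line reports which bins the first, last and median elements fall into. The sketch below recomputes those quantities outside Wukong. bin_summary is a hypothetical name, and unlike the original it raises ArgumentError and also rejects a non-positive bin count (an added check: with bins = 0 the float division would make bin_size Infinity).

```ruby
# Standalone sketch of the setup in configure_bins!, plus the first/last/median
# bins its commented-out $stderr line would report. Names are hypothetical.
def bin_summary(total_count, bins)
  total_count = total_count.to_f       # nil.to_f is 0.0, caught below
  bins        = bins.to_i
  raise ArgumentError, "need a nonzero total_count" if total_count.zero?
  raise ArgumentError, "need a positive bin count" unless bins > 0
  bin_size = total_count / bins
  get_bin  = ->(rank) { ((rank - 0.5) / bin_size).floor + 1 }  # same formula as get_bin
  median_lo = ((total_count + 1) / 2.0).floor
  median_hi = ((total_count + 1) / 2.0).ceil
  {
    bin_size:    bin_size,
    first_bin:   get_bin.call(1),
    last_bin:    get_bin.call(total_count),
    median_bins: [get_bin.call(median_lo), get_bin.call(median_hi)]
  }
end

p bin_summary(11, 4)
p bin_summary(10, 4)
```

For 11 records in 4 bins this gives a bin size of 2.75, the first element in bin 1, the last in bin 4 and the median in bin 3; for 10 records the two middle ranks (5 and 6) straddle bins 2 and 3.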

#get_bin(rank) ⇒ Object

Set the bin from the current rank; elements with identical keys get identical ranks and so land in identical bins.

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 109

def get_bin rank
  ((rank-0.5) / bin_size ).floor + 1
end
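Subtracting 0.5 treats each rank r as the midpoint of the unit interval [r - 1, r), so bin b collects the ranks whose midpoints fall in [(b - 1) * bin_size, b * bin_size). Because the largest midpoint, n - 0.5, is below n = bins * bin_size, no rank lands past the last bin. The standalone sketch below applies the formula to untied ranks 1..n and counts how many ranks each bin receives; bin_counts is a hypothetical helper, and Array#tally needs Ruby 2.7 or later.

```ruby
# Sketch: apply the get_bin formula to every rank 1..n (no ties), with
# bin_size = n / bins as configure_bins! computes it, and count per bin.
def bin_counts(n, bins)
  bin_size = n.to_f / bins
  (1..n).map { |rank| ((rank - 0.5) / bin_size).floor + 1 }.tally
end

p bin_counts(10, 4)
```

With 10 ranks and 4 bins (bin_size 2.5) the counts come out as 2, 3, 2, 3.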

#get_key(*args) ⇒ Object

Key used to assign ranking – elements with identical keys have identical rank.

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 80

def get_key *args
  args.first
end

#get_order_params(key) ⇒ Object

Return the numbering, rank and bin for the given key, then advance the numbering.

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 116

def get_order_params key
  numbering   = @numbering # use un-incremented value
  rank        = get_rank key
  bin         = get_bin  rank
  @numbering += 1
  [numbering, rank, bin]
end

#get_rank(key) ⇒ Object

The ranking is the “place” of each value: each element receives a successive (and thus unique) numbering, but all elements with the same key share the same rank. The first element for a given rank has

(rank == numbering + 1)

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 97

def get_rank key
  if @last_key != key
    @rank       = @numbering + 1
    @last_key   = key
  end
  @rank
end
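get_rank compares each key only with the previous one, so identical keys share a rank only when they arrive next to each other; it relies on the reducer input being sorted by key. This standalone sketch copies that logic into a hypothetical RankTracker class (which also advances the numbering, as get_order_params does) to show the difference.

```ruby
# Standalone copy of the get_rank rule: a new rank starts whenever the key
# differs from the previous key, so ties only share a rank when adjacent.
class RankTracker
  def initialize
    @last_key  = nil
    @numbering = 0
    @rank      = 1
  end

  def next_rank(key)
    if @last_key != key
      @rank     = @numbering + 1
      @last_key = key
    end
    @numbering += 1   # advance numbering, as get_order_params does
    @rank
  end
end

sorted         = RankTracker.new
unsorted       = RankTracker.new
sorted_ranks   = [5, 5, 8].map { |k| sorted.next_rank(k) }
unsorted_ranks = [5, 8, 5].map { |k| unsorted.next_rank(k) }
p sorted_ranks
p unsorted_ranks
```

Sorted input gives the ranks 1, 1, 3, while the same keys out of order come back as 1, 2, 3.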

#process(*fields) {|fields.to_flat + [numbering, rank, bin]| ... } ⇒ Object

Yields:

  • (fields.to_flat + [numbering, rank, bin])

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 138

def process *fields
  numbering, rank, bin = get_order_params(get_key(*fields))
  yield fields.to_flat + [numbering, rank, bin]
end
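process passes the key returned by get_key to get_order_params and yields the record with numbering, rank and bin appended, so overriding get_key is the natural way to rank on a field other than the first. Since the real class needs the Wukong runtime, this sketch uses a hypothetical stand-in, MiniRankAndBin, that inlines the same formulas; Array#flatten stands in for Wukong's to_flat, which is an assumption about what to_flat does.

```ruby
# Hypothetical stand-in for RankAndBinReducer's process/get_key hooks, runnable
# without Wukong. Array#flatten stands in for to_flat (an assumption).
class MiniRankAndBin
  attr_accessor :bin_size

  def initialize(bins:, total_count:)
    self.bin_size = total_count.to_f / bins
    @last_key, @numbering, @rank = nil, 0, 1   # as reset_order_params!
  end

  def get_key(*fields)
    fields.first
  end

  def process(*fields)
    key = get_key(*fields)
    if @last_key != key                        # get_rank
      @rank, @last_key = @numbering + 1, key
    end
    bin       = ((@rank - 0.5) / bin_size).floor + 1   # get_bin
    numbering = @numbering
    @numbering += 1
    yield fields.flatten + [numbering, @rank, bin]
  end
end

# Rank on the second field (a score) instead of the first.
class RankByScore < MiniRankAndBin
  def get_key(*fields)
    fields[1]
  end
end

out     = []
reducer = RankByScore.new(bins: 2, total_count: 4)
[["ann", 10], ["bob", 10], ["cy", 12], ["di", 15]].each do |rec|
  reducer.process(*rec) { |row| out << row }
end
out.each { |row| p row }
```

Here ann and bob tie on a score of 10 and share rank 1 and bin 1, while cy and di get ranks 3 and 4 in bin 2.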

#reset_order_params! ⇒ Object

# File 'lib/wukong/streamer/rank_and_bin_reducer.rb', line 84

def reset_order_params!
  @last_key  = nil
  @numbering = 0
  @rank      = 1
end