Class: MapReduce::ReduceLog
- Inherits: Object
  - Object
  - MapReduce::ReduceLog
- Defined in:
- lib/map_reduce/reduce_log.rb
Instance Method Summary
- #force ⇒ Object
- #get_data ⇒ Object
- #initialize(map_log, delimiter) ⇒ ReduceLog (constructor): A new instance of ReduceLog.
- #log_file ⇒ Object
- #sort(fn) ⇒ Object
Constructor Details
#initialize(map_log, delimiter) ⇒ ReduceLog
3 4 5 6 |
# File 'lib/map_reduce/reduce_log.rb', line 3
#
# Stores the two collaborators the reduce log works with.
#
# @param map_log [Object] object that is later asked for `log_folder` and
#   `reset` when the reducer log file is located
# @param delimiter [String, Regexp] separator passed to `String#split` when
#   a log line is broken into key and value
def initialize(map_log, delimiter)
  @map_log, @delimiter = map_log, delimiter
end
Instance Method Details
#force ⇒ Object
35 36 37 38 39 40 41 42 43 |
# File 'lib/map_reduce/reduce_log.rb', line 35
#
# Opens the reducer log and prepares a line enumerator over it, unless one
# is already active.
#
# The file handle is kept in @file and left open on purpose: @lines is an
# external enumerator that reads from the handle on demand.
#
# @return [Enumerator, nil] the new line enumerator, or nil when an
#   enumerator already exists or `log_file` yields no path
def force
  return if @lines

  path = log_file
  return unless path

  @file = File.open(path)
  @lines = @file.each_line
end
#get_data ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/map_reduce/reduce_log.rb', line 8
#
# Reads the next run of consecutive log lines that share the same key.
#
# Each line is chomped and split on @delimiter into a key and a value.
# Lines are consumed until the key changes or the log is exhausted. When the
# log is exhausted the file is closed and deleted and @lines is cleared, so
# later calls return nil.
#
# @return [Array, nil] `[key, value, ...]` for the group just read; nil when
#   no log is loaded (@lines is unset) or the log contained no lines at all
def get_data
  return unless @lines

  current_key = nil
  current_values = []

  loop do
    begin
      line = @lines.peek
    rescue StopIteration
      # End of log: release the handle and remove the consumed file.
      @file.close
      File.unlink(@file.path)
      @lines = nil
      break
    end

    # A limit of 2 keeps any delimiter that appears inside the value intact
    # instead of silently dropping everything after it.
    key, value = line.chomp.split(@delimiter, 2)
    current_key ||= key
    # Peeked line belongs to the next group; leave it for the next call.
    break if current_key != key

    current_values << value
    @lines.next
  end

  # An empty log produced no group; report "no data" rather than [nil].
  return if current_key.nil? && current_values.empty?

  [current_key, *current_values]
end
#log_file ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/map_reduce/reduce_log.rb', line 45
#
# Resolves the path of the file the reducer should read.
#
# If "reducer.log" already exists inside the map log's folder, that path is
# returned untouched. Otherwise the map log is asked to `reset`; a truthy
# result is treated as a file path, sorted in place via `sort`, and returned.
#
# @return [String, nil] path of the log to read, or nil when no
#   "reducer.log" exists and `reset` returns nothing
def log_file
  existing = File.join(@map_log.log_folder, "reducer.log")
  return existing if File.exist?(existing)

  fresh = @map_log.reset
  return unless fresh

  sort(fresh)
  fresh
end
#sort(fn) ⇒ Object
58 59 60 |
# File 'lib/map_reduce/reduce_log.rb', line 58
#
# Sorts the given file in place with the external `sort` utility, using a
# numeric, reversed key that starts at the first field (`-k1nr`).
#
# The command is run without a shell (argument-array form), so paths that
# contain spaces or shell metacharacters are passed through verbatim.
#
# @param fn [String, #to_s] path of the file to sort; also used as the
#   output file
# @return [String] whatever the `sort` command wrote to standard output
# @raise [Errno::ENOENT] if the `sort` executable cannot be found
def sort(fn)
  path = fn.to_s
  # `--` ends option parsing so a path starting with "-" is still an operand.
  IO.popen(["sort", "-k1nr", "-o", path, "--", path], &:read)
end