Class: MapReduce::ReduceLog
- Inherits:
-
Object
- Object
- MapReduce::ReduceLog
- Defined in:
- lib/map_reduce/reduce_log.rb
Instance Method Summary collapse
- #force ⇒ Object
- #get_data ⇒ Object
-
#initialize(map_log, delimiter) ⇒ ReduceLog
constructor
A new instance of ReduceLog.
- #log_file ⇒ Object
- #sort(fn) ⇒ Object
Constructor Details
#initialize(map_log, delimiter) ⇒ ReduceLog
Returns a new instance of ReduceLog.
3 4 5 6 |
# File 'lib/map_reduce/reduce_log.rb', line 3

# Builds a reduce log on top of an existing map log.
#
# @param map_log [#log_folder, #reset] object the reduce log asks for its
#   folder and for the file to read (see #log_file)
# @param delimiter [String] separator handed to String#split when a line
#   is broken into key and value (see #get_data)
def initialize(map_log, delimiter)
  @map_log   = map_log
  @delimiter = delimiter
end
Instance Method Details
#force ⇒ Object
39 40 41 42 43 44 45 46 47 |
# File 'lib/map_reduce/reduce_log.rb', line 39

# Opens the reduce log for reading unless a line enumerator is already set.
#
# Does nothing when @lines is present or when #log_file yields no path.
# Otherwise the file is opened, kept in @file, and its line enumerator is
# stored in @lines for #get_data to consume.
#
# @return [Enumerator, nil] the new line enumerator, or nil when nothing
#   was opened
def force
  return if @lines

  path = log_file
  return unless path

  @file = File.open(path)
  @lines = @file.each_line
end
#get_data ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/map_reduce/reduce_log.rb', line 8

# Reads the next run of consecutive lines that share the same key.
#
# Each line is chomped and split on @delimiter; the first field is the key
# and the second field is the value (any further fields are ignored). Lines
# are consumed until a line with a different key is peeked, which is left
# in place for the next call.
#
# When the enumerator is exhausted the underlying file is closed and
# deleted, and @lines is reset to nil so later calls return nil.
#
# A line that raises while being processed is logged through
# MapReduce.logger and skipped.
#
# @return [Array, nil] +[key, value, value, ...]+, or nil when no
#   enumerator is set (see #force)
def get_data
  return unless @lines

  current_key = nil
  current_values = []
  line = nil

  # `while true` rather than `loop`: `loop` would silently swallow a
  # StopIteration raised by the `@lines.next` in the error handler below.
  while true
    begin
      line = @lines.peek.chomp
      key, values = line.split(@delimiter)
      current_key ||= key
      # Different key: leave the peeked line for the next call.
      break if current_key != key

      current_values << values
      @lines.next
    rescue StopIteration
      # End of log: release and remove the file, mark the log as drained.
      @file.close
      File.unlink(File.path(@file))
      @lines = nil
      break
    rescue => e
      MapReduce.logger.error("#{e.message} for line #{line.inspect}")
      @lines.next
    end
  end

  [current_key, *current_values]
end
#log_file ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/map_reduce/reduce_log.rb', line 49

# Resolves the path of the file the reducer should read.
#
# An existing "reducer.log" inside the map log's folder wins. Otherwise the
# path returned by @map_log.reset is sorted in place via #sort and returned.
#
# @return [String, nil] path to the reduce log, or nil when @map_log.reset
#   returns nothing
def log_file
  existing = File.join(@map_log.log_folder, "reducer.log")
  return existing if File.exist?(existing)

  fresh = @map_log.reset
  return unless fresh

  sort(fresh)
  fresh
end
#sort(fn) ⇒ Object
62 63 64 |
# File 'lib/map_reduce/reduce_log.rb', line 62

# Sorts the file in place with the system `sort` utility, ordering lines by
# the key starting at field 1, numerically and in reverse (`-k1nr`).
#
# The command is run without a shell (argv form), so paths containing
# spaces or shell metacharacters are passed through verbatim; `--` keeps a
# path starting with "-" from being parsed as an option.
#
# @param fn [String] path of the file to sort; it is overwritten
# @return [String] whatever `sort` wrote to standard output (normally empty)
def sort(fn)
  IO.popen(["sort", "-k1nr", "-o", fn, "--", fn], &:read)
end