Class: MapReduce::ReduceLog

Inherits:
Object
  • Object
show all
Defined in:
lib/map_reduce/reduce_log.rb

Instance Method Summary collapse

Constructor Details

#initialize(map_log, delimiter) ⇒ ReduceLog

Returns a new instance of ReduceLog.



3
4
5
6
# File 'lib/map_reduce/reduce_log.rb', line 3

# Stores the collaborators used by the other methods of this class.
#
# @param map_log [Object] map-side log; the rest of the class calls
#   +log_folder+ and +reset+ on it
# @param delimiter [String] separator handed to String#split when a log
#   line is broken into key and value
def initialize(map_log, delimiter)
  @map_log, @delimiter = map_log, delimiter
end

Instance Method Details

#force ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/map_reduce/reduce_log.rb', line 39

# Opens the reducer log on first use.
#
# Does nothing when a line enumerator is already present. Otherwise asks
# +log_file+ for a path and, if one is returned, opens it and keeps both the
# File handle (@file) and its +each_line+ enumerator (@lines). The handle is
# left open here; +get_data+ closes it once the enumerator is exhausted.
#
# @return [Enumerator, nil] the freshly created line enumerator, or nil when
#   the log was already open or no log file is available
def force
  return if @lines

  path = log_file
  return unless path

  @file = File.open(path)
  @lines = @file.each_line
end

#get_data ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/map_reduce/reduce_log.rb', line 8

# Reads the next run of consecutive log lines that share the same key.
#
# Every line is chomped and split on @delimiter into a key and a value. The
# split is limited to two fields so that a value which itself contains the
# delimiter is returned whole instead of being cut at the second delimiter.
# An empty value (line ending right after the delimiter) is reported as nil.
#
# Lines are consumed until the key changes; the line carrying the next key is
# only peeked, so it is still available to the following call. When the
# enumerator runs out, the file is closed and deleted and @lines is reset to
# nil, which makes every later call return nil.
#
# A line that raises while being processed is logged through
# MapReduce.logger and skipped.
#
# @return [Array, nil] +[key, value, ...]+ for the group just read (+[nil]+
#   if the log contained no lines at all), or nil when no log is open
def get_data
  return unless @lines

  current_key = nil
  current_values = []
  line = nil
  # Deliberately `while true` rather than `loop`: `loop` would silently
  # swallow a StopIteration raised from inside the generic rescue below.
  while true
    begin
      line = @lines.peek.chomp
      key, value = line.split(@delimiter, 2)
      # Without a limit, split dropped a trailing empty field; keep reporting
      # an empty value as nil.
      value = nil if value && value.empty?
      current_key ||= key
      # Key changed: leave the peeked line in place for the next call.
      break if current_key != key

      current_values << value
      @lines.next
    rescue StopIteration
      # End of log: release the handle and remove the consumed file.
      @file.close
      File.unlink(File.path(@file))
      @lines = nil
      break
    rescue => e
      MapReduce.logger.error("#{e.message} for line #{line.inspect}")
      # Step past the offending line so the loop cannot spin on it.
      @lines.next
    end
  end
  [current_key, *current_values]
end

#log_file ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/map_reduce/reduce_log.rb', line 49

# Resolves the path of the file the reducer should read.
#
# An existing "reducer.log" inside the map log's folder is used as is.
# Otherwise the path returned by +@map_log.reset+ is sorted in place via
# +sort+ and returned.
# NOTE(review): presumably +reset+ rotates the map log and hands back the
# path of the finished file - confirm against the map log implementation.
#
# @return [String, nil] path of the log to reduce, or nil when +reset+
#   returned nothing
def log_file
  existing = File.join(@map_log.log_folder, "reducer.log")
  return existing if File.exist?(existing)

  fresh = @map_log.reset
  return unless fresh

  sort(fresh)
  fresh
end

#sort(fn) ⇒ Object



62
63
64
# File 'lib/map_reduce/reduce_log.rb', line 62

# Sorts the file at +fn+ in place with the external +sort+ utility, using the
# whole line from field 1 as a numeric key in descending order (-k1nr).
#
# The command is started from an argument vector rather than an interpolated
# shell string, so a path containing spaces or shell metacharacters reaches
# +sort+ as a single argument and is never interpreted by a shell.
#
# @param fn [String] path of the file to sort; it is overwritten with the
#   sorted output
# @return [String] whatever +sort+ wrote to standard output (normally empty)
def sort(fn)
  path = fn.to_s
  IO.popen(["sort", path, "-o", path, "-k1nr"], &:read)
end