Class: MapReduce::Reducer

Inherits:
Object
Includes:
Mergeable, Reduceable, MonitorMixin
Defined in:
lib/map_reduce/reducer.rb

Overview

The MapReduce::Reducer class runs the reducer part of your map-reduce job.

Defined Under Namespace

Classes: InvalidChunkLimit

Constructor Details

#initialize(implementation) ⇒ Reducer

Initializes a new reducer.

Examples:

MapReduce::Reducer.new(MyImplementation.new)

Parameters:

  • implementation

    Your map-reduce implementation, i.e. an object which responds to #map and #reduce.
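The reducer relies only on duck typing: any object with `#map` and `#reduce` will do. The sketch below assumes that `#map` takes one input and yields key-value pairs, and that `#reduce(key, value1, value2)` combines two values for the same key. Those signatures are assumptions for illustration; this page does not confirm them. `WordCounter` and the `run_in_memory` driver are hypothetical. The driver stands in for the library, which is not loaded here.

```ruby
# Hypothetical implementation object. The #map/#reduce signatures expected by
# MapReduce::Reducer are assumed here, not taken from this page.
class WordCounter
  # Yields one [word, 1] pair per whitespace-separated word.
  def map(line)
    line.split.each { |word| yield(word, 1) }
  end

  # Combines two values that share the same key.
  def reduce(_key, count1, count2)
    count1 + count2
  end
end

# Hypothetical in-memory driver (not part of the library): maps every input,
# then folds values that share a key with #reduce.
def run_in_memory(implementation, inputs)
  result = {}
  inputs.each do |input|
    implementation.map(input) do |key, value|
      result[key] = result.key?(key) ? implementation.reduce(key, result[key], value) : value
    end
  end
  result
end

counts = run_in_memory(WordCounter.new, ["a b a", "b c"])
# counts == { "a" => 2, "b" => 2, "c" => 1 }
```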



# File 'lib/map_reduce/reducer.rb', line 19

def initialize(implementation)
  super() # sets up the MonitorMixin monitor used by #synchronize

  @implementation = implementation

  @temp_paths ||= [] # chunks registered via #add_chunk
end

Instance Method Details

#add_chunk ⇒ Object

Adds a chunk from the mapper phase to the reducer. It registers a tempfile and returns its path, so you can, for example, download a chunk from S3 and write its content to that file.

Examples:

chunk_path = reducer.add_chunk
File.write(chunk_path, "downloaded blob")
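When `#reduce` writes its own intermediate chunks, it emits one `JSON.generate(pair)` per line. That suggests the chunk format it reads is one JSON-encoded `[key, value]` array per line. Because chunks are combined by a k-way merge, each chunk presumably also has to be sorted by key. Both the format and the ordering are assumptions inferred from the source listing below, not documented guarantees. The `write_chunk` helper is hypothetical, and a `Tempfile` stands in for the path returned by `reducer.add_chunk`.

```ruby
require "json"
require "tempfile"

# Hypothetical helper: writes pairs in the format the reducer is assumed to
# read, i.e. sorted by key with one JSON-encoded [key, value] array per line.
def write_chunk(path, pairs)
  File.open(path, "w") do |file|
    pairs.sort_by(&:first).each { |pair| file.puts(JSON.generate(pair)) }
  end
end

# A Tempfile stands in for the path that reducer.add_chunk would return.
chunk = Tempfile.new("chunk")
write_chunk(chunk.path, [["b", 1], ["a", 2]])

lines = File.readlines(chunk.path, chomp: true)
# lines == ['["a",2]', '["b",1]']
chunk.close! # close and unlink the tempfile
```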


# File 'lib/map_reduce/reducer.rb', line 37

def add_chunk
  temp_path = TempPath.new

  # Register the tempfile under the monitor so concurrent calls are safe.
  synchronize do
    @temp_paths.push(temp_path)
  end

  temp_path.path
end
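`#add_chunk` wraps the push onto `@temp_paths` in `synchronize`, which comes from the included `MonitorMixin`. This serializes registrations, so several threads can add chunks at once, for example while downloading them in parallel. The constructor's `super()` call, with explicit empty parentheses, initializes that monitor without forwarding `implementation`. The `ChunkRegistry` class below is a hypothetical stand-in that reproduces only this pattern with Ruby's standard `monitor` library. It generates random path strings instead of using the library's `TempPath`.

```ruby
require "monitor"
require "securerandom"
require "tmpdir"

# Hypothetical stand-in mirroring the MonitorMixin + synchronize pattern;
# this is not the library's code.
class ChunkRegistry
  include MonitorMixin

  def initialize
    super() # initializes MonitorMixin's monitor; empty parens forward no arguments
    @paths = []
  end

  # Registers a new path under the monitor and returns it.
  def add_chunk
    path = File.join(Dir.tmpdir, "chunk-#{SecureRandom.hex(8)}")
    synchronize { @paths.push(path) }
    path
  end

  def size
    synchronize { @paths.size }
  end
end

registry = ChunkRegistry.new
threads = Array.new(8) { Thread.new { 100.times { registry.add_chunk } } }
threads.each(&:join)
# registry.size == 800
```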

#reduce(chunk_limit:, &block) ⇒ Object

Performs a k-way merge of the added chunks and yields the reduced key-value pairs. If more than chunk_limit chunks have been added, it performs multiple runs. Each intermediate run takes up to chunk_limit chunks, merges and reduces them, and adds the result back as a new chunk. The final run yields its pairs to the block. Without a block, it returns an Enumerator. When it finishes, it deletes all tempfiles, even if an error occurs.
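The core of each run is a k-way merge. Every chunk is sorted by key, so repeatedly taking the smallest head among the chunks produces one globally sorted stream. Consecutive pairs with equal keys can then be folded with the implementation's `#reduce`. The sketch below shows that technique on in-memory arrays. It is not the library's own `k_way_merge`/`reduce_chunk`, which work on open files. `MergeSketch` and `Summer` are hypothetical. The sketch finds the minimum by scanning the k heads (O(k) per element) rather than using a heap, and it assumes a `#reduce(key, value1, value2)` signature.

```ruby
# Hypothetical sketch of the technique; not the library's implementation.
module MergeSketch
  # Merges arrays of [key, value] pairs, each already sorted by key,
  # into one sorted stream. Ties go to the lower chunk index.
  def self.k_way_merge(chunks)
    return enum_for(:k_way_merge, chunks) unless block_given?

    positions = Array.new(chunks.size, 0)
    loop do
      best = nil # index of the chunk whose current head has the smallest key
      chunks.each_with_index do |chunk, i|
        next if positions[i] >= chunk.size
        best = i if best.nil? || chunk[positions[i]][0] < chunks[best][positions[best]][0]
      end
      break if best.nil? # every chunk is exhausted

      yield chunks[best][positions[best]]
      positions[best] += 1
    end
  end

  # Folds consecutive pairs with equal keys using implementation.reduce.
  def self.reduce_sorted(pairs, implementation)
    return enum_for(:reduce_sorted, pairs, implementation) unless block_given?

    current = nil
    pairs.each do |key, value|
      if current && current[0] == key
        current[1] = implementation.reduce(key, current[1], value)
      else
        yield current if current
        current = [key, value]
      end
    end
    yield current if current
  end
end

# Hypothetical implementation that sums values per key.
class Summer
  def reduce(_key, a, b)
    a + b
  end
end

chunks = [[["a", 1], ["c", 2]], [["a", 3], ["b", 1]], [["c", 5]]]
result = MergeSketch.reduce_sorted(MergeSketch.k_way_merge(chunks), Summer.new).to_a
# result == [["a", 4], ["b", 1], ["c", 7]]
```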

Examples:

reducer = MapReduce::Reducer.new(MyImplementation.new)

chunk1_path = reducer.add_chunk
# write data to the file

chunk2_path = reducer.add_chunk
# write data to the file

reducer.reduce(chunk_limit: 32) do |key, value|
  # ...
end

Parameters:

  • chunk_limit (Integer)

    The maximum number of chunk files to merge in a single run; must be at least 2. This matters most on systems that limit the number of open file descriptors. If that limit is high or absent, use a larger value to avoid the overhead of extra runs.
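The loop in the source listing of this method shows how chunk_limit plays out. Each intermediate run consumes up to chunk_limit chunks and pushes one new chunk, so it shrinks the queue by chunk_limit - 1. During such a run, up to chunk_limit files are open for reading plus one for writing. For n >= 2 chunks this works out to ceil((n - 1) / (chunk_limit - 1)) runs. The hypothetical helpers below simulate that loop and compare it with the closed form. They only count runs and never touch files.

```ruby
# Hypothetical helper: counts the runs #reduce performs for n chunks by
# mirroring its loop (take up to chunk_limit, push back one merged chunk).
def count_runs(n, chunk_limit)
  raise ArgumentError, "chunk_limit must be >= 2" unless chunk_limit >= 2

  runs = 0
  loop do
    n -= [chunk_limit, n].min # take up to chunk_limit chunks
    runs += 1
    return runs if n.zero?    # final run: pairs are yielded, no new chunk
    n += 1                    # intermediate run: the merged result is queued
  end
end

# Closed form: ceil((n - 1) / (chunk_limit - 1)) for n >= 2, else one run.
def closed_form_runs(n, chunk_limit)
  n <= 1 ? 1 : (n - 1 + chunk_limit - 2) / (chunk_limit - 1)
end

count_runs(100, 32) # => 4
count_runs(100, 2)  # => 99
```

With 100 chunks, chunk_limit: 32 needs 4 runs, while the minimum of 2 needs 99. This is why a higher limit pays off when file descriptors are plentiful.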

Raises:

  • (InvalidChunkLimit)

    If chunk_limit is less than 2.


# File 'lib/map_reduce/reducer.rb', line 72

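def reduce(chunk_limit:, &block)
  # Without a block, return an Enumerator; no work happens until it is iterated.
  return enum_for(:reduce, chunk_limit: chunk_limit) unless block_given?

  raise(InvalidChunkLimit, "Chunk limit must be >= 2") unless chunk_limit >= 2

  begin
    loop do
      # Take up to chunk_limit chunks and open the ones that exist on disk.
      slice = @temp_paths.shift(chunk_limit)
      files = slice.select { |temp_path| File.exist?(temp_path.path) }
                   .map { |temp_path| File.open(temp_path.path, "r") }

      begin
        # Final run: no other chunks remain queued, so yield the reduced pairs.
        if @temp_paths.empty?
          reduce_chunk(k_way_merge(files), @implementation).each do |pair|
            block.call(pair)
          end

          return
        end

        # Intermediate run: write the reduced pairs to a new chunk, one JSON array per line.
        File.open(add_chunk, "w") do |file|
          reduce_chunk(k_way_merge(files), @implementation).each do |pair|
            file.puts JSON.generate(pair)
          end
        end
      ensure
        # Close and delete this run's input chunks.
        files.each(&:close)
        slice.each(&:delete)
      end
    end
  ensure
    # Delete any chunks still queued, e.g. after an error.
    @temp_paths.each(&:delete)
  end
end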
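Two behaviors of `#reduce` are easy to miss. First, without a block it returns `enum_for(:reduce, chunk_limit: chunk_limit)` before validating anything. An invalid chunk_limit therefore raises only once the Enumerator is iterated. Second, the outer `ensure` runs even when iteration stops early or the block raises. The `ToyReducer` class below is hypothetical and reproduces just these two behaviors on in-memory data. It records cleanup in a `cleaned_up` flag instead of deleting tempfiles.

```ruby
# Hypothetical toy class mirroring two behaviors of Reducer#reduce:
# returning an Enumerator without a block, and cleaning up in ensure.
class ToyReducer
  class InvalidChunkLimit < StandardError; end

  attr_reader :cleaned_up

  def initialize(pairs)
    @pairs = pairs
    @cleaned_up = false
  end

  def reduce(chunk_limit:, &block)
    # Without a block, defer all work (including validation) to iteration time.
    return enum_for(:reduce, chunk_limit: chunk_limit) unless block_given?

    raise(InvalidChunkLimit, "Chunk limit must be >= 2") unless chunk_limit >= 2

    begin
      @pairs.each { |pair| block.call(pair) }
    ensure
      @cleaned_up = true # stands in for deleting the remaining tempfiles
    end
  end
end

toy = ToyReducer.new([["a", 1], ["b", 2], ["c", 3]])
enum = toy.reduce(chunk_limit: 32) # nothing has run yet
first_two = enum.first(2)          # [["a", 1], ["b", 2]]
toy.cleaned_up                     # true: ensure ran although iteration stopped early

deferred = ToyReducer.new([]).reduce(chunk_limit: 1) # an Enumerator; nothing raised yet
```

Iterating `deferred` (for example with `to_a`) raises `ToyReducer::InvalidChunkLimit`.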