Class: MapReduce::Reducer
- Inherits: Object
- Includes: Mergeable, Reduceable, MonitorMixin
- Defined in: lib/map_reduce/reducer.rb
Overview
The MapReduce::Reducer class runs the reducer part of your map-reduce job.
Defined Under Namespace
Classes: InvalidChunkLimit
Instance Method Summary
- #add_chunk ⇒ Object
  Adds a chunk from the mapper phase to the reducer by registering a tempfile and returning its path, so that you can, e.g., download a chunk from S3 and write its content to this tempfile.
- #initialize(implementation) ⇒ Reducer (constructor)
  Initializes a new reducer.
- #reduce(chunk_limit:, &block) ⇒ Object
  Performs a k-way merge of the added chunks and yields the reduced key-value pairs.
Constructor Details
#initialize(implementation) ⇒ Reducer
Initializes a new reducer.
```ruby
# File 'lib/map_reduce/reducer.rb', line 19

def initialize(implementation)
  super() # initializes the monitor provided by MonitorMixin

  @implementation = implementation
  @temp_paths ||= []
end
```
Instance Method Details
#add_chunk ⇒ Object
Adds a chunk from the mapper phase to the reducer by registering a tempfile and returning its path, so that you can, e.g., download a chunk from S3 and write its content to this tempfile. The registration happens inside synchronize, so add_chunk can be called from multiple threads.
```ruby
# File 'lib/map_reduce/reducer.rb', line 37

def add_chunk
  temp_path = TempPath.new

  synchronize do
    @temp_paths.push(temp_path)
  end

  temp_path.path
end
```
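add_chunk only hands out a path; the caller is responsible for filling the file. The following is a minimal sketch of that pattern. It assumes a hypothetical ChunkRegistry class and uses the standard library's Tempfile in place of the gem's TempPath, whose interface isn't shown here. Several threads register chunks concurrently, which is the situation the synchronize block guards against.

```ruby
require "monitor"
require "tempfile"

# Hypothetical stand-in for the Reducer's chunk bookkeeping. It mirrors
# add_chunk (create a tempfile, register it under the monitor, return its
# path) but uses Tempfile instead of the gem's TempPath class.
class ChunkRegistry
  include MonitorMixin

  attr_reader :tempfiles

  def initialize
    super() # sets up the monitor provided by MonitorMixin
    @tempfiles = []
  end

  # Creates an empty tempfile, registers it and returns its path so the
  # caller can write a downloaded chunk into it.
  def add_chunk
    tempfile = Tempfile.new("chunk")
    tempfile.close # close the handle; the caller reopens the file by path

    synchronize do
      @tempfiles.push(tempfile)
    end

    tempfile.path
  end

  # Deletes every registered tempfile.
  def cleanup
    synchronize do
      @tempfiles.each(&:unlink)
      @tempfiles.clear
    end
  end
end

registry = ChunkRegistry.new

# Several threads add chunks concurrently, e.g. one per S3 download.
threads = 4.times.map do |i|
  Thread.new do
    path = registry.add_chunk
    File.write(path, %(["key#{i}",#{i}]\n))
    path
  end
end

paths = threads.map(&:value)
```

Keeping the Tempfile objects in the array matters in this sketch: a Tempfile that is garbage-collected may delete its file before the reducer reads it.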
#reduce(chunk_limit:, &block) ⇒ Object
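Performs a k-way merge of the added chunks and yields the reduced key-value pairs. When more than chunk_limit chunks exist, it performs multiple runs: each run takes up to chunk_limit chunks, merges and reduces them, and pushes the result back as a new chunk, until a single run can cover all remaining chunks. Called without a block, it returns an Enumerator; it raises InvalidChunkLimit if chunk_limit is less than 2. All tempfiles are removed at the end, even if an error occurs.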
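To make the effect of chunk_limit concrete, the sketch below replays only the queue bookkeeping of reduce, using placeholder strings instead of files. simulate_runs is a hypothetical helper, and it raises ArgumentError where the real method raises InvalidChunkLimit.

```ruby
# Replays the run logic of reduce: each run shifts up to chunk_limit
# chunks; if chunks remain, the merged result is pushed back as one new
# chunk, otherwise the run is the final one that yields to the caller.
def simulate_runs(chunk_count, chunk_limit)
  raise ArgumentError, "Chunk limit must be >= 2" unless chunk_limit >= 2

  queue = Array.new(chunk_count) { |i| "chunk#{i}" }
  runs = []

  loop do
    slice = queue.shift(chunk_limit)
    runs << slice

    # Final run: every remaining chunk was consumed by this slice.
    return runs if queue.empty?

    # Intermediate run: the merged slice becomes one new chunk at the end.
    queue.push("merged(#{slice.join('+')})")
  end
end

runs = simulate_runs(10, 4)
runs.each_with_index do |slice, index|
  puts "run #{index + 1}: #{slice.size} chunks"
end
# run 1: 4 chunks
# run 2: 4 chunks
# run 3: 4 chunks
```

With 10 chunks and a chunk_limit of 4, there are three runs: two intermediate runs each write a merged chunk, and the final run merges those two with the last two original chunks. A larger chunk_limit means fewer runs but more files open at the same time.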
```ruby
# File 'lib/map_reduce/reducer.rb', line 72

def reduce(chunk_limit:, &block)
  return enum_for(:reduce, chunk_limit: chunk_limit) unless block_given?

  raise(InvalidChunkLimit, "Chunk limit must be >= 2") unless chunk_limit >= 2

  begin
    loop do
      # Take up to chunk_limit chunks and open the ones that exist on disk.
      slice = @temp_paths.shift(chunk_limit)
      files = slice.select { |temp_path| File.exist?(temp_path.path) }
                   .map { |temp_path| File.open(temp_path.path, "r") }

      begin
        # Final run: nothing is left in the queue, so yield to the caller.
        if @temp_paths.empty?
          reduce_chunk(k_way_merge(files), @implementation).each do |pair|
            block.call(pair)
          end

          return
        end

        # Intermediate run: write the reduced pairs, one JSON document per
        # line, to a new chunk that add_chunk appends to the queue.
        File.open(add_chunk, "w") do |file|
          reduce_chunk(k_way_merge(files), @implementation).each do |pair|
            file.puts JSON.generate(pair)
          end
        end
      ensure
        files.each(&:close)
        slice.each(&:delete)
      end
    end
  ensure
    # Remove any chunks still queued, e.g. after an exception.
    @temp_paths.each(&:delete)
  end
end
```
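reduce delegates the actual work to k_way_merge and reduce_chunk, which come from the included Mergeable and Reduceable modules and are not shown here. The sketch below performs the same two steps on in-memory arrays under stated assumptions: each chunk is already sorted by key, and reducing folds the values of equal keys pairwise through a callback shaped like reduce(key, value1, value2). That callback shape and both function names are hypothetical, not the gem's API.

```ruby
# Merges k chunks, each sorted by key, into one sorted array of pairs.
# It picks the smallest head with a linear scan (O(k) per pair); a
# priority queue would scale better for large k.
def k_way_merge_sketch(chunks)
  positions = Array.new(chunks.size, 0)
  merged = []

  loop do
    # Index of the chunk whose next pair has the smallest key.
    best = nil
    chunks.each_with_index do |chunk, i|
      pair = chunk[positions[i]]
      next if pair.nil?

      best = i if best.nil? || pair[0] < chunks[best][positions[best]][0]
    end
    break if best.nil? # every chunk is exhausted

    merged << chunks[best][positions[best]]
    positions[best] += 1
  end

  merged
end

# Folds runs of adjacent pairs with equal keys into one pair, calling the
# given block as (key, accumulated_value, next_value).
def reduce_sorted_pairs(pairs)
  pairs.chunk_while { |a, b| a[0] == b[0] }.map do |group|
    key = group.first[0]
    values = group.map { |_, value| value }
    [key, values.reduce { |acc, value| yield(key, acc, value) }]
  end
end

chunks = [
  [["a", 1], ["c", 2]],
  [["a", 3], ["b", 1]],
  [["b", 4], ["c", 1]]
]

merged = k_way_merge_sketch(chunks)
result = reduce_sorted_pairs(merged) { |_key, count1, count2| count1 + count2 }
p result # => [["a", 4], ["b", 5], ["c", 3]]
```

Because the merged stream is sorted, all values for a key are adjacent, so reducing needs only one pass and constant state per key, which is why the merge step comes first.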