Class: MapReduce::Mapper
- Inherits: Object
- Includes: Mergeable, Reduceable, MonitorMixin
- Defined in: lib/map_reduce/mapper.rb
Overview
The MapReduce::Mapper class runs the mapping part of your map-reduce job.
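The mapper treats your implementation as a duck-typed object: #map forwards its arguments to implementation.map and collects every key-value pair that method yields. Below is a minimal sketch of such an object for a word-count job. The WordCounter class is hypothetical. The reduce(key, value1, value2) signature is an assumption about what the included Reduceable module expects, because reduce_chunk is not shown on this page.

```ruby
# Hypothetical implementation object for a word-count job.
class WordCounter
  # Receives whatever arguments were passed to Mapper#map and
  # yields one [key, value] pair per word.
  def map(text)
    text.split.each do |word|
      yield(word, 1)
    end
  end

  # ASSUMPTION: combines two values that share the same key. The exact
  # signature expected by Reduceable#reduce_chunk is not documented here.
  def reduce(_word, count1, count2)
    count1 + count2
  end
end

# Collect the yielded pairs the way Mapper#map does, minus buffering and spilling.
pairs = []
WordCounter.new.map("to be or not to be") { |key, value| pairs << [key, value] }
```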
Instance Attribute Summary
- #partitions ⇒ Object (readonly)
  Returns the value of attribute partitions.
Instance Method Summary
- #initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 100 * 1024 * 1024) ⇒ Mapper (constructor)
  Initializes a new mapper.
- #map(*args, **kwargs) ⇒ Object
  Passes the received arguments to your map-reduce implementation and adds each yielded key-value pair to a buffer.
- #shuffle(&block) ⇒ Object
  Performs a k-way merge of the sorted chunks written to tempfiles, reducing the merged result with your map-reduce implementation and splitting the dataset into partitions along the way.
Constructor Details
#initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 100 * 1024 * 1024) ⇒ Mapper
Initializes a new mapper.
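```ruby
# File 'lib/map_reduce/mapper.rb', line 23
def initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 100 * 1024 * 1024)
  super() # MonitorMixin's initializer sets up the monitor used by #synchronize

  @implementation = implementation
  @partitioner = partitioner
  @memory_limit = memory_limit.to_i # buffer threshold in bytes

  @buffer_size = 0 # estimated size of the buffered pairs, in bytes
  @buffer = []     # [key, value] pairs not yet written to disk
  @chunks = []     # tempfiles holding sorted chunks
end
```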
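#shuffle only calls partitioner.call(key) and uses the return value as a hash key, so any object that responds to call should work as a partitioner. A minimal sketch of a hash-based partitioner, assuming keys are JSON-serializable. ModuloPartitioner is hypothetical and is not necessarily how the library's HashPartitioner computes partitions.

```ruby
require "json"
require "digest"

# Hypothetical partitioner that maps each key to one of num_partitions buckets.
# It hashes the JSON encoding instead of using Object#hash, because Ruby seeds
# String#hash per process and partitions should be stable across processes.
class ModuloPartitioner
  def initialize(num_partitions)
    @num_partitions = num_partitions
  end

  def call(key)
    # Wrap the key in an array so JSON.generate always receives a container.
    Digest::SHA1.hexdigest(JSON.generate([key])).to_i(16) % @num_partitions
  end
end

partitioner = ModuloPartitioner.new(8)
first = partitioner.call("hello")
```

It could then be passed as Mapper.new(implementation, partitioner: ModuloPartitioner.new(8)). For reference, the default memory_limit of 100 * 1024 * 1024 works out to 104,857,600 bytes (100 MiB).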
Instance Attribute Details
#partitions ⇒ Object (readonly)
Returns the value of attribute partitions.
```ruby
# File 'lib/map_reduce/mapper.rb', line 9
def partitions
  @partitions
end
```
Instance Method Details
#map(*args, **kwargs) ⇒ Object
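Passes the received arguments to your map-reduce implementation and adds each key-value pair it yields to an in-memory buffer. The buffer's size is estimated from the byte size of each pair's JSON encoding; once it reaches the memory limit, the buffered chunk is sorted and written to a tempfile. Buffer updates run inside synchronize (from MonitorMixin), so concurrent calls to #map do not corrupt the buffer.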
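```ruby
# File 'lib/map_reduce/mapper.rb', line 45
def map(*args, **kwargs)
  @implementation.map(*args, **kwargs) do |new_key, new_value|
    # Guard the shared buffer with the MonitorMixin lock.
    synchronize do
      @buffer.push([new_key, new_value])

      # Approximate memory usage by the JSON-encoded size of each pair.
      @buffer_size += JSON.generate([new_key, new_value]).bytesize

      # Spill a sorted chunk to a tempfile once the limit is reached.
      write_chunk if @buffer_size >= @memory_limit
    end
  end
end
```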
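The buffering in #map follows an external-sort pattern: accumulate pairs in memory, estimate their size, and spill a sorted run to disk once the limit is reached. write_chunk is not shown on this page, so this is a self-contained sketch under assumptions. The SpillBuffer class is hypothetical, each run is stored as one JSON line per pair, and pairs are sorted by key with Array#sort_by, which requires mutually comparable keys. The library's actual sort order and file format may differ.

```ruby
require "json"
require "tempfile"

# Hypothetical stand-in for Mapper's buffer plus write_chunk behaviour.
class SpillBuffer
  attr_reader :chunks

  def initialize(memory_limit)
    @memory_limit = memory_limit
    @buffer = []
    @buffer_size = 0
    @chunks = []
  end

  def push(key, value)
    @buffer.push([key, value])
    # Same size estimate as Mapper#map: bytes of the JSON-encoded pair.
    @buffer_size += JSON.generate([key, value]).bytesize
    flush if @buffer_size >= @memory_limit
  end

  # Sort the buffered pairs by key and write them out as one sorted run.
  def flush
    return if @buffer.empty?

    tempfile = Tempfile.new
    @buffer.sort_by { |key, _value| key }.each do |pair|
      tempfile.puts(JSON.generate(pair))
    end
    tempfile.rewind
    @chunks.push(tempfile)

    @buffer = []
    @buffer_size = 0
  end
end

buffer = SpillBuffer.new(20)
%w[pear apple fig apple].each { |word| buffer.push(word, 1) }
buffer.flush # write whatever is left, as shuffle does before merging
runs = buffer.chunks.map { |file| file.each_line.map { |line| JSON.parse(line) } }
```

With a 20-byte limit the four pairs spill into two runs, each sorted independently; merging those runs is the job of the shuffle phase.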
#shuffle(&block) ⇒ Object
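Performs a k-way merge of the sorted chunks written to tempfiles, reducing the merged result with your map-reduce implementation on the fly and splitting the dataset into partitions. Any pairs still in the buffer are first written out as a final chunk. Finally, yields each partition together with the tempfile containing that partition's data; the intermediate chunk files are deleted after merging, and the partition tempfiles are deleted once every partition has been yielded. Called without a block, it returns an Enumerator.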
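```ruby
# File 'lib/map_reduce/mapper.rb', line 67
def shuffle(&block)
  return enum_for(:shuffle) unless block_given?

  # Flush any remaining buffered pairs as a final sorted chunk.
  write_chunk if @buffer_size > 0

  partitions = {}

  # Merge all sorted chunks, reduce equal keys, and route each resulting
  # pair to the tempfile of its partition.
  reduce_chunk(k_way_merge(@chunks), @implementation).each do |pair|
    partition = @partitioner.call(pair[0])

    (partitions[partition] ||= Tempfile.new).puts(JSON.generate(pair))
  end

  # Delete the intermediate chunk files.
  @chunks.each { |tempfile| tempfile.close(true) }
  @chunks = []

  # Rewind so the caller reads each partition from the beginning.
  partitions.each_value(&:rewind)

  partitions.each do |partition, tempfile|
    block.call(partition, tempfile)
  end

  # Delete the partition files once every partition has been yielded.
  partitions.each_value { |tempfile| tempfile.close(true) }

  nil
end
```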
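k_way_merge and reduce_chunk are presumably provided by the included Mergeable and Reduceable modules and are not shown on this page. The sketch below illustrates the same three steps on in-memory arrays: merge sorted runs, reduce adjacent equal keys, then partition. All method names are hypothetical. For brevity the merge scans every run head (O(k) per pair) instead of using a priority queue, and it collects results into arrays rather than streaming them. The reducer.call(key, value1, value2) signature is an assumption.

```ruby
# Merge k runs, each sorted by key, into one sorted array of [key, value] pairs.
# This scans the current head of every run instead of using a priority queue.
def k_way_merge_sketch(runs)
  positions = Array.new(runs.size, 0)
  merged = []

  loop do
    # Indices of runs that still have unread pairs.
    candidates = runs.each_index.select { |i| positions[i] < runs[i].size }
    break if candidates.empty?

    # Take the pair with the smallest key among the run heads.
    i = candidates.min_by { |j| runs[j][positions[j]][0] }
    merged << runs[i][positions[i]]
    positions[i] += 1
  end

  merged
end

# Collapse adjacent pairs with equal keys into one pair. A single pass is
# enough only because the merged input is sorted by key.
# ASSUMPTION: the reducer is called as reducer.call(key, value1, value2).
def reduce_sorted_sketch(pairs, reducer)
  pairs.chunk_while { |a, b| a[0] == b[0] }.map do |group|
    key = group[0][0]
    values = group.map { |_key, value| value }
    [key, values.reduce { |acc, value| reducer.call(key, acc, value) }]
  end
end

runs = [
  [["apple", 1], ["pear", 1]],
  [["apple", 1], ["fig", 1]]
]

merged = k_way_merge_sketch(runs)
reduced = reduce_sorted_sketch(merged, ->(_key, a, b) { a + b })

# Route each reduced pair to a partition, as shuffle does with @partitioner.
# This partitioner is a hypothetical stand-in.
partitioner = ->(key) { key.length % 2 }
partitions = {}
reduced.each { |pair| (partitions[partitioner.call(pair[0])] ||= []) << pair }
```

Because every run is already sorted, equal keys arrive next to each other in the merged stream, so a streaming version only needs to hold one key's values at a time rather than grouping the whole dataset in memory.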