Class: MapReduce::Mapper

Inherits:
Object
Includes:
Mergeable, Reduceable, MonitorMixin
Defined in:
lib/map_reduce/mapper.rb

Overview

The MapReduce::Mapper class runs the mapping part of your map-reduce job.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 100 * 1024 * 1024) ⇒ Mapper

Initializes a new mapper.

Examples:

MapReduce::Mapper.new(MyImplementation.new, partitioner: HashPartitioner.new(16), memory_limit: 100.megabytes)

Parameters:

  • implementation

    Your map-reduce implementation, i.e. an object which responds to #map and #reduce.

  • partitioner (#call) (defaults to: HashPartitioner.new(32))

    A partitioner, i.e. an object which responds to #call and calculates a partition for the passed key.

  • memory_limit (#to_i) (defaults to: 100 * 1024 * 1024)

    The memory limit, i.e. the buffer size in bytes.
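Since the partitioner only needs to respond to #call and return an integer partition for a given key, a plain lambda suffices. The following is a hypothetical sketch, not the gem's HashPartitioner:

```ruby
require "digest"

# Hypothetical stand-in for a partitioner: any object responding to
# #call works. This one hashes the key and maps it to one of 8 partitions.
partitioner = ->(key) { Digest::SHA1.hexdigest(key.to_s).to_i(16) % 8 }

partitioner.call("some_key") # an integer between 0 and 7
```

The same key always maps to the same partition, which is what allows reducers to later process each partition independently.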



# File 'lib/map_reduce/mapper.rb', line 23

def initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 100 * 1024 * 1024)
  super()

  @implementation = implementation
  @partitioner = partitioner
  @memory_limit = memory_limit.to_i

  @buffer_size = 0
  @buffer = []
  @chunks = []
end
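A minimal implementation object only needs #map, which yields key-value pairs, and #reduce, which merges two values for the same key. The WordCounter class below is a hypothetical example, not part of the gem:

```ruby
# Hypothetical map-reduce implementation counting word occurrences.
# #map yields one [word, 1] pair per word; #reduce merges two counts
# for the same key by adding them.
class WordCounter
  def map(text)
    text.split.each { |word| yield(word, 1) }
  end

  def reduce(_key, count1, count2)
    count1 + count2
  end
end

counter = WordCounter.new

counts = Hash.new(0)
counter.map("the cat and the dog") { |word, n| counts[word] += n }
counts # => {"the"=>2, "cat"=>1, "and"=>1, "dog"=>1}
```

An instance of such a class is what you would pass as the implementation argument, e.g. MapReduce::Mapper.new(WordCounter.new).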

Instance Attribute Details

#partitions ⇒ Object (readonly)

Returns the value of attribute partitions.



# File 'lib/map_reduce/mapper.rb', line 9

def partitions
  @partitions
end

Instance Method Details

#map(*args, **kwargs) ⇒ Object

Passes the received key to your map-reduce implementation and adds each yielded key-value pair to a buffer. When the memory limit is reached, the buffer is sorted and written to a tempfile as a chunk.

Examples:

mapper.map("some_key")
mapper.map("other_key")

Parameters:

  • key

    The key to pass to the map-reduce implementation.



# File 'lib/map_reduce/mapper.rb', line 45

def map(*args, **kwargs)
  @implementation.map(*args, **kwargs) do |new_key, new_value|
    synchronize do
      @buffer.push([new_key, new_value])

      @buffer_size += JSON.generate([new_key, new_value]).bytesize

      write_chunk if @buffer_size >= @memory_limit
    end
  end
end
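The buffer accounting above charges each pair its JSON-serialized byte size, so memory_limit effectively bounds the serialized size of a chunk rather than Ruby's in-memory object overhead. A small illustration (the pair is arbitrary):

```ruby
require "json"

# The cost charged against memory_limit for one yielded pair is the
# byte size of its JSON representation.
pair = ["key", 1]
bytes = JSON.generate(pair).bytesize # => 9 for the string '["key",1]'
```

With the default memory_limit of 100 * 1024 * 1024, a chunk is therefore flushed once the summed serialized sizes of the buffered pairs reach 100 MiB.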

#shuffle(&block) ⇒ Object

Performs a k-way merge of the sorted chunks written to tempfiles, reducing the result on the fly using your map-reduce implementation and splitting the dataset into partitions. Finally, yields each partition together with a tempfile containing that partition's data.

Examples:

mapper.shuffle do |partition, tempfile|
  # store data e.g. on s3
end


# File 'lib/map_reduce/mapper.rb', line 67

def shuffle(&block)
  return enum_for(:shuffle) unless block_given?

  write_chunk if @buffer_size > 0

  partitions = {}

  reduce_chunk(k_way_merge(@chunks), @implementation).each do |pair|
    partition = @partitioner.call(pair[0])

    (partitions[partition] ||= Tempfile.new).puts(JSON.generate(pair))
  end

  @chunks.each { |tempfile| tempfile.close(true) }
  @chunks = []

  partitions.each_value(&:rewind)

  partitions.each do |partition, tempfile|
    block.call(partition, tempfile)
  end

  partitions.each_value { |tempfile| tempfile.close(true) }

  nil
end
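Conceptually, shuffle merges the sorted chunks, reduces runs of equal keys, and groups the reduced pairs by partition. A plain-Ruby sketch of that pipeline, with flatten-and-sort standing in for the real streaming k-way merge and a trivial modulo standing in for the partitioner:

```ruby
# Two already-sorted chunks, as produced by #map flushing its buffer.
chunks = [
  [["a", 1], ["c", 2]],
  [["a", 3], ["b", 1]]
]

# Stand-in for the k-way merge: flatten and re-sort (the real merge
# streams from tempfiles without loading everything into memory).
merged = chunks.flatten(1).sort_by(&:first)

# Reduce runs of equal keys, here with addition as the reducer.
reduced = merged.slice_when { |a, b| a[0] != b[0] }.map do |group|
  group.reduce { |(key, v1), (_, v2)| [key, v1 + v2] }
end
# reduced => [["a", 4], ["b", 1], ["c", 2]]

# Split the reduced pairs into partitions (trivial stand-in partitioner).
partitions = reduced.group_by { |key, _| key.ord % 2 }
```

In the real method, each partition's pairs are streamed into their own tempfile instead of an in-memory hash, which is what gets yielded to your block.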