Class: MapReduce::Mapper

Inherits:
Object
  • Object
show all
Defined in:
lib/map_reduce/mapper.rb

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Mapper

Returns a new instance of Mapper.



3
4
5
6
7
# File 'lib/map_reduce/mapper.rb', line 3

# Sets up a mapper from an options hash.
#
# @param opts [Hash] optional settings, read with symbol keys
# @option opts [Object] :masters used as given when truthy; otherwise a
#   one-element array holding ::MapReduce::DEFAULT_SOCKET
#   (presumably a list of master endpoints -- confirm against pick_master)
# @option opts [Object] :type connection type; :em when missing or falsy
# @option opts [Object] :task task name, stored unchanged (nil when absent)
def initialize(opts = {})
  masters = opts[:masters]
  @masters = masters ? masters : [::MapReduce::DEFAULT_SOCKET]

  type = opts[:type]
  @connection_type = type ? type : :em

  @task_name = opts[:task]
end

Instance Method Details

#emit(key, value, &blk) ⇒ Object Also known as: map

Raises:

  • (MapReduce::Exceptions::BlankKey) — if key is nil



9
10
11
12
13
14
# File 'lib/map_reduce/mapper.rb', line 9

# Sends one key/value pair to the master selected for +key+.
#
# The request payload is ["map", key, value, @task_name]; the optional
# block is forwarded untouched to the socket's send_request.
#
# @param key [Object] any non-nil key; also passed to pick_master
# @param value [Object] value sent alongside the key
# @param blk [Proc] optional block handed to send_request
# @return [Object] whatever send_request returns
# @raise [MapReduce::Exceptions::BlankKey] when key is nil
def emit(key, value, &blk)
  if key.nil?
    raise MapReduce::Exceptions::BlankKey, "Key can't be nil"
  end

  master = pick_master(key)
  request = ["map", key, value, @task_name]
  master.send_request(request, &blk)
end

#wait_for_all(&blk) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/map_reduce/mapper.rb', line 17

# Polls every socket with a ["map_finished", @task_name] request and acts
# once the whole round of replies is in.
#
# A reply counts as finished when its first element is "ok". When every
# socket has replied:
# * all finished  -> the given block (if any) is called once;
# * otherwise     -> the whole poll is scheduled again through after(1)
#   (presumably a one-second timer -- confirm against the after helper).
#
# The decision is taken only after the last outstanding reply of a round,
# so a single round schedules at most one retry and calls the block at
# most once. No non-local +return+ is used inside the reply callback, so
# it is safe for the callback to run after this method has returned.
#
# @param blk [Proc] optional completion callback, called with no arguments
# @return [Object] the result of iterating the sockets; not meaningful
def wait_for_all(&blk)
  socks = sockets
  replies = {} # socket => true when that socket answered "ok"

  socks.each do |sock|
    sock.send_request(["map_finished", @task_name]) do |message|
      replies[sock] = message[0] == "ok"

      # Hold off until every socket of this round has answered.
      next unless replies.size == socks.size

      if replies.each_value.all?
        blk.call if blk
      else
        after(1) { wait_for_all(&blk) }
      end
    end
  end
end