Class: MapReduce::Mapper

Inherits:
Object
  • Object
show all
Defined in:
lib/map_reduce/mapper.rb

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Mapper

Returns a new instance of Mapper.



3
4
5
6
7
8
# File 'lib/map_reduce/mapper.rb', line 3

# Builds a mapper from an options hash.
#
# @param opts [Hash] optional settings
# @option opts [Array] :masters master endpoints; when missing or falsy a
#   one-element list holding ::MapReduce::DEFAULT_SOCKET is used
# @option opts [Symbol] :type connection type; :em when missing or falsy
# @option opts [Object] :task task name, stored as given (nil when absent)
def initialize(opts = {})
  chosen_masters = opts[:masters]
  chosen_masters = [::MapReduce::DEFAULT_SOCKET] unless chosen_masters

  chosen_type = opts[:type]
  chosen_type = :em unless chosen_type

  @masters         = chosen_masters
  @connection_type = chosen_type
  @task_name       = opts[:task]
  # Sockets currently flagged as failed, keyed by socket.
  @disconnected    = {}
end

Instance Method Details

#emit(key, value, &blk) ⇒ Object Also known as: map



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/map_reduce/mapper.rb', line 10

# Sends a "map" request for +key+/+value+ to the socket returned by
# +pick_master(key)+ and hands the reply back to the caller.
#
# A falsy reply is treated as a failed master: the socket is flagged in
# @disconnected and the emit is retried, calling +pick_master+ again.
# NOTE(review): retries are unbounded; presumably pick_master steers away
# from sockets flagged in @disconnected — confirm against its definition.
#
# @param key [Object] map key; must not be nil
# @param value [Object] value emitted for the key
# @yield [res] the truthy reply, when a block is given
# @return [Object] without a block, the reply (including one obtained via a
#   retry), provided send_request invokes its block before returning; with a
#   block, whatever send_request returns
# @raise [MapReduce::Exceptions::BlankKey] if key is nil
def emit(key, value, &blk)
  raise MapReduce::Exceptions::BlankKey, "Key can't be nil" if key.nil?

  sock = pick_master(key)
  sock.send_request(["map", key, value, @task_name]) do |res|
    if res
      # The master answered, so clear any earlier failure flag
      # (Hash#delete is a no-op when the key is absent).
      @disconnected.delete(sock)
      if blk
        blk.call(res)
      else
        return res
      end
    else
      @disconnected[sock] = true
      retried = emit(key, value, &blk)
      # Without a block the reply is this method's return value, so the
      # retry's reply must be propagated instead of being dropped.
      return retried unless blk

      retried
    end
  end
end

#wait_for_all(&blk) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/map_reduce/mapper.rb', line 30

# Asks every master whether the map phase of the current task is finished
# and polls again until all of them answer "ok".
#
# Each socket from +sockets+ is sent ["map_finished", @task_name]. The
# verdict for a round is taken only once every socket has replied to it, so
# a round ends in exactly one outcome: completion, or a single re-poll
# scheduled through +after(1)+. A nil/false reply counts as "not finished".
# NOTE(review): this assumes send_request always invokes its block, even on
# failure (emit treats a falsy reply as a failed master) — confirm.
#
# @yield called with no arguments once every master has replied "ok"
# @return [Object] the result of sockets.each, unless no block was given and
#   completion was detected before this method returned (then nil)
def wait_for_all(&blk)
  masters  = sockets
  finished = {}
  masters.each do |sock|
    sock.send_request(["map_finished", @task_name]) do |message|
      finished[sock] = message ? message[0] == "ok" : false
      # Judge the round only after every master has answered; deciding on
      # each reply would schedule spurious re-polls and repeat the callback.
      next if finished.size < masters.size

      if finished.each_value.all?
        if blk
          blk.call
        else
          return
        end
      else
        after(1) { wait_for_all(&blk) }
      end
    end
  end
end