Class: MapReduce::Master

Inherits:
Object
  • Object
show all
Defined in:
lib/map_reduce/master.rb

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Master

Returns a new instance of Master.



5
6
7
8
9
10
11
# File 'lib/map_reduce/master.rb', line 5

# Builds a master from an options hash, falling back to a default for every
# option that is missing (or nil/false).
#
# @param opts [Hash] optional settings
# @option opts [Object] :socket      socket address; defaults to ::MapReduce::DEFAULT_SOCKET
# @option opts [String] :log_folder  defaults to "/tmp/map_reduce"
# @option opts [String] :delimiter   defaults to a tab character
def initialize(opts = {})
  @socket_addr = opts[:socket]
  @socket_addr ||= ::MapReduce::DEFAULT_SOCKET

  @log_folder = opts[:log_folder]
  @log_folder ||= "/tmp/map_reduce"

  @delimiter = opts[:delimiter]
  @delimiter ||= "\t"

  # Starts out empty; populated elsewhere in the class.
  @tasks = {}
end

Instance Method Details

#after_map(&blk) ⇒ Object



23
24
25
# File 'lib/map_reduce/master.rb', line 23

# Stores the given block in @after_map, replacing any previously stored one.
# Calling without a block stores nil.
#
# @return [Proc, nil] the block that was just stored
def after_map(&blk)
  @after_map = blk
  blk
end

#after_reduce(&blk) ⇒ Object



27
28
29
# File 'lib/map_reduce/master.rb', line 27

# Stores the given block in @after_reduce, replacing any previously stored
# one. Calling without a block stores nil.
#
# @return [Proc, nil] the block that was just stored
def after_reduce(&blk)
  @after_reduce = blk
  blk
end

#recieve_msg(message, envelope) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/map_reduce/master.rb', line 31

# Dispatches an incoming message to the handler that matches its type.
#
# The type is read from message[0]:
#   "map"          -> store_map
#   "map_finished" -> all_finished?
#   "reduce"       -> send_reduce
# Any other type is reported through MapReduce.logger.error.
#
# @param message [#[]] indexable message whose first element is the type tag
#   (presumably an Array of frames; confirm against the caller)
# @param envelope [Object] passed through unchanged to the selected handler
# @return [Object] the handler's return value, or the logger's return value
#   when the type is not recognised
def recieve_msg(message, envelope)
  # Read the type before dispatching. Assigning the result of the `case`
  # expression to the variable left it nil inside the else branch, so the
  # error log never showed the offending type.
  mtype = message[0]

  case mtype
  when "map"
    store_map(message, envelope)
  when "map_finished"
    all_finished?(message, envelope)
  when "reduce"
    send_reduce(message, envelope)
  else
    MapReduce.logger.error("Wrong message type: #{mtype}")
  end
end

#run ⇒ Object



13
14
15
16
17
# File 'lib/map_reduce/master.rb', line 13

# Enters EM.run and calls #socket inside the block handed to it.
# `socket` is defined outside this snippet; presumably it sets up the
# master's listening socket (confirm in the rest of the class).
def run
  EM.run { socket }
end

#stop ⇒ Object



19
20
21
# File 'lib/map_reduce/master.rb', line 19

# Delegates to EM.stop; the counterpart of #run, which enters EM.run.
#
# @return [Object] whatever EM.stop returns
def stop
  EM.stop
end