Class: MapReduce
- Inherits: MapReduceMaster::Service
  - Object
    - MapReduceMaster::Service
      - MapReduce
- Defined in: lib/map_reduce.rb
Instance Attribute Summary
- #data ⇒ Object — Returns the value of attribute data.
- #files ⇒ Object — Returns the value of attribute files.
- #logger ⇒ Object — Returns the value of attribute logger.
- #map_count ⇒ Object — Returns the value of attribute map_count.
- #map_finished ⇒ Object — Returns the value of attribute map_finished.
- #worker_count ⇒ Object — Returns the value of attribute worker_count.
Instance Method Summary
- #distribute_input ⇒ Object
- #initialize(logger:, map_count: 5, file:) ⇒ MapReduce (constructor) — A new instance of MapReduce.
- #map(&block) ⇒ Object
- #ping(worker_req, _) ⇒ Object
- #reduce(&block) ⇒ Object
- #register_worker(worker_req, _) ⇒ Object
- #wait_for_enough_workers ⇒ Object
Constructor Details
#initialize(logger:, map_count: 5, file:) ⇒ MapReduce
Returns a new instance of MapReduce.
14 15 16 17 18 19 20 21 22 |
# File 'lib/map_reduce.rb', line 14
# Builds a master that has not yet seen any workers or input.
#
# @param logger [Object] receives the master's progress messages
# @param map_count [Integer] forwarded to Worker.start_worker by #wait_for_enough_workers (default 5)
# @param file [#to_path] input that #distribute_input later splits
def initialize(logger:, map_count: 5, file:)
  @file = file                # input source, read by #distribute_input
  @map_count = map_count      # worker count handed to Worker.start_worker
  @worker_count = 0           # bumped once per #register_worker
  @logger = logger
  @data = []                  # one { uuid:, ip:, status: } Hash per registered worker
  @files = nil                # set by #distribute_input
  @map_finished = false       # flipped by #map, polled by #reduce
end
Instance Attribute Details
#data ⇒ Object
Returns the value of attribute data.
12 13 14 |
# File 'lib/map_reduce.rb', line 12
# Worker registry: one Hash per registered worker, shaped { uuid:, ip:, status: }.
# @return [Array<Hash>]
def data = @data
#files ⇒ Object
Returns the value of attribute files.
12 13 14 |
# File 'lib/map_reduce.rb', line 12
# Input splits still to be mapped.
#
# It is nil until #distribute_input assigns the result of split_files. #map pops
# from it, and #ping appends a split back when a worker reports failure
# (presumably an Array of filenames — confirm against split_files).
def files = @files
#logger ⇒ Object
Returns the value of attribute logger.
12 13 14 |
# File 'lib/map_reduce.rb', line 12
# Logger given to the constructor; receives the master's progress messages.
def logger = @logger
#map_count ⇒ Object
Returns the value of attribute map_count.
12 13 14 |
# File 'lib/map_reduce.rb', line 12
# Worker count handed to Worker.start_worker by #wait_for_enough_workers
# (constructor default: 5).
def map_count = @map_count
#map_finished ⇒ Object
Returns the value of attribute map_finished.
12 13 14 |
# File 'lib/map_reduce.rb', line 12
# Map-phase flag: starts false, set true by #map once `files` is empty;
# #reduce waits for it.
def map_finished = @map_finished
#worker_count ⇒ Object
Returns the value of attribute worker_count.
12 13 14 |
# File 'lib/map_reduce.rb', line 12
# Number of workers registered via #register_worker (starts at 0).
def worker_count = @worker_count
Instance Method Details
#distribute_input ⇒ Object
108 109 110 111 112 113 |
# File 'lib/map_reduce.rb', line 108
# Splits the constructor's input file into map tasks.
#
# Calls split_files (defined elsewhere) with the file's path string as the key
# and the file object itself, and stores the result in @files.
#
# @return whatever split_files returned (also kept in @files)
def distribute_input
  source = @file
  split_key = source.to_path
  logger.info('[Master] Start to distribute input')
  @files = split_files(split_key, source)
end
#map(&block) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/map_reduce.rb', line 80
# Starts the map phase on a background thread.
#
# Preparing the block:
# - Its source text is read via Proc#source. This is not core Ruby; it is
#   presumably the method_source gem — confirm.
# - The surrounding `master.map do` / `end` lines are stripped.
# - The result is Base64-encoded.
#
# The encoded block is sent, with one input split from `files`, to each idle
# worker (status 0). Dispatched workers are marked 'processing'; #ping sets
# them back to 0.
#
# @yield the map step; only its source text is used, it is never called here
# @return [Thread] the dispatcher; it sets @map_finished and exits once `files` is empty
#
# NOTE(review): a split that #ping re-queues after this thread has exited is
# never dispatched.
# NOTE(review): @encrypt_key is not assigned in any code visible here — confirm
# where it is set.
def map(&block)
  script = block.source.sub(/^\s*master\.map do\s*\n/, '').sub(/^\s*end\s*\n/, '')
  encoded_block = Base64.encode64(script)
  idle_wait = 0.1 # seconds to back off while every worker is busy

  Thread.new do
    loop do
      if files.empty?
        @map_finished = true
        break
      end

      workers = data.select { |w| w[:status] == 0 }.first(files.count)
      # Back off instead of spinning. In the code visible here only #ping frees a
      # worker, so a hot loop would just burn CPU and contend for the GVL.
      if workers.empty?
        sleep(idle_wait)
        next
      end

      Async do
        semaphore = Async::Semaphore.new(workers.count)
        tasks = workers.map do |worker|
          semaphore.async do
            stub = WorkerServer::Stub.new(worker[:ip], :this_channel_is_insecure)
            request = MapInfo.new(filename: files.pop, block: encoded_block, key: @encrypt_key)
            worker[:status] = 'processing'
            stub.map_operation(request)
          end
        end
        tasks.each(&:wait)
      end.wait
    end
  end
end
#ping(worker_req, _) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/map_reduce.rb', line 24
# Handles a worker's report on a map task.
#
# The reporting worker, looked up in #data by uuid, is marked idle again
# (status 0) whatever the outcome. On failure, the reported filename is pushed
# back onto @files so it can be dispatched again.
#
# @param worker_req [#uuid, #success, #filename] the report; `success` is
#   compared against the string 'true'
# @param _ [Object] unused second handler argument (presumably the call object)
# @return [Empty]
#
# NOTE(review): if #map's dispatcher thread has already exited, a re-queued
# file is not picked up again.
def ping(worker_req, _)
  uuid = worker_req.uuid
  worker = data.find { |w| w[:uuid] == uuid }

  if worker
    worker[:status] = 0
  else
    # Without this guard an unknown uuid raised NoMethodError on nil.
    logger.info("[Master] Received ping from unknown worker #{uuid}")
  end

  if worker_req.success == 'true'
    logger.info("[Master] Worker #{uuid} completed the map operation successfully")
  else
    logger.info("[Master] Worker #{uuid} failed to complete the map operation")
    # @files is nil until #distribute_input has run.
    (@files ||= []) << worker_req.filename
  end

  Empty.new
end
#reduce(&block) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/map_reduce.rb', line 61
# Starts the reduce phase on a background thread.
#
# The thread does the following:
# 1. Waits for #map to set @map_finished.
# 2. Reads the block's source via Proc#source. This is not core Ruby; it is
#    presumably the method_source gem — confirm.
# 3. Strips the `master.reduce do` / `end` lines and Base64-encodes the rest.
# 4. Sends one ReduceInfo for "files/<@encrypt_key>/map.txt" to the first idle
#    worker (status 0), marking that worker 'processing'.
#
# @yield the reduce step; only its source text is used, it is never called here
# @return [Thread] the thread doing the wait and the single reduce call
#
# NOTE(review): @encrypt_key is not assigned in any code visible here — confirm
# where it is set.
def reduce(&block)
  Thread.new do
    idle_wait = 0.1 # seconds between polls

    # Poll with a pause; a bare `next` loop would spin at full CPU.
    sleep(idle_wait) until @map_finished

    script = block.source.sub(/^\s*master\.reduce do\s*\n/, '').sub(/^\s*end\s*\n/, '')
    encoded_block = Base64.encode64(script)

    # Map workers stay 'processing' until their #ping arrives, so an idle worker
    # may not exist yet. Wait for one instead of calling [] on nil.
    sleep(idle_wait) until (worker = data.find { |w| w[:status] == 0 })

    stub = WorkerServer::Stub.new(worker[:ip], :this_channel_is_insecure)
    request = ReduceInfo.new(filename: "files/#{@encrypt_key}/map.txt", block: encoded_block, key: @encrypt_key)
    worker[:status] = 'processing'
    stub.reduce_operation(request)
  end
end
#register_worker(worker_req, _) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/map_reduce.rb', line 40
# Guards #data and @worker_count across concurrent registrations.
#
# A Mutex created inside the method would be new on every call and exclude
# nothing. This one is shared by all calls.
REGISTRATION_MUTEX = Mutex.new

# Records a newly announced worker as idle (status 0) and increments
# @worker_count.
#
# @param worker_req [#uuid, #ip] registration request (its `type` is not used)
# @param _ [Object] unused second handler argument (presumably the call object)
# @return [RegisterWorkerResult] always built with result: true
def register_worker(worker_req, _)
  # synchronize releases the lock on every path, including exceptions.
  # The old `ensure mutex.unlock` hit nil.unlock if the code raised before the
  # mutex was created.
  REGISTRATION_MUTEX.synchronize do
    data << { uuid: worker_req.uuid, ip: worker_req.ip, status: 0 }
    @worker_count += 1
  end
  @logger.info('[Master] Worker register success')
  RegisterWorkerResult.new(result: true)
end
#wait_for_enough_workers ⇒ Object
55 56 57 58 59 |
# File 'lib/map_reduce.rb', line 55
# Asks Worker.start_worker to bring up `map_count` workers, logging before
# and after.
#
# Whether this actually blocks until the workers have registered depends on
# Worker.start_worker, which is not visible here.
def wait_for_enough_workers
  log = logger
  log.info('[Master] Wait for the creation of workers')
  Worker.start_worker(log, map_count)
  log.info('[Master] Finished!')
end