Class: MapReduce

Inherits:
MapReduceMaster::Service show all
Defined in:
lib/map_reduce.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger:, map_count: 5, file:) ⇒ MapReduce

Returns a new instance of MapReduce.



14
15
16
17
18
19
20
21
22
# File 'lib/map_reduce.rb', line 14

# Builds a master in its idle state: nothing registered, nothing split,
# map phase not finished.
#
# @param logger [#info] sink for the master's progress messages
# @param map_count [Integer] value later handed to Worker.start_worker
# @param file [#to_path] input that distribute_input will split
def initialize(logger:, map_count: 5, file:)
  # Collaborators and configuration supplied by the caller.
  @logger, @file, @map_count = logger, file, map_count

  # Mutable bookkeeping, filled in while the job runs.
  @data = []      # registered worker records
  @files = nil    # stays nil until distribute_input assigns it
  @worker_count = 0
  @map_finished = false
end

Instance Attribute Details

#data ⇒ Object

Returns the value of attribute data.



12
13
14
# File 'lib/map_reduce.rb', line 12

# Reader for @data, the array that register_worker appends worker
# records ({ uuid:, ip:, status: }) to.
def data = @data

#files ⇒ Object

Returns the value of attribute files.



12
13
14
# File 'lib/map_reduce.rb', line 12

# Reader for @files: nil after construction, then whatever
# distribute_input stored from split_files.
def files = @files

#logger ⇒ Object

Returns the value of attribute logger.



12
13
14
# File 'lib/map_reduce.rb', line 12

# Reader for @logger, the logger object handed to the constructor.
def logger = @logger

#map_count ⇒ Object

Returns the value of attribute map_count.



12
13
14
# File 'lib/map_reduce.rb', line 12

# Reader for @map_count (constructor keyword, 5 by default); it is the
# count passed to Worker.start_worker.
def map_count = @map_count

#map_finished ⇒ Object

Returns the value of attribute map_finished.



12
13
14
# File 'lib/map_reduce.rb', line 12

# Reader for @map_finished: false after construction, switched to true
# by the map dispatcher once the pending file list is empty.
def map_finished = @map_finished

#worker_count ⇒ Object

Returns the value of attribute worker_count.



12
13
14
# File 'lib/map_reduce.rb', line 12

# Reader for @worker_count: starts at 0 and is incremented once per
# register_worker call.
def worker_count = @worker_count

Instance Method Details

#distribute_input ⇒ Object



108
109
110
111
112
113
# File 'lib/map_reduce.rb', line 108

# Splits the configured input and remembers the result in @files.
#
# The input's #to_path value is resolved first, then the start is logged,
# then split_files(path_string, input) is called; its return value is both
# stored in @files and returned.
def distribute_input
  input = @file
  input_path = input.to_path
  logger.info('[Master] Start to distribute input')
  @files = split_files(input_path, input)
end

#map(&block) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/map_reduce.rb', line 80

# Starts the map phase on a background thread and returns that Thread.
#
# The block's source text (via Proc#source - presumably the method_source
# gem; confirm it is loaded) has its `master.map do` / `end` wrapper lines
# stripped, is Base64-encoded, and is shipped to workers together with one
# pending file each.
#
# Dispatch loop:
# * when `files` is empty, @map_finished is set and the thread ends;
# * otherwise up to `files.count` workers whose status is 0 (idle) each get
#   one file popped from `files`, are marked 'processing', and receive a
#   map_operation RPC;
# * when no worker is idle the thread sleeps briefly instead of spinning.
#
# NOTE(review): @map_finished flips as soon as the last file has been handed
# out, not when workers report back, and a file requeued by `ping` after the
# loop has ended is never dispatched - confirm whether that is intended.
#
# @yield the map body; only its source text is used, it is never called here
# @return [Thread] the dispatcher thread
def map(&block)
  source = block.source.sub(/^\s*master\.map do\s*\n/, '').sub(/^\s*end\s*\n/, '')
  message = Base64.encode64(source)

  Thread.new do
    loop do
      # Test emptiness once so a concurrent requeue cannot slip in between
      # "mark finished" and "stop looping".
      if files.empty?
        @map_finished = true
        break
      end

      workers = data.select { |w| w[:status] == 0 }.first(files.count)
      if workers.empty?
        # Every worker is busy: back off rather than burn a core polling.
        sleep(0.05)
        next
      end

      Async do
        semaphore = Async::Semaphore.new(workers.count)

        tasks = workers.map do |worker|
          semaphore.async do
            stub = WorkerServer::Stub.new(worker[:ip], :this_channel_is_insecure)
            request = MapInfo.new(filename: files.pop, block: message, key: @encrypt_key)
            worker[:status] = 'processing'
            stub.map_operation(request)
          end
        end
        tasks.each(&:wait)
      end.wait
    end
  end
end

#ping(worker_req, _) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/map_reduce.rb', line 24

# RPC handler a worker calls to report the outcome of a map task.
#
# The worker record whose :uuid matches is set back to status 0 (idle).
# When the report is not a success, the reported filename is pushed back
# onto @files so it can be handed out again.
#
# A ping from a uuid that is not in `data` is logged and otherwise handled
# normally; it no longer raises NoMethodError on the missing record.
#
# NOTE(review): success is compared against the String 'true' - confirm the
# request field really is a string and not a boolean.
#
# @param worker_req [#uuid, #success, #filename] the worker's report
# @param _ unused second RPC argument
# @return [Empty] empty RPC reply
def ping(worker_req, _)
  uuid = worker_req.uuid
  success = worker_req.success
  worker = data.find { |w| w[:uuid] == uuid }

  if worker
    worker[:status] = 0
  else
    logger.info("[Master] Received ping from unknown worker #{uuid}")
  end

  if success == 'true'
    logger.info("[Master] Worker #{uuid} completed the map operation succesful")
  else
    logger.info("[Master] Worker #{uuid} failed to complete the map opeartion successful")
    # Requeue the file so it can be dispatched again.
    @files << worker_req.filename
  end

  Empty.new
end

#reduce(&block) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/map_reduce.rb', line 61

# Starts the reduce phase on a background thread and returns that Thread.
#
# The thread waits until @map_finished is set, then waits for a worker with
# status 0 (idle), marks it 'processing', and sends it one reduce_operation
# RPC. The request carries the Base64-encoded block source (with the
# `master.reduce do` / `end` wrapper lines stripped) and the map output
# path "files/<@encrypt_key>/map.txt".
#
# Both waits poll with a short sleep. The previous version spun without
# sleeping, and crashed with NoMethodError when no worker was idle at the
# moment the map phase finished.
#
# NOTE(review): Proc#source presumably comes from the method_source gem -
# confirm it is loaded.
#
# @yield the reduce body; only its source text is used, it is never called here
# @return [Thread] the thread that performs the single reduce dispatch
def reduce(&block)
  Thread.new do
    poll_interval = 0.05

    sleep(poll_interval) until @map_finished

    # Keep `block` untouched; derive the payload into its own local.
    source = block.source.sub(/^\s*master\.reduce do\s*\n/, '').sub(/^\s*end\s*\n/, '')
    message = Base64.encode64(source)

    worker = nil
    sleep(poll_interval) until (worker = data.find { |w| w[:status] == 0 })

    stub = WorkerServer::Stub.new(worker[:ip], :this_channel_is_insecure)
    request = ReduceInfo.new(filename: "files/#{@encrypt_key}/map.txt", block: message, key: @encrypt_key)
    worker[:status] = 'processing'
    stub.reduce_operation(request)
  end
end

#register_worker(worker_req, _) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/map_reduce.rb', line 40

# RPC handler that adds a worker to the registry.
#
# Appends { uuid:, ip:, status: 0 } to `data`, increments @worker_count,
# logs the registration and replies with result: true.
#
# The registry update runs under one mutex shared by all calls on this
# instance. The previous code built a fresh Mutex on every call, which
# excluded nothing, and its `ensure` raised NoMethodError on nil whenever
# an earlier line failed, hiding the real error.
#
# NOTE(review): the mutex is created lazily so this method is
# self-contained; creating @register_mutex in the constructor would also
# close the tiny window on the very first concurrent calls.
#
# @param worker_req [#uuid, #ip] registration request
# @param _ unused second RPC argument
# @return [RegisterWorkerResult] reply with result: true
def register_worker(worker_req, _)
  uuid = worker_req.uuid
  ip = worker_req.ip

  (@register_mutex ||= Mutex.new).synchronize do
    data << { uuid:, ip:, status: 0 }
    @worker_count += 1
  end

  @logger.info('[Master] Worker register success')
  RegisterWorkerResult.new(result: true)
end

#wait_for_enough_workers ⇒ Object



55
56
57
58
59
# File 'lib/map_reduce.rb', line 55

# Logs a start message, calls Worker.start_worker(logger, map_count), then
# logs completion. Returns whatever the final logger.info call returns.
#
# NOTE(review): despite the name, nothing here waits explicitly; whether
# Worker.start_worker blocks until workers exist is not visible from here.
def wait_for_enough_workers
  log = logger
  log.info('[Master] Wait for the creation of workers')
  Worker.start_worker(log, map_count)
  log.info('[Master] Finished!')
end