Class: Worker

Inherits:
WorkerServer::Service show all
Defined in:
lib/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_number:, master_ip:, port:, logger:) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
# File 'lib/worker.rb', line 12

def initialize(worker_number:, master_ip:, port:, logger:)
  @worker_number = worker_number
  @uuid = generate_uuid
  @master_ip = master_ip
  @port = port
  @logger = logger
  @result = []
end

Instance Attribute Details

#loggerObject

Returns the value of attribute logger.



10
11
12
# File 'lib/worker.rb', line 10

def logger
  @logger
end

#master_ipObject

Returns the value of attribute master_ip.



10
11
12
# File 'lib/worker.rb', line 10

def master_ip
  @master_ip
end

#portObject

Returns the value of attribute port.



10
11
12
# File 'lib/worker.rb', line 10

def port
  @port
end

#resultObject

Returns the value of attribute result.



10
11
12
# File 'lib/worker.rb', line 10

def result
  @result
end

#uuidObject

Returns the value of attribute uuid.



10
11
12
# File 'lib/worker.rb', line 10

def uuid
  @uuid
end

#worker_numberObject

Returns the value of attribute worker_number.



10
11
12
# File 'lib/worker.rb', line 10

def worker_number
  @worker_number
end

Class Method Details

.start_worker(logger, worker_number) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
# File 'lib/worker.rb', line 78

def self.start_worker(logger, worker_number)
  master_ip = '0.0.0.0:50051'
  Async do
    1.upto(worker_number) do |i|
      Async do
        worker = new(worker_number:, master_ip:, port: "3000#{i}", logger:)
        worker.start
      end
    end
  end
end

Instance Method Details

#map_operation(worker_req, _) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/worker.rb', line 21

def map_operation(worker_req, _)
  block = eval(Base64.decode64(worker_req.block))
  block.call(File.read(worker_req.filename))
  File.open("files/#{worker_req.key}/map.txt", 'a') do |file|
    result.each do |array|
      file.puts array.inspect
    end
  end
  logger.info("[Worker] Worker #{uuid} gRPC finished the map operation")
  stub = MapReduceMaster::Stub.new(@master_ip, :this_channel_is_insecure)
  request = WorkerInfo.new(uuid: @uuid, success: 'true', filename: worker_req.filename)
  stub.ping(request)
  Empty.new
rescue StandardError => e
  logger.error("[Worker] #{uuid} with error #{e}")
  stub = MapReduceMaster::Stub.new(@master_ip, :this_channel_is_insecure)
  request = WorkerInfo.new(uuid: @uuid, success: nil, filename: worker_req.filename)
  stub.ping(request)
end

#reduce_operation(worker_req, _) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/worker.rb', line 41

def reduce_operation(worker_req, _)
  data = sort_map_file(worker_req.filename)
  unique_keys = data.map { |item| item[0] }.uniq
  file_path = "files/#{worker_req.key}/reduce.txt"
  logger.info('[Worker] Starting Reduce Operation')
  Async do
    1.upto(unique_keys.count) do |i|
      Async do
        results = data.select { |item| item[0] == unique_keys[i] }
        block = eval(Base64.decode64(worker_req.block))
        response = block.call(results)
        File.open(file_path, 'a') do |file|
          file.puts response
        end
      end
    end
  end
  logger.info('[Worker] Finished Reduce Operation')
  logger.info("[Worker] File stored at #{file_path}")
  Empty.new
end

#startObject



63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/worker.rb', line 63

def start
  grpc_server = GRPC::RpcServer.new
  grpc_server.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure)
  grpc_server.handle(self)
  Thread.new do
    grpc_server.run_till_terminated
  ensure
    logger.info('[Worker] Worker gRPC thread failed')
  end
  logger.info('[Worker] Worker gRPC thread start')

  logger.info('[Worker] load functions finish')
  register_worker
end