Class: Worker
- Inherits:
-
WorkerServer::Service
- Object
- WorkerServer::Service
- Worker
- Defined in:
- lib/worker.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#master_ip ⇒ Object
Returns the value of attribute master_ip.
-
#port ⇒ Object
Returns the value of attribute port.
-
#result ⇒ Object
Returns the value of attribute result.
-
#uuid ⇒ Object
Returns the value of attribute uuid.
-
#worker_number ⇒ Object
Returns the value of attribute worker_number.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(worker_number:, master_ip:, port:, logger:) ⇒ Worker
constructor
A new instance of Worker.
- #map_operation(worker_req, _) ⇒ Object
- #reduce_operation(worker_req, _) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(worker_number:, master_ip:, port:, logger:) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 |
# File 'lib/worker.rb', line 12 def initialize(worker_number:, master_ip:, port:, logger:) @worker_number = worker_number @uuid = generate_uuid @master_ip = master_ip @port = port @logger = logger @result = [] end |
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
10 11 12 |
# File 'lib/worker.rb', line 10 def logger @logger end |
#master_ip ⇒ Object
Returns the value of attribute master_ip.
10 11 12 |
# File 'lib/worker.rb', line 10 def master_ip @master_ip end |
#port ⇒ Object
Returns the value of attribute port.
10 11 12 |
# File 'lib/worker.rb', line 10 def port @port end |
#result ⇒ Object
Returns the value of attribute result.
10 11 12 |
# File 'lib/worker.rb', line 10 def result @result end |
#uuid ⇒ Object
Returns the value of attribute uuid.
10 11 12 |
# File 'lib/worker.rb', line 10 def uuid @uuid end |
#worker_number ⇒ Object
Returns the value of attribute worker_number.
10 11 12 |
# File 'lib/worker.rb', line 10 def worker_number @worker_number end |
Class Method Details
.start_worker(logger, worker_number) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/worker.rb', line 78 def self.start_worker(logger, worker_number) master_ip = '0.0.0.0:50051' Async do 1.upto(worker_number) do |i| Async do worker = new(worker_number:, master_ip:, port: "3000#{i}", logger:) worker.start end end end end |
Instance Method Details
#map_operation(worker_req, _) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/worker.rb', line 21 def map_operation(worker_req, _) block = eval(Base64.decode64(worker_req.block)) block.call(File.read(worker_req.filename)) File.open("files/#{worker_req.key}/map.txt", 'a') do |file| result.each do |array| file.puts array.inspect end end logger.info("[Worker] Worker #{uuid} gRPC finished the map operation") stub = MapReduceMaster::Stub.new(@master_ip, :this_channel_is_insecure) request = WorkerInfo.new(uuid: @uuid, success: 'true', filename: worker_req.filename) stub.ping(request) Empty.new rescue StandardError => e logger.error("[Worker] #{uuid} with error #{e}") stub = MapReduceMaster::Stub.new(@master_ip, :this_channel_is_insecure) request = WorkerInfo.new(uuid: @uuid, success: nil, filename: worker_req.filename) stub.ping(request) end |
#reduce_operation(worker_req, _) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/worker.rb', line 41 def reduce_operation(worker_req, _) data = sort_map_file(worker_req.filename) unique_keys = data.map { |item| item[0] }.uniq file_path = "files/#{worker_req.key}/reduce.txt" logger.info('[Worker] Starting Reduce Operation') Async do 1.upto(unique_keys.count) do |i| Async do results = data.select { |item| item[0] == unique_keys[i] } block = eval(Base64.decode64(worker_req.block)) response = block.call(results) File.open(file_path, 'a') do |file| file.puts response end end end end logger.info('[Worker] Finished Reduce Operation') logger.info("[Worker] File stored at #{file_path}") Empty.new end |
#start ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/worker.rb', line 63 def start grpc_server = GRPC::RpcServer.new grpc_server.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure) grpc_server.handle(self) Thread.new do grpc_server.run_till_terminated ensure logger.info('[Worker] Worker gRPC thread failed') end logger.info('[Worker] Worker gRPC thread start') logger.info('[Worker] load functions finish') register_worker end |