Module: Mooro::Plugin::InterruptableServer

Defined in:
lib/mooro/plugin/fault_tolerance.rb

Instance Method Summary

Instance Method Details

#make_worker(supervisor, logger, ractor_name: "interruptable_worker") ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/mooro/plugin/fault_tolerance.rb', line 9

# Builds a worker Ractor that receives clients from +supervisor+ and serves
# them one at a time on a dedicated runner thread, so that an in-flight
# +serve+ call can be interrupted when the worker is told to stop.
#
# The Ractor's main thread repeatedly calls +supervisor.take+ and queues each
# value as a client until it receives +:terminate+ (or the take raises
# Ractor::ClosedError). It then raises TerminateServer in the runner thread,
# waits for it to finish, and closes any clients that were queued but never
# served.
#
# @param supervisor [#take] source of clients; presumably the supervising
#   Ractor (it is passed into Ractor.new) -- confirm against the caller
# @param logger [#send] receives error messages as strings; presumably a
#   logging Ractor -- confirm against the caller
# @param ractor_name [String] name given to the created Ractor
# @return [Ractor] the newly created worker Ractor
def make_worker(supervisor, logger, ractor_name: "interruptable_worker")
  block = Ractor.make_shareable(method(:serve).to_proc)
  Ractor.new(supervisor, logger, block, name: ractor_name) do |supervisor, logger, serve|
    clients = Thread::Queue.new
    runner = Thread.new do
      while (current_client = clients.pop)
        begin
          serve.call(current_client)
        rescue TerminateServer
          # Interrupted while serving: stop the loop (ensure still closes the client).
          break
        rescue => err
          logger.send(err.to_s)
        ensure
          current_client.close
        end
      end
    rescue TerminateServer
      # Interrupted outside the inner begin block -- typically while idle in
      # `clients.pop`. Swallow it here so the thread ends normally; otherwise
      # `runner.join` below would re-raise it in the Ractor's main thread.
    end
    begin
      until (client = supervisor.take) == :terminate
        clients.push(client)
      end
    rescue Ractor::ClosedError => closed_err
      logger.send("#{closed_err}: Supervisor's outgoing port is closed")
    ensure
      begin
        runner.raise(TerminateServer)
        runner.join
      ensure
        # The runner is no longer consuming; close clients it never got to so
        # they are not leaked. Non-blocking pop is safe: this is the only
        # remaining consumer and emptiness was just checked.
        clients.pop(true).close until clients.empty?
      end
    end
  end
end