Class: Actory::Receiver::Worker

Inherits:
Base
  • Object
show all
Defined in:
lib/actory/receiver/worker.rb

Constant Summary

Constants inherited from Base

Base::RECEIVER

Constants inherited from Base

Base::GLOBAL

Instance Method Summary collapse

Methods inherited from Base

get_logger_level, get_logger_output

Constructor Details

#initialize(protocol = "tcp", target: nil) ⇒ Worker

Returns a new instance of Worker.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/actory/receiver/worker.rb', line 5

# Starts one receiver worker per processor, each in its own process.
#
# Every process builds its worker by calling the method named by +protocol+
# with (+target+, index), installs TERM/INT handlers that call the worker's
# +stop+, and then calls the worker's +run+.
#
# @param protocol [String] name of the method used to build a worker; it is
#   overridden by RECEIVER['protocol'] whenever that setting is present
# @param target [Object, nil] handler handed to the worker builder; falls
#   back to Actory::Receiver::EventHandler when nil
def initialize(protocol = "tcp", target: nil)
  # A configured protocol takes precedence over the argument.
  protocol = RECEIVER['protocol'] if RECEIVER['protocol']
  num = Parallel.processor_count
  target ||= Actory::Receiver::EventHandler
  # Exclusive range: exactly `num` jobs (indices 0...num) for `num` processes.
  # An inclusive range queued one job too many ("#{num + 1}/#{num}"), which
  # was picked up as soon as any running worker was stopped.
  Parallel.map(0...num, in_processes: num) do |n|
    @@logger.info "Starting Actory Receiver Worker ##{n + 1}/#{num} (PID = #{Process.pid}, PGROUP = #{Process.getpgrp}, protocol = #{protocol})"
    is_retried = false
    begin
      worker = send(protocol, target, n)
      Signal.trap(:TERM) { worker.stop }
      Signal.trap(:INT) { worker.stop }
      worker.run
    rescue => e
      # Log only the first failure so a persistent error does not flood the log.
      unless is_retried
        @@logger.error(Actory::Errors::Generator.new.json(level: "error", message: e, backtrace: e.backtrace))
      end
      is_retried = true
      # NOTE(review): retries without a bound or backoff; this looks like a
      # deliberate keep-alive, but a permanent failure will spin — confirm.
      retry
    end
  end
end