Class: Toiler::Actor::Supervisor

Inherits:
Concurrent::Actor::RestartingContext
  • Object
show all
Includes:
Utils::ActorLogging
Defined in:
lib/toiler/actor/supervisor.rb

Overview

Actor that starts and supervises Toiler’s actors

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Supervisor

Returns a new instance of Supervisor.



12
13
14
15
16
# File 'lib/toiler/actor/supervisor.rb', line 12

# Builds the SQS client and then starts the actors this supervisor manages.
def initialize
  # Kept in @client; spawn_fetchers hands it to every fetcher through the
  # `client` reader.
  @client = ::Aws::SQS::Client.new
  # Processor pools are spawned before the fetchers.
  # NOTE(review): presumably so the pools already exist when fetching
  # starts - confirm before reordering.
  spawn_processors
  spawn_fetchers
end

Instance Attribute Details

#client ⇒ Object

Returns the value of attribute client.



10
11
12
# File 'lib/toiler/actor/supervisor.rb', line 10

# Reader for the @client instance variable.
#
# @return [Object] the value of @client (#initialize assigns it an
#   ::Aws::SQS::Client)
def client
  @client
end

Instance Method Details

#on_message(_msg) ⇒ Object



18
19
20
# File 'lib/toiler/actor/supervisor.rb', line 18

# Message handler: does nothing with the message itself and delegates to
# `pass`.
#
# @param _msg [Object] the incoming message (unused in this method)
# @return [Object] whatever `pass` returns
def on_message(_msg)
  # NOTE(review): `pass` is not defined in this view; presumably it comes from
  # the concurrent-ruby actor context and forwards the message to the next
  # behaviour - confirm.
  pass
end

#spawn_fetchers ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/toiler/actor/supervisor.rb', line 22

# Spawns one Fetcher actor for every entry in Toiler's active worker class
# registry.
#
# Each fetcher is named :"fetcher_<queue>", is spawned with `supervise: true`
# and receives the queue, the shared client and the worker class's
# concurrency as its arguments. A fetcher that spawns successfully is
# registered through Toiler.set_fetcher. A StandardError raised while spawning
# or registering is reported through `error`, and iteration continues with the
# next queue.
def spawn_fetchers
  Toiler.active_worker_class_registry.each do |queue_name, worker_class|
    # Evaluated before the begin block, so an error raised here is not
    # rescued below.
    concurrency = worker_class.concurrency
    begin
      fetcher_actor = Actor::Fetcher.spawn!(
        name: :"fetcher_#{queue_name}",
        supervise: true,
        args: [queue_name, client, concurrency]
      )
      Toiler.set_fetcher(queue_name, fetcher_actor)
    rescue StandardError => err
      details = "#{err.message}\n#{err.backtrace.join("\n")}"
      error "Failed to start Fetcher for queue #{queue_name}: #{details}"
    end
  end
end

#spawn_processors ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/toiler/actor/supervisor.rb', line 36

# Spawns one processor pool for every entry in Toiler's active worker class
# registry.
#
# The pool is named :"processor_pool_<queue>" and sized by the worker class's
# concurrency. The block given to the pool spawns an Actor::Processor named
# :"processor_<queue>_<index>" with `supervise: true` and the queue as its
# only argument. A pool that spawns successfully is registered through
# Toiler.set_processor_pool. A StandardError raised while spawning or
# registering is reported through `error`, and iteration continues with the
# next queue.
def spawn_processors
  Toiler.active_worker_class_registry.each do |queue_name, worker_class|
    # Both values are computed before the begin block, so errors raised
    # while computing them are not rescued below.
    pool_name = :"processor_pool_#{queue_name}"
    pool_size = worker_class.concurrency
    begin
      processor_pool = Concurrent::Actor::Utils::Pool.spawn!(pool_name, pool_size) do |slot|
        Actor::Processor.spawn(
          name: :"processor_#{queue_name}_#{slot}",
          supervise: true,
          args: [queue_name]
        )
      end
      Toiler.set_processor_pool(queue_name, processor_pool)
    rescue StandardError => err
      details = "#{err.message}\n#{err.backtrace.join("\n")}"
      error "Failed to spawn Processor Pool for queue #{queue_name}: #{details}"
    end
  end
end