Class: Toiler::Actor::Supervisor

Inherits:
Concurrent::Actor::RestartingContext
  • Object
show all
Includes:
Utils::ActorLogging
Defined in:
lib/toiler/actor/supervisor.rb

Overview

Actor that starts and supervises Toiler’s actors

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Supervisor

Returns a new instance of Supervisor.



12
13
14
15
16
17
# File 'lib/toiler/actor/supervisor.rb', line 12

# Builds the supervisor and immediately starts the actors it looks after.
#
# Bare `super` forwards whatever arguments this constructor received to the
# parent initializer. Afterwards the processor pools are spawned, then the
# fetchers.
def initialize
  super

  # NOTE(review): processors are spawned before fetchers; presumably so the
  # pools already exist when fetchers start handing out work. Confirm before
  # reordering these two calls.
  spawn_processors
  spawn_fetchers
end

Instance Method Details

#on_message(_msg) ⇒ Object



19
20
21
# File 'lib/toiler/actor/supervisor.rb', line 19

# Message handler for the supervisor actor.
#
# The incoming message is ignored here (hence the underscore-prefixed
# parameter); the method simply calls `pass` and returns its result.
# `pass` is not defined in this view. It is presumably inherited from the
# Concurrent::Actor context and hands the message to the next behaviour in
# the actor's chain; verify against the concurrent-ruby documentation.
#
# @param _msg [Object] the received message (unused)
# @return [Object] whatever `pass` returns
def on_message(_msg)
  pass
end

#spawn_fetchers ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/toiler/actor/supervisor.rb', line 23

# Starts one supervised fetcher actor per active worker queue.
#
# Walks Toiler.active_worker_class_registry; for every queue/worker-class
# pair it spawns an Actor::Fetcher named :"fetcher_<queue>" with the args
# [queue, klass.concurrency, klass.provider] and hands the resulting actor
# to Toiler.set_fetcher.
#
# A StandardError raised while spawning or registering is reported through
# `error` (message plus backtrace) and the loop moves on to the next queue.
# klass.concurrency and klass.provider are read outside the rescue, so a
# failure there propagates to the caller.
def spawn_fetchers
  Toiler.active_worker_class_registry.each do |queue, klass|
    fetcher_args = [queue, klass.concurrency, klass.provider]
    begin
      Toiler.set_fetcher(
        queue,
        Actor::Fetcher.spawn!(name: :"fetcher_#{queue}", supervise: true, args: fetcher_args)
      )
    rescue StandardError => e
      details = "#{e.message}\n#{e.backtrace.join("\n")}"
      error "Failed to start Fetcher for queue #{queue}: #{details}"
    end
  end
end

#spawn_processors ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/toiler/actor/supervisor.rb', line 38

# Starts one processor pool per active worker queue.
#
# For every queue/worker-class pair in Toiler.active_worker_class_registry
# this spawns a Concurrent::Actor::Utils::Pool named
# :"processor_pool_<queue>" sized by klass.concurrency. The block given to
# the pool spawns a supervised Actor::Processor named
# :"processor_<queue>_<index>" with args [queue]. The pool is then
# registered through Toiler.set_processor_pool.
#
# A StandardError raised while spawning or registering the pool is reported
# through `error` (message plus backtrace) and the loop continues with the
# next queue. The pool name and klass.concurrency are computed outside the
# rescue, so a failure there propagates to the caller.
def spawn_processors
  Toiler.active_worker_class_registry.each do |queue, klass|
    pool_name = :"processor_pool_#{queue}"
    pool_size = klass.concurrency
    begin
      processor_pool = Concurrent::Actor::Utils::Pool.spawn!(pool_name, pool_size) { |index|
        Actor::Processor.spawn(name: :"processor_#{queue}_#{index}", supervise: true, args: [queue])
      }
      Toiler.set_processor_pool(queue, processor_pool)
    rescue StandardError => e
      details = "#{e.message}\n#{e.backtrace.join("\n")}"
      error "Failed to spawn Processor Pool for queue #{queue}: #{details}"
    end
  end
end