Class: Phobos::Executor
- Inherits:
-
Object
- Object
- Phobos::Executor
- Includes:
- Instrumentation, Log
- Defined in:
- lib/phobos/executor.rb
Constant Summary
Constants included from Instrumentation
Instance Method Summary collapse
-
#initialize ⇒ Executor
constructor
A new instance of Executor.
- #start ⇒ Object
- #stop ⇒ Object
Methods included from Log
#log_debug, #log_error, #log_info, #log_warn
Methods included from Instrumentation
#instrument, subscribe, unsubscribe
Constructor Details
#initialize ⇒ Executor
Returns a new instance of Executor.
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/phobos/executor.rb', line 8 def initialize @threads = Concurrent::Array.new @listeners = Phobos.config.listeners.flat_map do |config| handler_class = config.handler.constantize listener_configs = config.to_hash.deep_symbolize_keys max_concurrency = listener_configs[:max_concurrency] || 1 Array.new(max_concurrency).map do configs = listener_configs.select { |k| Constants::LISTENER_OPTS.include?(k) } Phobos::Listener.new(**configs.merge(handler: handler_class)) end end end |
Instance Method Details
#start ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/phobos/executor.rb', line 21 def start @signal_to_stop = false @threads.clear @thread_pool = Concurrent::FixedThreadPool.new(@listeners.size) @listeners.each do |listener| @thread_pool.post do thread = Thread.current thread.abort_on_exception = true @threads << thread run_listener(listener) end end true end |
#stop ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/phobos/executor.rb', line 38 def stop return if @signal_to_stop instrument('executor.stop') do @signal_to_stop = true @listeners.each(&:stop) @threads.select(&:alive?).each do |thread| begin thread.wakeup rescue StandardError nil end end @thread_pool&.shutdown @thread_pool&.wait_for_termination Phobos.logger.info { Hash(message: 'Executor stopped') } end end |