Class: Phobos::Executor
- Inherits:
-
Object
- Object
- Phobos::Executor
- Includes:
- Instrumentation
- Defined in:
- lib/phobos/executor.rb
Constant Summary collapse
- LISTENER_OPTS =
%i(handler group_id topic min_bytes max_wait_time force_encoding start_from_beginning max_bytes_per_partition).freeze
Constants included from Instrumentation
Instance Method Summary collapse
-
#initialize ⇒ Executor
constructor
A new instance of Executor.
- #start ⇒ Object
- #stop ⇒ Object
Methods included from Instrumentation
#instrument, subscribe, unsubscribe
Constructor Details
#initialize ⇒ Executor
Returns a new instance of Executor.
6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/phobos/executor.rb', line 6 def initialize @threads = Concurrent::Array.new @listeners = Phobos.config.listeners.flat_map do |config| handler_class = config.handler.constantize listener_configs = config.to_hash max_concurrency = listener_configs[:max_concurrency] || 1 max_concurrency.times.map do configs = listener_configs.select { |k| LISTENER_OPTS.include?(k) } Phobos::Listener.new(configs.merge(handler: handler_class)) end end end |
Instance Method Details
#start ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/phobos/executor.rb', line 19 def start @signal_to_stop = false @threads.clear @thread_pool = Concurrent::FixedThreadPool.new(@listeners.size) @listeners.each do |listener| @thread_pool.post do thread = Thread.current thread.abort_on_exception = true @threads << thread run_listener(listener) end end true end |
#stop ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/phobos/executor.rb', line 36 def stop return if @signal_to_stop instrument('executor.stop') do @signal_to_stop = true @listeners.map(&:stop) @threads.select(&:alive?).each { |thread| thread.wakeup rescue nil } @thread_pool&.shutdown @thread_pool&.wait_for_termination Phobos.logger.info { Hash(message: 'Executor stopped') } end end |