Class: Phobos::Executor
- Inherits:
-
Object
- Object
- Phobos::Executor
- Includes:
- Instrumentation
- Defined in:
- lib/phobos/executor.rb
Constant Summary collapse
- LISTENER_OPTS =
%i( handler group_id topic min_bytes max_wait_time force_encoding start_from_beginning max_bytes_per_partition backoff delivery session_timeout offset_commit_interval offset_commit_threshold heartbeat_interval offset_retention_time ).freeze
Constants included from Instrumentation
Instance Method Summary collapse
-
#initialize ⇒ Executor
constructor
A new instance of Executor.
- #start ⇒ Object
- #stop ⇒ Object
Methods included from Instrumentation
#instrument, subscribe, unsubscribe
Constructor Details
#initialize ⇒ Executor
Returns a new instance of Executor.
22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/phobos/executor.rb', line 22

# Builds the executor's listeners from Phobos.config.listeners.
#
# Each configured entry is expanded into `max_concurrency` listeners
# (1 when the key is absent), all sharing the same handler class. Only the
# keys named in LISTENER_OPTS are forwarded to Phobos::Listener.new.
#
# @return [Executor] a new instance of Executor
def initialize
  @threads = Concurrent::Array.new
  @listeners = Phobos.config.listeners.flat_map do |listener_config|
    handler_class = listener_config.handler.constantize
    settings = listener_config.to_hash.deep_symbolize_keys
    listener_count = settings[:max_concurrency] || 1
    allowed_settings = settings.select { |key, _value| LISTENER_OPTS.include?(key) }

    # `merge` returns a new hash, so every listener receives its own options.
    Array.new(listener_count) do
      Phobos::Listener.new(allowed_settings.merge(handler: handler_class))
    end
  end
end
Instance Method Details
#start ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/phobos/executor.rb', line 35

# Starts every listener on its own worker of a fixed-size thread pool.
#
# Clears the stop flag and the list of tracked threads, sizes the pool to
# the number of listeners, and posts one job per listener. Each job records
# its thread in @threads before handing the listener to run_listener.
#
# @return [true]
def start
  @signal_to_stop = false
  @threads.clear
  @thread_pool = Concurrent::FixedThreadPool.new(@listeners.size)

  @listeners.each do |listener|
    @thread_pool.post do
      Thread.current.tap do |worker|
        # An unhandled error in a listener thread should not go unnoticed.
        worker.abort_on_exception = true
        @threads << worker
      end
      run_listener(listener)
    end
  end

  true
end
#stop ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/phobos/executor.rb', line 52

# Stops all listeners and shuts the thread pool down.
#
# Does nothing when a stop has already been signalled. Otherwise, inside an
# 'executor.stop' instrumentation block, it sets the stop flag, calls #stop
# on every listener, wakes the tracked threads that are still alive, shuts
# the pool down (if one was created), waits for it to terminate, and logs
# the outcome.
#
# @return [nil] when a stop was already signalled; otherwise whatever the
#   `instrument` call returns
def stop
  return if @signal_to_stop

  instrument('executor.stop') do
    @signal_to_stop = true
    @listeners.each(&:stop)

    @threads.select(&:alive?).each do |thread|
      begin
        thread.wakeup
      rescue ThreadError
        # The thread died between the alive? check and the wakeup call;
        # there is nothing left to wake. Any other error is not expected
        # here and is deliberately allowed to propagate.
        nil
      end
    end

    @thread_pool&.shutdown
    @thread_pool&.wait_for_termination
    Phobos.logger.info { Hash(message: 'Executor stopped') }
  end
end