Class: Botiasloop::ChannelsManager
- Inherits:
- Object
- Defined in:
- lib/botiasloop/channels_manager.rb
Overview
Manages concurrent execution of multiple channels
ChannelsManager provides a higher-level abstraction over the Channel Registry, handling the threading and lifecycle management required to run multiple channels simultaneously. Each channel runs in its own thread, allowing independent operation and error isolation.
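The error isolation described above can be illustrated with a minimal, self-contained sketch. It does not use Botiasloop itself: `DemoChannel` and `run_isolated` are hypothetical names invented for this example, and the real manager's thread bodies may differ.

```ruby
# Hypothetical stand-in for a channel; not part of Botiasloop.
class DemoChannel
  attr_reader :name

  def initialize(name, crash: false)
    @name = name
    @crash = crash
  end

  # Either finishes normally or raises, depending on the crash flag.
  def start_listening
    raise "#{@name} crashed" if @crash

    :"#{@name}_finished"
  end
end

# Run every channel in its own thread. An exception in one thread is
# rescued inside that thread, so the other channels are unaffected.
def run_isolated(channels)
  results = {}
  mutex = Mutex.new

  threads = channels.map do |channel|
    Thread.new do
      outcome = begin
        channel.start_listening
      rescue => e
        "error: #{e.message}"
      end
      # Guard the shared hash, as the manager does with its own state.
      mutex.synchronize { results[channel.name] = outcome }
    end
  end

  threads.each(&:join)
  results
end
```

Calling `run_isolated` with one healthy and one crashing `DemoChannel` yields a result for both: the crash is recorded as an error string while the healthy channel still completes.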
Constant Summary
- SHUTDOWN_TIMEOUT = 5
  Time to wait (in seconds) for graceful shutdown before force-killing threads
- EXCLUDED_CHANNELS = %i[cli].freeze
  Channels that should not be auto-started (interactive channels)
Instance Method Summary
- #all_statuses ⇒ Hash{Symbol => Hash}
  Get status for all running channels.
- #channel_status(identifier) ⇒ Hash?
  Get status information for a specific channel.
- #initialize ⇒ ChannelsManager (constructor)
  Initialize a new ChannelsManager.
- #instance(identifier) ⇒ Base?
  Get a specific channel instance.
- #running? ⇒ Boolean
  Check if any channels are currently running.
- #start_listening ⇒ ChannelsManager
  Start all configured channels in separate threads.
- #stop_listening ⇒ void
  Stop all running channels gracefully.
- #thread_count ⇒ Integer
  Get the number of active channel threads.
- #wait ⇒ void
  Block until all channels have stopped.
Constructor Details
#initialize ⇒ ChannelsManager
Initialize a new ChannelsManager
# File 'lib/botiasloop/channels_manager.rb', line 25
def initialize
  @threads = {}
  @instances = {}
  @mutex = Mutex.new
  @running = false
  @shutdown_requested = false
end
Instance Method Details
#all_statuses ⇒ Hash{Symbol => Hash}
Get status for all running channels
# File 'lib/botiasloop/channels_manager.rb', line 177
def all_statuses
  @mutex.synchronize do
    @instances.transform_values do |instance|
      thread = @threads[instance.class.channel_identifier]
      {
        identifier: instance.class.channel_identifier,
        running: instance.running?,
        thread_alive: thread&.alive? || false,
        thread_id: thread&.object_id
      }
    end
  end
end
#channel_status(identifier) ⇒ Hash?
Get status information for a specific channel
# File 'lib/botiasloop/channels_manager.rb', line 158
def channel_status(identifier)
  @mutex.synchronize do
    instance = @instances[identifier]
    thread = @threads[identifier]

    return nil unless instance

    {
      identifier: identifier,
      running: instance.running?,
      thread_alive: thread&.alive? || false,
      thread_id: thread&.object_id
    }
  end
end
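The status hashes rely on the safe-navigation operator so that a missing thread does not raise. The following sketch isolates that idiom; `thread_status` is a hypothetical helper written for this example, not a method of ChannelsManager.

```ruby
# Build the thread-related part of a status hash.
# `thread` may be nil when no thread is registered for the identifier.
def thread_status(identifier, thread)
  {
    identifier: identifier,
    # thread&.alive? is nil for a nil thread; `|| false` turns that into
    # a real boolean so callers never see nil here.
    thread_alive: thread&.alive? || false,
    # No coercion here: a missing thread simply has no id.
    thread_id: thread&.object_id
  }
end
```

With a `nil` thread the helper reports `thread_alive: false` and `thread_id: nil`; with a live thread it reports `true` and that thread's `object_id`.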
#instance(identifier) ⇒ Base?
Get a specific channel instance
# File 'lib/botiasloop/channels_manager.rb', line 144
def instance(identifier)
  @mutex.synchronize do
    @instances[identifier]
  end
end
#running? ⇒ Boolean
Check if any channels are currently running
# File 'lib/botiasloop/channels_manager.rb', line 123
def running?
  @mutex.synchronize do
    return false unless @running

    @threads.any? { |_, thread| thread.alive? }
  end
end
#start_listening ⇒ ChannelsManager
Start all configured channels in separate threads
Each channel is spawned in its own thread, allowing concurrent operation. Channels with missing configuration are skipped with a warning. Startup failures are logged but don’t prevent other channels from starting.
# File 'lib/botiasloop/channels_manager.rb', line 42
def start_listening
  @mutex.synchronize do
    raise Error, "Channels are already running" if @running

    @running = true
    @shutdown_requested = false
  end

  setup_signal_handlers

  registry = Channels.registry
  registry.channels.each do |identifier, channel_class|
    next if EXCLUDED_CHANNELS.include?(identifier)

    begin
      instance = channel_class.new
    rescue Error => e
      if e.message.match?(/Missing required configuration/)
        Logger.warn "[ChannelsManager] Skipping #{identifier}: #{e.message}"
        next
      end
      raise
    end

    thread = spawn_channel_thread(identifier, instance)
    @mutex.synchronize do
      @threads[identifier] = thread
      @instances[identifier] = instance
    end

    Logger.info "[ChannelsManager] Started #{identifier} in thread #{thread.object_id}"
  end

  # Monitor threads for crashes
  spawn_monitor_thread unless @threads.empty?

  self
end
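The skip-or-re-raise logic in the listing can be tried in isolation. This is a sketch under assumptions: `DemoConfigError`, the three channel classes, and `build_instances` are hypothetical names that stand in for Botiasloop's own `Error` and registry, which are not reproduced here.

```ruby
# Hypothetical error and channel classes, for illustration only.
class DemoConfigError < StandardError; end

class ReadyChannel
end

class UnconfiguredChannel
  def initialize
    raise DemoConfigError, "Missing required configuration: token"
  end
end

class BrokenChannel
  def initialize
    raise DemoConfigError, "unexpected failure"
  end
end

# Instantiate every non-excluded channel class. Classes that report
# missing configuration are skipped; any other error is re-raised.
def build_instances(channel_classes, excluded: [])
  instances = {}
  skipped = []

  channel_classes.each do |identifier, channel_class|
    next if excluded.include?(identifier)

    begin
      instances[identifier] = channel_class.new
    rescue DemoConfigError => e
      raise unless e.message.match?(/Missing required configuration/)

      skipped << identifier
    end
  end

  [instances, skipped]
end
```

Matching on the message text keeps the sketch close to the listing. A dedicated error subclass for missing configuration would be a sturdier way to make the same distinction, but whether the library offers one is not shown in this page.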
#stop_listening ⇒ void
This method returns an undefined value.
Stop all running channels gracefully
Sends stop signal to all channel instances and waits for threads to complete. Force-kills threads that don’t stop within timeout.
# File 'lib/botiasloop/channels_manager.rb', line 87
def stop_listening
  @mutex.synchronize do
    return unless @running

    @shutdown_requested = true
    @running = false
  end

  Logger.info "[ChannelsManager] Stopping all channels..."

  # Stop all channel instances
  @instances.each do |identifier, instance|
    instance.stop_listening if instance.running?
  rescue => e
    Logger.error "[ChannelsManager] Error stopping #{identifier}: #{e.message}"
  end

  # Wait for threads to complete
  @threads.each do |identifier, thread|
    unless thread.join(SHUTDOWN_TIMEOUT)
      Logger.warn "[ChannelsManager] Force-killing #{identifier} thread"
      thread.kill
    end
  end

  @mutex.synchronize do
    @threads.clear
    @instances.clear
  end

  Logger.info "[ChannelsManager] All channels stopped"
end
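The join-then-kill step depends on `Thread#join(limit)` returning `nil` when the limit expires. A minimal sketch of that pattern follows; `stop_threads` is a hypothetical helper for this example, and unlike the listing it joins again after killing so that the caller can observe the thread as dead.

```ruby
# Wait up to `timeout` seconds for each thread, then force-kill stragglers.
# Returns the identifiers of the threads that had to be killed.
def stop_threads(threads, timeout)
  killed = []

  threads.each do |identifier, thread|
    # Thread#join returns the thread on success and nil on timeout.
    next if thread.join(timeout)

    thread.kill
    thread.join # wait until the kill has taken effect
    killed << identifier
  end

  killed
end
```

Because the timeout applies to each thread in turn, the worst-case total wait grows with the number of unresponsive threads. The same holds for the listing, where every stuck channel can add up to SHUTDOWN_TIMEOUT seconds.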
#thread_count ⇒ Integer
Get the number of active channel threads
# File 'lib/botiasloop/channels_manager.rb', line 134
def thread_count
  @mutex.synchronize do
    @threads.count { |_, thread| thread.alive? }
  end
end
#wait ⇒ void
This method returns an undefined value.
Block until all channels have stopped
Useful for daemon mode where the main thread should wait. Returns immediately if no channels are running.
# File 'lib/botiasloop/channels_manager.rb', line 197
def wait
  return unless running?

  # Wait for all threads to complete
  loop do
    sleep 0.1
    break unless running?
  end
end
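The polling approach used by `wait` can be sketched generically. `wait_while` is a hypothetical helper, not part of the library; it takes the condition as a block and mirrors the listing's structure, including the early return.

```ruby
# Block the calling thread while the given block keeps returning true,
# re-checking every `interval` seconds.
def wait_while(interval: 0.1)
  return unless yield # nothing to wait for

  loop do
    sleep interval
    break unless yield
  end
end
```

For example, `wait_while(interval: 0.01) { worker.alive? }` returns once a `worker` thread has finished. Polling is simple and keeps the main thread responsive to signals, at the cost of up to one interval of latency before the stop is noticed.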