Class: Botiasloop::ChannelsManager

Inherits:
Object
  • Object
show all
Defined in:
lib/botiasloop/channels_manager.rb

Overview

Manages concurrent execution of multiple channels

ChannelsManager provides a higher-level abstraction over the Channel Registry, handling the threading and lifecycle management required to run multiple channels simultaneously. Each channel runs in its own thread, allowing independent operation and error isolation.

Examples:

Basic usage

manager = Botiasloop::ChannelsManager.new
manager.start_listening.wait

Constant Summary collapse

SHUTDOWN_TIMEOUT =

Time to wait for graceful shutdown before force-killing threads

5
EXCLUDED_CHANNELS =

Channels that should not be auto-started (interactive channels)

%i[cli].freeze

Instance Method Summary collapse

Constructor Details

#initializeChannelsManager

Initialize a new ChannelsManager



25
26
27
28
29
30
31
# File 'lib/botiasloop/channels_manager.rb', line 25

def initialize
  @threads = {}
  @instances = {}
  @mutex = Mutex.new
  @running = false
  @shutdown_requested = false
end

Instance Method Details

#all_statusesHash{Symbol => Hash}

Get status for all running channels

Returns:

  • (Hash{Symbol => Hash})

    Map of channel identifier to status



177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/botiasloop/channels_manager.rb', line 177

def all_statuses
  @mutex.synchronize do
    @instances.transform_values do |instance|
      thread = @threads[instance.class.channel_identifier]
      {
        identifier: instance.class.channel_identifier,
        running: instance.running?,
        thread_alive: thread&.alive? || false,
        thread_id: thread&.object_id
      }
    end
  end
end

#channel_status(identifier) ⇒ Hash?

Get status information for a specific channel

Parameters:

  • identifier (Symbol)

    Channel identifier

Returns:

  • (Hash, nil)

    Status hash or nil if channel not found

    • :identifier [Symbol] Channel identifier

    • :running [Boolean] Whether channel instance reports running

    • :thread_alive [Boolean] Whether thread is alive

    • :thread_id [Integer] Thread object ID



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/botiasloop/channels_manager.rb', line 158

def channel_status(identifier)
  @mutex.synchronize do
    instance = @instances[identifier]
    thread = @threads[identifier]

    return nil unless instance

    {
      identifier: identifier,
      running: instance.running?,
      thread_alive: thread&.alive? || false,
      thread_id: thread&.object_id
    }
  end
end

#instance(identifier) ⇒ Base?

Get a specific channel instance

Parameters:

  • identifier (Symbol)

    Channel identifier

Returns:

  • (Base, nil)

    Channel instance or nil if not running



144
145
146
147
148
# File 'lib/botiasloop/channels_manager.rb', line 144

def instance(identifier)
  @mutex.synchronize do
    @instances[identifier]
  end
end

#running?Boolean

Check if any channels are currently running

Returns:

  • (Boolean)

    True if any channel thread is alive



123
124
125
126
127
128
129
# File 'lib/botiasloop/channels_manager.rb', line 123

def running?
  @mutex.synchronize do
    return false unless @running

    @threads.any? { |_, thread| thread.alive? }
  end
end

#start_listeningChannelsManager

Start all configured channels in separate threads

Each channel is spawned in its own thread, allowing concurrent operation. Channels with missing configuration are skipped with a warning. Startup failures are logged but don’t prevent other channels from starting.

Returns:

Raises:

  • (Error)

    If channels are already running



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/botiasloop/channels_manager.rb', line 42

def start_listening
  @mutex.synchronize do
    raise Error, "Channels are already running" if @running

    @running = true
    @shutdown_requested = false
  end

  setup_signal_handlers

  registry = Channels.registry
  registry.channels.each do |identifier, channel_class|
    next if EXCLUDED_CHANNELS.include?(identifier)

    begin
      instance = channel_class.new
    rescue Error => e
      if e.message.match?(/Missing required configuration/)
        Logger.warn "[ChannelsManager] Skipping #{identifier}: #{e.message}"
        next
      end
      raise
    end

    thread = spawn_channel_thread(identifier, instance)
    @mutex.synchronize do
      @threads[identifier] = thread
      @instances[identifier] = instance
    end

    Logger.info "[ChannelsManager] Started #{identifier} in thread #{thread.object_id}"
  end

  # Monitor threads for crashes
  spawn_monitor_thread unless @threads.empty?

  self
end

#stop_listeningvoid

This method returns an undefined value.

Stop all running channels gracefully

Sends stop signal to all channel instances and waits for threads to complete. Force-kills threads that don’t stop within timeout.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/botiasloop/channels_manager.rb', line 87

def stop_listening
  @mutex.synchronize do
    return unless @running

    @shutdown_requested = true
    @running = false
  end

  Logger.info "[ChannelsManager] Stopping all channels..."

  # Stop all channel instances
  @instances.each do |identifier, instance|
    instance.stop_listening if instance.running?
  rescue => e
    Logger.error "[ChannelsManager] Error stopping #{identifier}: #{e.message}"
  end

  # Wait for threads to complete
  @threads.each do |identifier, thread|
    unless thread.join(SHUTDOWN_TIMEOUT)
      Logger.warn "[ChannelsManager] Force-killing #{identifier} thread"
      thread.kill
    end
  end

  @mutex.synchronize do
    @threads.clear
    @instances.clear
  end

  Logger.info "[ChannelsManager] All channels stopped"
end

#thread_countInteger

Get the number of active channel threads

Returns:

  • (Integer)

    Count of alive threads



134
135
136
137
138
# File 'lib/botiasloop/channels_manager.rb', line 134

def thread_count
  @mutex.synchronize do
    @threads.count { |_, thread| thread.alive? }
  end
end

#waitvoid

This method returns an undefined value.

Block until all channels have stopped

Useful for daemon mode where the main thread should wait. Returns immediately if no channels are running.



197
198
199
200
201
202
203
204
205
# File 'lib/botiasloop/channels_manager.rb', line 197

def wait
  return unless running?

  # Wait for all threads to complete
  loop do
    sleep 0.1
    break unless running?
  end
end