Class: Fractor::Supervisor
Inherits: Object
- Object
- Fractor::Supervisor
Defined in: lib/fractor/supervisor.rb
Overview
Supervises multiple WrappedRactors, distributes work, and aggregates results.
Instance Attribute Summary
-
#results ⇒ Object
readonly
Returns the value of attribute results.
-
#work_queue ⇒ Object
readonly
Returns the value of attribute work_queue.
-
#worker_pools ⇒ Object
readonly
Returns the value of attribute worker_pools.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary
-
#add_work_item(work) ⇒ Object
Adds a single work item to the queue.
-
#add_work_items(works) ⇒ Object
Adds multiple work items to the queue.
-
#handle_shutdown(signal_name) ⇒ Object
Handles shutdown signal by mode (continuous vs batch).
-
#initialize(worker_pools: [], continuous_mode: false) ⇒ Supervisor
constructor
Initializes the Supervisor.
-
#print_status ⇒ Object
Prints current supervisor status.
-
#register_work_source(&callback) ⇒ Object
Registers a callback that provides new work items. The callback should return nil or an empty array when no new work is available.
-
#run ⇒ Object
Runs the main processing loop.
-
#setup_signal_handler ⇒ Object
Sets up signal handlers for graceful shutdown.
-
#setup_status_signal ⇒ Object
Sets up platform-specific status monitoring signal.
-
#start_workers ⇒ Object
Starts the worker Ractors for all worker pools.
-
#stop ⇒ Object
Stops the supervisor (for continuous mode).
Constructor Details
#initialize(worker_pools: [], continuous_mode: false) ⇒ Supervisor
Initializes the Supervisor.
- worker_pools: An array of worker pool configurations, each a hash containing:
  - worker_class: The class inheriting from Fractor::Worker (e.g., MyWorker). An ArgumentError is raised otherwise.
  - num_workers: The number of Ractors to spawn for this worker class. When omitted, the value returned by detect_num_workers is used.
- continuous_mode: Whether to run in continuous mode without expecting a fixed work count.
# File 'lib/fractor/supervisor.rb', line 19

def initialize(worker_pools: [], continuous_mode: false)
  @worker_pools = worker_pools.map do |pool_config|
    worker_class = pool_config[:worker_class]
    num_workers = pool_config[:num_workers] || detect_num_workers

    unless worker_class < Fractor::Worker
      raise ArgumentError, "#{worker_class} must inherit from Fractor::Worker"
    end

    {
      worker_class: worker_class,
      num_workers: num_workers,
      workers: [], # Will hold the WrappedRactor instances
    }
  end

  @work_queue = Queue.new
  @results = ResultAggregator.new
  @workers = [] # Flattened array of all workers across all pools
  @total_work_count = 0 # Track total items initially added
  @ractors_map = {} # Map Ractor object to WrappedRactor instance
  @continuous_mode = continuous_mode
  @running = false
  @work_callbacks = []
  @wakeup_ractor = nil # Control ractor for unblocking select
  @timer_thread = nil # Timer thread for periodic wakeup
  @idle_workers = [] # Track workers waiting for work
end
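The pool validation performed by the constructor can be illustrated without loading the gem. The following is a minimal sketch under stated assumptions: BaseWorker, SquareWorker, and normalize_pool are hypothetical stand-ins rather than Fractor classes, Etc.nprocessors is used only as a guess at what detect_num_workers might return, and the extra is_a?(Class) guard is an addition that the listing above does not have.

```ruby
require "etc"

# Hypothetical stand-ins; these are not the real Fractor classes.
class BaseWorker; end
class SquareWorker < BaseWorker; end

# Normalizes one pool configuration in the spirit of the constructor:
# validate the worker class, then default the worker count.
def normalize_pool(pool_config, base_class: BaseWorker)
  worker_class = pool_config[:worker_class]
  # Added guard (not in the original): reject non-class values up front.
  unless worker_class.is_a?(Class) && worker_class < base_class
    raise ArgumentError, "#{worker_class.inspect} must inherit from #{base_class}"
  end

  {
    worker_class: worker_class,
    # Assumption: fall back to the processor count when num_workers is omitted.
    num_workers: pool_config[:num_workers] || Etc.nprocessors,
    workers: [], # would later hold the started workers
  }
end

pools = [
  { worker_class: SquareWorker, num_workers: 2 },
  { worker_class: SquareWorker }, # num_workers falls back to the default
].map { |config| normalize_pool(config) }
```

Validating every pool before any worker is started means a misconfigured pool fails fast, before Ractors have been spawned.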
Instance Attribute Details
#results ⇒ Object (readonly)
Returns the value of attribute results.
# File 'lib/fractor/supervisor.rb', line 12

def results
  @results
end
#work_queue ⇒ Object (readonly)
Returns the value of attribute work_queue.
# File 'lib/fractor/supervisor.rb', line 12

def work_queue
  @work_queue
end
#worker_pools ⇒ Object (readonly)
Returns the value of attribute worker_pools.
# File 'lib/fractor/supervisor.rb', line 12

def worker_pools
  @worker_pools
end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
# File 'lib/fractor/supervisor.rb', line 12

def workers
  @workers
end
Instance Method Details
#add_work_item(work) ⇒ Object
Adds a single work item to the queue. The item must be an instance of Fractor::Work or a subclass.
# File 'lib/fractor/supervisor.rb', line 51

def add_work_item(work)
  unless work.is_a?(Fractor::Work)
    raise ArgumentError, "#{work.class} must be an instance of Fractor::Work"
  end

  @work_queue << work
  @total_work_count += 1
  return unless ENV["FRACTOR_DEBUG"]

  puts "Work item added. Initial work count: #{@total_work_count}, Queue size: #{@work_queue.size}"
end
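The type check and the counter update shown above can be tried in isolation. This sketch assumes nothing about the real Fractor::Work beyond what the listing shows; DemoWork and DemoQueue are hypothetical names introduced for illustration only.

```ruby
# Hypothetical stand-in for Fractor::Work; the real class may differ.
class DemoWork
  attr_reader :input

  def initialize(input)
    @input = input
  end
end

# Hypothetical wrapper that mirrors the add_work_item bookkeeping.
class DemoQueue
  attr_reader :total_work_count

  def initialize
    @queue = Queue.new
    @total_work_count = 0
  end

  # Rejects anything that is not a DemoWork, then enqueues and counts it.
  def add(work)
    unless work.is_a?(DemoWork)
      raise ArgumentError, "#{work.class} must be an instance of DemoWork"
    end

    @queue << work
    @total_work_count += 1
  end

  def size
    @queue.size
  end
end

demo = DemoQueue.new
demo.add(DemoWork.new(1))

# A plain Integer is rejected and leaves the counters untouched.
rejected = begin
  demo.add(42)
  false
rescue ArgumentError
  true
end
```

Because the counter is incremented only after the check passes, a rejected item cannot inflate the total that a batch-mode loop would wait for.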
#add_work_items(works) ⇒ Object
Adds multiple work items to the queue. Each item must be an instance of Fractor::Work or a subclass.
# File 'lib/fractor/supervisor.rb', line 66

def add_work_items(works)
  works.each do |work|
    add_work_item(work)
  end
end
#handle_shutdown(signal_name) ⇒ Object
Handles a shutdown signal according to the mode: in continuous mode it calls #stop for a graceful shutdown; in batch mode it raises ShutdownSignal in the current thread for an immediate shutdown.

# File 'lib/fractor/supervisor.rb', line 130

def handle_shutdown(signal_name)
  if @continuous_mode
    puts "\n#{signal_name} received. Initiating graceful shutdown..." if ENV["FRACTOR_DEBUG"]
    stop
  else
    puts "\n#{signal_name} received. Initiating immediate shutdown..." if ENV["FRACTOR_DEBUG"]
    Thread.current.raise(ShutdownSignal, "Interrupted by #{signal_name}")
  end
rescue Exception => e
  puts "Error in signal handler: #{e.class}: #{e.message}" if ENV["FRACTOR_DEBUG"]
  puts e.backtrace.join("\n") if ENV["FRACTOR_DEBUG"]
  exit!(1)
end
#print_status ⇒ Object
Prints the current supervisor status.

# File 'lib/fractor/supervisor.rb', line 165

def print_status
  puts "\n=== Fractor Supervisor Status ==="
  puts "Mode: #{@continuous_mode ? 'Continuous' : 'Batch'}"
  puts "Running: #{@running}"
  puts "Workers: #{@workers.size}"
  puts "Idle workers: #{@idle_workers.size}"
  puts "Queue size: #{@work_queue.size}"
  puts "Results: #{@results.results.size}"
  puts "Errors: #{@results.errors.size}"
  puts "================================\n"
end
#register_work_source(&callback) ⇒ Object
Registers a callback that provides new work items. The callback should return nil or an empty array when no new work is available.

# File 'lib/fractor/supervisor.rb', line 74

def register_work_source(&callback)
  @work_callbacks << callback
end
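The callback contract (return nil or an empty array when there is nothing new) can be sketched as follows. WorkSourcePoller and its methods are hypothetical names; only the contract itself comes from the description above.

```ruby
# Hypothetical poller; it is not part of Fractor.
class WorkSourcePoller
  def initialize
    @callbacks = []
  end

  # Stores the block so it can be polled later.
  def register(&callback)
    @callbacks << callback
  end

  # Calls every registered source once and returns all new items.
  # A nil or empty return value contributes nothing.
  def poll
    @callbacks.flat_map do |callback|
      new_work = callback.call
      new_work.nil? || new_work.empty? ? [] : new_work
    end
  end
end

# A source that yields a batch once, then nil, then an empty array.
pending = [[:a, :b], nil, []]
poller = WorkSourcePoller.new
poller.register { pending.shift }

first  = poller.poll
second = poller.poll
third  = poller.poll
```

Treating nil and an empty array the same way lets a source signal "nothing yet" without allocating a new array on every poll.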
#run ⇒ Object
Runs the main processing loop.
# File 'lib/fractor/supervisor.rb', line 178

def run
  setup_signal_handler
  start_workers

  @running = true
  processed_count = 0

  # Start timer thread for continuous mode to periodically check work sources
  if @continuous_mode && !@work_callbacks.empty?
    @timer_thread = Thread.new do
      while @running
        sleep(0.1) # Check work sources every 100ms
        if @wakeup_ractor && @running
          begin
            @wakeup_ractor.send(:wakeup)
          rescue StandardError => e
            puts "Timer thread error sending wakeup: #{e.message}" if ENV["FRACTOR_DEBUG"]
            break
          end
        end
      end
      puts "Timer thread shutting down" if ENV["FRACTOR_DEBUG"]
    end
  end

  begin
    # Main loop: Process events until conditions are met for termination
    while @running && (@continuous_mode || processed_count < @total_work_count)
      processed_count = @results.results.size + @results.errors.size
      if ENV["FRACTOR_DEBUG"]
        if @continuous_mode
          puts "Continuous mode: Waiting for Ractor results. Processed: #{processed_count}, Queue size: #{@work_queue.size}"
        else
          puts "Waiting for Ractor results. Processed: #{processed_count}/#{@total_work_count}, Queue size: #{@work_queue.size}"
        end
      end

      # Get active Ractor objects from the map keys
      active_ractors = @ractors_map.keys

      # Check for new work from callbacks if in continuous mode
      if @continuous_mode && !@work_callbacks.empty?
        @work_callbacks.each do |callback|
          new_work = callback.call
          if new_work && !new_work.empty?
            add_work_items(new_work)
            puts "Work source provided #{new_work.size} new items" if ENV["FRACTOR_DEBUG"]

            # Try to send work to idle workers first
            while !@work_queue.empty? && !@idle_workers.empty?
              worker = @idle_workers.shift
              if send_next_work_if_available(worker)
                puts "Sent work to idle worker #{worker.name}" if ENV["FRACTOR_DEBUG"]
              else
                # Worker couldn't accept work, don't re-add to idle list
              end
            end
          end
        end
      end

      # Break if no active workers and queue is empty, but work remains (indicates potential issue)
      if active_ractors.empty? && @work_queue.empty? && !@continuous_mode && processed_count < @total_work_count
        puts "Warning: No active workers and queue is empty, but not all work is processed. Exiting loop." if ENV["FRACTOR_DEBUG"]
        break
      end

      # In continuous mode, just wait if no active ractors but keep running
      if active_ractors.empty?
        break unless @continuous_mode

        sleep(0.1) # Small delay to avoid CPU spinning
        next
      end

      # Ractor.select blocks until a message is available from any active Ractor
      # The wakeup ractor ensures we can unblock this call when needed
      ready_ractor_obj, message = Ractor.select(*active_ractors)

      # Check if this is the wakeup ractor
      if ready_ractor_obj == @wakeup_ractor
        puts "Wakeup signal received: #{message[:message]}" if ENV["FRACTOR_DEBUG"]
        # Remove wakeup ractor from map if shutting down
        if message[:message] == :shutdown
          @ractors_map.delete(@wakeup_ractor)
          @wakeup_ractor = nil
        end
        # Continue loop to check @running flag
        next
      end

      # Find the corresponding WrappedRactor instance
      wrapped_ractor = @ractors_map[ready_ractor_obj]
      unless wrapped_ractor
        puts "Warning: Received message from unknown Ractor: #{ready_ractor_obj}. Ignoring." if ENV["FRACTOR_DEBUG"]
        next
      end

      puts "Selected Ractor: #{wrapped_ractor.name}, Message Type: #{message[:type]}" if ENV["FRACTOR_DEBUG"]

      # Process the received message
      case message[:type]
      when :initialize
        puts "Ractor initialized: #{message[:processor]}" if ENV["FRACTOR_DEBUG"]
        # Send work immediately upon initialization if available
        if send_next_work_if_available(wrapped_ractor)
          # Work was sent
        else
          # No work available, mark worker as idle
          @idle_workers << wrapped_ractor unless @idle_workers.include?(wrapped_ractor)
          puts "Worker #{wrapped_ractor.name} marked as idle" if ENV["FRACTOR_DEBUG"]
        end
      when :shutdown
        puts "Ractor #{wrapped_ractor.name} acknowledged shutdown" if ENV["FRACTOR_DEBUG"]
        # Remove from active ractors
        @ractors_map.delete(ready_ractor_obj)
      when :result
        # The message[:result] should be a WorkResult object
        work_result = message[:result]
        puts "Completed work: #{work_result.inspect} in Ractor: #{message[:processor]}" if ENV["FRACTOR_DEBUG"]
        @results.add_result(work_result)
        if ENV["FRACTOR_DEBUG"]
          puts "Result processed. Total processed: #{@results.results.size + @results.errors.size}"
          puts "Aggregated Results: #{@results.inspect}" unless @continuous_mode
        end
        # Send next piece of work
        if send_next_work_if_available(wrapped_ractor)
          # Work was sent
        else
          # No work available, mark worker as idle
          @idle_workers << wrapped_ractor unless @idle_workers.include?(wrapped_ractor)
          puts "Worker #{wrapped_ractor.name} marked as idle after completing work" if ENV["FRACTOR_DEBUG"]
        end
      when :error
        # The message[:result] should be a WorkResult object containing the error
        error_result = message[:result]
        puts "Error processing work #{error_result.work&.inspect} in Ractor: #{message[:processor]}: #{error_result.error}" if ENV["FRACTOR_DEBUG"]
        @results.add_result(error_result) # Add error to aggregator
        if ENV["FRACTOR_DEBUG"]
          puts "Error handled. Total processed: #{@results.results.size + @results.errors.size}"
          puts "Aggregated Results (including errors): #{@results.inspect}" unless @continuous_mode
        end
        # Send next piece of work even after an error
        if send_next_work_if_available(wrapped_ractor)
          # Work was sent
        else
          # No work available, mark worker as idle
          @idle_workers << wrapped_ractor unless @idle_workers.include?(wrapped_ractor)
          puts "Worker #{wrapped_ractor.name} marked as idle after error" if ENV["FRACTOR_DEBUG"]
        end
      else
        puts "Unknown message type received: #{message[:type]} from #{wrapped_ractor.name}" if ENV["FRACTOR_DEBUG"]
      end
      # Update processed count for the loop condition
      processed_count = @results.results.size + @results.errors.size
    end

    puts "Main loop finished." if ENV["FRACTOR_DEBUG"]
  rescue ShutdownSignal => e
    puts "Shutdown signal caught: #{e.message}" if ENV["FRACTOR_DEBUG"]
    puts "Sending shutdown message to all Ractors..." if ENV["FRACTOR_DEBUG"]

    # Send shutdown message to each worker Ractor
    @workers.each do |w|
      w.send(:shutdown)
      puts "Sent shutdown to Ractor: #{w.name}" if ENV["FRACTOR_DEBUG"]
    rescue StandardError => send_error
      puts "Error sending shutdown to Ractor #{w.name}: #{send_error.message}" if ENV["FRACTOR_DEBUG"]
    end

    puts "Exiting due to shutdown signal." if ENV["FRACTOR_DEBUG"]
    exit!(1) # Force exit immediately
  end

  return if @continuous_mode

  return unless ENV["FRACTOR_DEBUG"]

  puts "Final Aggregated Results: #{@results.inspect}"
end
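The batch-mode termination rule (keep looping until results plus errors equal the number of items added) can be demonstrated with a simplified model. This is only a sketch: threads and a shared inbox queue stand in for Ractors and Ractor.select, and every name in it is hypothetical. It is not how the Supervisor itself is implemented.

```ruby
# Stand-in model: threads and queues replace Ractors and Ractor.select.
work_queue = Queue.new
inbox = Queue.new # messages from workers; plays the role of Ractor.select
[1, 2, 3, 4].each { |n| work_queue << n }
total_work_count = work_queue.size

workers = 2.times.map do |i|
  Thread.new do
    loop do
      item = begin
        work_queue.pop(true) # non-blocking pop
      rescue ThreadError
        break # queue drained, worker exits
      end

      begin
        raise ArgumentError, "odd input" if item == 3 # simulated failure

        inbox << { type: :result, processor: "worker #{i}", result: item * item }
      rescue StandardError => e
        inbox << { type: :error, processor: "worker #{i}", result: e.message }
      end
    end
  end
end

results = []
errors = []
processed_count = 0

# Batch-mode condition: stop once every item produced a result or an error.
while processed_count < total_work_count
  message = inbox.pop # blocks until some worker reports
  case message[:type]
  when :result then results << message[:result]
  when :error  then errors << message[:result]
  end
  processed_count = results.size + errors.size
end

workers.each(&:join)
```

Counting errors toward the processed total is what lets the loop finish even when some items fail; otherwise a single failing item would leave the loop waiting forever.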
#setup_signal_handler ⇒ Object
Sets up signal handlers for graceful shutdown. Handles SIGINT (Ctrl+C), SIGTERM (systemd/docker), and platform-specific status signals.
# File 'lib/fractor/supervisor.rb', line 120

def setup_signal_handler
  # Universal signals (work on all platforms)
  Signal.trap("INT") { handle_shutdown("SIGINT") }
  Signal.trap("TERM") { handle_shutdown("SIGTERM") }

  # Platform-specific status monitoring
  setup_status_signal
end
#setup_status_signal ⇒ Object
Sets up the platform-specific status monitoring signal: SIGBREAK (Ctrl+Break) on Windows, SIGUSR1 elsewhere. If the signal is not supported, status monitoring is silently skipped.

# File 'lib/fractor/supervisor.rb', line 145

def setup_status_signal
  if Gem.win_platform?
    # Windows: Try SIGBREAK (Ctrl+Break) if available
    begin
      Signal.trap("BREAK") { print_status }
    rescue ArgumentError
      # SIGBREAK not supported on this Ruby version/platform
      # Status monitoring unavailable on Windows
    end
  else
    # Unix/Linux/macOS: Use SIGUSR1
    begin
      Signal.trap("USR1") { print_status }
    rescue ArgumentError
      # SIGUSR1 not supported on this platform
    end
  end
end
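An alternative to rescuing ArgumentError is to check Signal.list before installing the trap. The sketch below takes that different route purely for illustration; status_signal_name is a hypothetical helper, and the handler only increments a counter instead of printing status.

```ruby
# Hypothetical helper: returns the status signal usable here, or nil.
def status_signal_name
  candidate = Gem.win_platform? ? "BREAK" : "USR1"
  # Signal.list maps the signal names known to this Ruby to their numbers.
  Signal.list.key?(candidate) ? candidate : nil
end

status_requests = 0
name = status_signal_name

# Install the handler only when the platform knows the signal.
Signal.trap(name) { status_requests += 1 } if name
```

On Unix-like systems the handler could then be triggered from a shell with `kill -USR1 <pid>`.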
#start_workers ⇒ Object
Starts the worker Ractors for all worker pools.
# File 'lib/fractor/supervisor.rb', line 79

def start_workers
  # Create a wakeup Ractor for unblocking Ractor.select
  @wakeup_ractor = Ractor.new do
    puts "Wakeup Ractor started" if ENV["FRACTOR_DEBUG"]
    loop do
      msg = Ractor.receive
      puts "Wakeup Ractor received: #{msg.inspect}" if ENV["FRACTOR_DEBUG"]
      if %i[wakeup shutdown].include?(msg)
        Ractor.yield({ type: :wakeup, message: msg })
        break if msg == :shutdown
      end
    end
    puts "Wakeup Ractor shutting down" if ENV["FRACTOR_DEBUG"]
  end

  # Add wakeup ractor to the map with a special marker
  @ractors_map[@wakeup_ractor] = :wakeup

  @worker_pools.each do |pool|
    worker_class = pool[:worker_class]
    num_workers = pool[:num_workers]

    pool[:workers] = (1..num_workers).map do |i|
      wrapped_ractor = WrappedRactor.new("worker #{worker_class}:#{i}", worker_class)
      wrapped_ractor.start # Start the underlying Ractor
      # Map the actual Ractor object to the WrappedRactor instance
      @ractors_map[wrapped_ractor.ractor] = wrapped_ractor if wrapped_ractor.ractor
      wrapped_ractor
    end.compact
  end

  # Flatten all workers for easier access
  @workers = @worker_pools.flat_map { |pool| pool[:workers] }
  @ractors_map.compact! # Ensure map doesn't contain nil keys/values

  return unless ENV["FRACTOR_DEBUG"]

  puts "Workers started: #{@workers.size} active across #{@worker_pools.size} pools."
end
#stop ⇒ Object
Stops the supervisor (for continuous mode).

# File 'lib/fractor/supervisor.rb', line 361

def stop
  @running = false
  puts "Stopping supervisor..." if ENV["FRACTOR_DEBUG"]

  # Wait for timer thread to finish if it exists
  if @timer_thread&.alive?
    @timer_thread.join(1) # Wait up to 1 second
    puts "Timer thread stopped" if ENV["FRACTOR_DEBUG"]
  end

  # Signal the wakeup ractor first to unblock Ractor.select
  if @wakeup_ractor
    begin
      @wakeup_ractor.send(:shutdown)
      puts "Sent shutdown signal to wakeup ractor" if ENV["FRACTOR_DEBUG"]
    rescue StandardError => e
      puts "Error sending shutdown to wakeup ractor: #{e.message}" if ENV["FRACTOR_DEBUG"]
    end
  end

  # Send shutdown signal to all workers
  @workers.each do |w|
    begin
      w.send(:shutdown)
    rescue StandardError
      nil
    end
    puts "Sent shutdown signal to #{w.name}" if ENV["FRACTOR_DEBUG"]
  end
end
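The first step of stop, clearing the running flag and then joining the timer thread with a bounded wait, can be sketched on its own. PeriodicTimer is a hypothetical class used only to show the pattern; the interval and timeout values are arbitrary.

```ruby
# Hypothetical timer that mirrors the flag-plus-bounded-join pattern.
class PeriodicTimer
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @running = false
    @ticks = 0
  end

  # Starts a thread that ticks until the running flag is cleared.
  def start
    @running = true
    @thread = Thread.new do
      while @running
        sleep(@interval)
        @ticks += 1 if @running
      end
    end
  end

  # Clears the flag, then waits at most `timeout` seconds for the thread.
  # Returns true when the thread is no longer alive afterwards.
  def stop(timeout = 1)
    @running = false
    @thread&.join(timeout)
    !@thread&.alive?
  end
end

timer = PeriodicTimer.new(0.01)
timer.start
sleep(0.05)
stopped = timer.stop
```

The bounded join keeps shutdown from hanging if the thread is stuck; the trade-off is that a thread that does not finish within the timeout is simply left behind.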