Class: Fractor::Supervisor

Inherits:
Object
  • Object
show all
Defined in:
lib/fractor/supervisor.rb

Overview

Supervises multiple WrappedRactors, distributes work, and aggregates results.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_pools: [], continuous_mode: false) ⇒ Supervisor

Initializes the Supervisor.

  • worker_pools: An array of worker pool configurations, each containing:

    • worker_class: The class inheriting from Fractor::Worker (e.g., MyWorker).

    • num_workers: The number of Ractors to spawn for this worker class.

  • continuous_mode: Whether to run in continuous mode without expecting a fixed work count.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/fractor/supervisor.rb', line 19

# Initializes the Supervisor.
#
# @param worker_pools [Array<Hash>] pool configurations; each hash is read for
#   :worker_class (a class inheriting from Fractor::Worker) and
#   :num_workers (falls back to detect_num_workers when nil/absent).
# @param continuous_mode [Boolean] run without expecting a fixed work count.
# @raise [ArgumentError] if a pool's :worker_class is not a Class that
#   inherits from Fractor::Worker (this includes nil and non-class values).
def initialize(worker_pools: [], continuous_mode: false)
  @worker_pools = worker_pools.map do |pool_config|
    worker_class = pool_config[:worker_class]

    # Guard with is_a?(Class) first: calling `<` on nil raises NoMethodError
    # and on e.g. a String raises an unrelated comparison error, neither of
    # which tells the caller what is actually wrong with the configuration.
    unless worker_class.is_a?(Class) && worker_class < Fractor::Worker
      raise ArgumentError,
            "#{worker_class.inspect} must inherit from Fractor::Worker"
    end

    {
      worker_class: worker_class,
      num_workers: pool_config[:num_workers] || detect_num_workers,
      workers: [], # Will hold the WrappedRactor instances
    }
  end

  @work_queue = Queue.new
  @results = ResultAggregator.new
  @workers = [] # Flattened array of all workers across all pools
  @total_work_count = 0 # Track total items initially added
  @ractors_map = {} # Map Ractor object to WrappedRactor instance
  @continuous_mode = continuous_mode
  @running = false
  @work_callbacks = []
  @wakeup_ractor = nil # Control ractor for unblocking select
  @timer_thread = nil # Timer thread for periodic wakeup
  @idle_workers = [] # Track workers waiting for work
end

Instance Attribute Details

#results ⇒ Object (readonly)

Returns the value of attribute results.



12
13
14
# File 'lib/fractor/supervisor.rb', line 12

# Read-only accessor for the object held in @results
# (assigned a ResultAggregator in #initialize).
def results = @results

#work_queue ⇒ Object (readonly)

Returns the value of attribute work_queue.



12
13
14
# File 'lib/fractor/supervisor.rb', line 12

# Read-only accessor for the object held in @work_queue
# (assigned a Queue in #initialize).
def work_queue = @work_queue

#worker_pools ⇒ Object (readonly)

Returns the value of attribute worker_pools.



12
13
14
# File 'lib/fractor/supervisor.rb', line 12

# Read-only accessor for the pool configuration hashes held in @worker_pools.
def worker_pools = @worker_pools

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



12
13
14
# File 'lib/fractor/supervisor.rb', line 12

# Read-only accessor for the flattened list of workers held in @workers.
def workers = @workers

Instance Method Details

#add_work_item(work) ⇒ Object

Adds a single work item to the queue. The item must be an instance of Fractor::Work or a subclass.



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fractor/supervisor.rb', line 51

# Enqueues one work item and bumps the running total of submitted items.
#
# @param work [Fractor::Work] the item to enqueue (subclass instances allowed)
# @raise [ArgumentError] if +work+ is not a Fractor::Work
# @return [nil]
def add_work_item(work)
  raise ArgumentError, "#{work.class} must be an instance of Fractor::Work" unless work.is_a?(Fractor::Work)

  @work_queue.push(work)
  @total_work_count += 1

  # Debug trace only; the method returns nil whether or not it is printed.
  if ENV["FRACTOR_DEBUG"]
    puts "Work item added. Initial work count: #{@total_work_count}, Queue size: #{@work_queue.size}"
  end
end

#add_work_items(works) ⇒ Object

Adds multiple work items to the queue. Each item must be an instance of Fractor::Work or a subclass.



66
67
68
69
70
# File 'lib/fractor/supervisor.rb', line 66

# Enqueues every element of +works+, in order, via #add_work_item.
#
# @param works [#each] collection of work items
# @return [Object] the +works+ collection itself (the result of #each)
def add_work_items(works)
  works.each { |item| add_work_item(item) }
end

#handle_shutdown(signal_name) ⇒ Object

Handles shutdown signal by mode (continuous vs batch)



130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/fractor/supervisor.rb', line 130

# Handles a shutdown signal according to the supervisor's mode.
#
# Continuous mode: calls #stop (graceful shutdown).
# Batch mode: raises ShutdownSignal in the current thread so that the
# `rescue ShutdownSignal` clause in #run can notify the workers and exit.
#
# @param signal_name [String] label used in log and exception messages
# @raise [ShutdownSignal] in batch mode
def handle_shutdown(signal_name)
  if @continuous_mode
    puts "\n#{signal_name} received. Initiating graceful shutdown..." if ENV["FRACTOR_DEBUG"]
    stop
  else
    puts "\n#{signal_name} received. Initiating immediate shutdown..." if ENV["FRACTOR_DEBUG"]
    Thread.current.raise(ShutdownSignal, "Interrupted by #{signal_name}")
  end
rescue ShutdownSignal
  # Thread#raise on the current thread raises right here, so the signal must
  # be let through; a catch-all rescue would swallow it and exit before #run
  # has a chance to send shutdown messages to the workers.
  raise
rescue StandardError => e
  # Unexpected failure inside the handler itself: report and hard-exit.
  puts "Error in signal handler: #{e.class}: #{e.message}" if ENV["FRACTOR_DEBUG"]
  puts e.backtrace.join("\n") if ENV["FRACTOR_DEBUG"]
  exit!(1)
end

#print_status ⇒ Object

Prints the current supervisor status.



165
166
167
168
169
170
171
172
173
174
175
# File 'lib/fractor/supervisor.rb', line 165

# Writes a snapshot of the supervisor's state to standard output: mode,
# running flag, worker and idle-worker counts, queue size, and the number of
# results and errors collected so far.
#
# @return [nil]
def print_status
  mode_label = @continuous_mode ? 'Continuous' : 'Batch'
  report = [
    "\n=== Fractor Supervisor Status ===",
    "Mode: #{mode_label}",
    "Running: #{@running}",
    "Workers: #{@workers.size}",
    "Idle workers: #{@idle_workers.size}",
    "Queue size: #{@work_queue.size}",
    "Results: #{@results.results.size}",
    "Errors: #{@results.errors.size}",
    "================================\n",
  ]
  # puts on an Array prints one element per line and does not double a
  # trailing newline.
  puts report
end

#register_work_source(&callback) ⇒ Object

Registers a callback that provides new work items. The callback should return nil or an empty array when no new work is available.



74
75
76
# File 'lib/fractor/supervisor.rb', line 74

# Registers a callback that supplies new work items.
#
# #run invokes each registered callback in continuous mode and enqueues
# whatever it returns, unless the value is nil or empty.
#
# @yieldreturn [Array<Fractor::Work>, nil] new work items, or nil/empty when
#   none are available
# @raise [ArgumentError] if no block is given
# @return [Array<Proc>] the list of registered callbacks
def register_work_source(&callback)
  # Without this guard a nil would be stored and only fail later, inside the
  # main loop, when it is invoked with `.call`.
  raise ArgumentError, "register_work_source requires a block" unless callback

  @work_callbacks << callback
end

#run ⇒ Object

Runs the main processing loop.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# File 'lib/fractor/supervisor.rb', line 178

# Runs the supervisor's main event loop.
#
# Installs the signal handlers, starts the workers, then repeatedly blocks on
# Ractor.select over every Ractor registered in @ractors_map and dispatches
# on message[:type] (:initialize, :shutdown, :result, :error).
#
# Batch mode: the loop ends once results + errors reaches @total_work_count,
# or when no Ractors remain in @ractors_map.
# Continuous mode: the loop keeps going until @running is cleared (see #stop);
# registered work-source callbacks are polled on every iteration.
#
# A ShutdownSignal raised while the loop is running is rescued here: a
# :shutdown message is sent to each worker and the process is terminated
# with exit!(1), so in that case the method never returns.
#
# @return [nil]
def run
  setup_signal_handler
  start_workers

  @running = true
  processed_count = 0

  # Start timer thread for continuous mode to periodically check work sources
  # (each :wakeup sent to the wakeup ractor makes Ractor.select below return,
  # which lets the loop run the work-source callbacks again).
  if @continuous_mode && !@work_callbacks.empty?
    @timer_thread = Thread.new do
      while @running
        sleep(0.1) # Check work sources every 100ms
        if @wakeup_ractor && @running
          begin
            @wakeup_ractor.send(:wakeup)
          rescue StandardError => e
            puts "Timer thread error sending wakeup: #{e.message}" if ENV["FRACTOR_DEBUG"]
            break
          end
        end
      end
      puts "Timer thread shutting down" if ENV["FRACTOR_DEBUG"]
    end
  end

  begin
    # Main loop: Process events until conditions are met for termination
    while @running && (@continuous_mode || processed_count < @total_work_count)
      processed_count = @results.results.size + @results.errors.size

      if ENV["FRACTOR_DEBUG"]
        if @continuous_mode
          puts "Continuous mode: Waiting for Ractor results. Processed: #{processed_count}, Queue size: #{@work_queue.size}"
        else
          puts "Waiting for Ractor results. Processed: #{processed_count}/#{@total_work_count}, Queue size: #{@work_queue.size}"
        end
      end

      # Get active Ractor objects from the map keys
      # NOTE(review): start_workers also registers the wakeup ractor in this
      # map, so this list is non-empty for as long as that entry exists.
      active_ractors = @ractors_map.keys

      # Check for new work from callbacks if in continuous mode
      if @continuous_mode && !@work_callbacks.empty?
        @work_callbacks.each do |callback|
          new_work = callback.call
          if new_work && !new_work.empty?
            add_work_items(new_work)
            puts "Work source provided #{new_work.size} new items" if ENV["FRACTOR_DEBUG"]

            # Try to send work to idle workers first
            while !@work_queue.empty? && !@idle_workers.empty?
              worker = @idle_workers.shift
              if send_next_work_if_available(worker)
                puts "Sent work to idle worker #{worker.name}" if ENV["FRACTOR_DEBUG"]
              else
                # Worker couldn't accept work, don't re-add to idle list
              end
            end
          end
        end
      end

      # Break if no active workers and queue is empty, but work remains (indicates potential issue)
      if active_ractors.empty? && @work_queue.empty? && !@continuous_mode && processed_count < @total_work_count
        puts "Warning: No active workers and queue is empty, but not all work is processed. Exiting loop." if ENV["FRACTOR_DEBUG"]
        break
      end

      # In continuous mode, just wait if no active ractors but keep running
      if active_ractors.empty?
        break unless @continuous_mode

        sleep(0.1) # Small delay to avoid CPU spinning
        next
      end

      # Ractor.select blocks until a message is available from any active Ractor
      # The wakeup ractor ensures we can unblock this call when needed
      ready_ractor_obj, message = Ractor.select(*active_ractors)

      # Check if this is the wakeup ractor
      if ready_ractor_obj == @wakeup_ractor
        puts "Wakeup signal received: #{message[:message]}" if ENV["FRACTOR_DEBUG"]
        # Remove wakeup ractor from map if shutting down
        if message[:message] == :shutdown
          @ractors_map.delete(@wakeup_ractor)
          @wakeup_ractor = nil
        end
        # Continue loop to check @running flag
        next
      end

      # Find the corresponding WrappedRactor instance
      wrapped_ractor = @ractors_map[ready_ractor_obj]
      unless wrapped_ractor
        puts "Warning: Received message from unknown Ractor: #{ready_ractor_obj}. Ignoring." if ENV["FRACTOR_DEBUG"]
        next
      end

      puts "Selected Ractor: #{wrapped_ractor.name}, Message Type: #{message[:type]}" if ENV["FRACTOR_DEBUG"]

      # Process the received message
      case message[:type]
      when :initialize
        puts "Ractor initialized: #{message[:processor]}" if ENV["FRACTOR_DEBUG"]
        # Send work immediately upon initialization if available
        if send_next_work_if_available(wrapped_ractor)
          # Work was sent
        else
          # No work available, mark worker as idle
          @idle_workers << wrapped_ractor unless @idle_workers.include?(wrapped_ractor)
          puts "Worker #{wrapped_ractor.name} marked as idle" if ENV["FRACTOR_DEBUG"]
        end
      when :shutdown
        puts "Ractor #{wrapped_ractor.name} acknowledged shutdown" if ENV["FRACTOR_DEBUG"]
        # Remove from active ractors
        @ractors_map.delete(ready_ractor_obj)
      when :result
        # The message[:result] should be a WorkResult object
        work_result = message[:result]
        puts "Completed work: #{work_result.inspect} in Ractor: #{message[:processor]}" if ENV["FRACTOR_DEBUG"]
        @results.add_result(work_result)
        if ENV["FRACTOR_DEBUG"]
          puts "Result processed. Total processed: #{@results.results.size + @results.errors.size}"
          puts "Aggregated Results: #{@results.inspect}" unless @continuous_mode
        end
        # Send next piece of work
        if send_next_work_if_available(wrapped_ractor)
          # Work was sent
        else
          # No work available, mark worker as idle
          @idle_workers << wrapped_ractor unless @idle_workers.include?(wrapped_ractor)
          puts "Worker #{wrapped_ractor.name} marked as idle after completing work" if ENV["FRACTOR_DEBUG"]
        end
      when :error
        # The message[:result] should be a WorkResult object containing the error
        error_result = message[:result]
        puts "Error processing work #{error_result.work&.inspect} in Ractor: #{message[:processor]}: #{error_result.error}" if ENV["FRACTOR_DEBUG"]
        @results.add_result(error_result) # Add error to aggregator
        if ENV["FRACTOR_DEBUG"]
          puts "Error handled. Total processed: #{@results.results.size + @results.errors.size}"
          puts "Aggregated Results (including errors): #{@results.inspect}" unless @continuous_mode
        end
        # Send next piece of work even after an error
        if send_next_work_if_available(wrapped_ractor)
          # Work was sent
        else
          # No work available, mark worker as idle
          @idle_workers << wrapped_ractor unless @idle_workers.include?(wrapped_ractor)
          puts "Worker #{wrapped_ractor.name} marked as idle after error" if ENV["FRACTOR_DEBUG"]
        end
      else
        puts "Unknown message type received: #{message[:type]} from #{wrapped_ractor.name}" if ENV["FRACTOR_DEBUG"]
      end
      # Update processed count for the loop condition
      processed_count = @results.results.size + @results.errors.size
    end

    puts "Main loop finished." if ENV["FRACTOR_DEBUG"]
  rescue ShutdownSignal => e
    puts "Shutdown signal caught: #{e.message}" if ENV["FRACTOR_DEBUG"]
    puts "Sending shutdown message to all Ractors..." if ENV["FRACTOR_DEBUG"]

    # Send shutdown message to each worker Ractor
    # (a failure for one worker is logged and does not stop the others).
    @workers.each do |w|
      w.send(:shutdown)
      puts "Sent shutdown to Ractor: #{w.name}" if ENV["FRACTOR_DEBUG"]
    rescue StandardError => send_error
      puts "Error sending shutdown to Ractor #{w.name}: #{send_error.message}" if ENV["FRACTOR_DEBUG"]
    end

    puts "Exiting due to shutdown signal." if ENV["FRACTOR_DEBUG"]
    exit!(1) # Force exit immediately
  end

  # The final summary is printed only in batch mode, and only when debugging.
  return if @continuous_mode

  return unless ENV["FRACTOR_DEBUG"]

  puts "Final Aggregated Results: #{@results.inspect}"
end

#setup_signal_handler ⇒ Object

Sets up signal handlers for graceful shutdown. Handles SIGINT (Ctrl+C), SIGTERM (systemd/docker), and platform-specific status signals.



120
121
122
123
124
125
126
127
# File 'lib/fractor/supervisor.rb', line 120

# Installs the process signal handlers used by the supervisor.
#
# INT and TERM are routed to #handle_shutdown with a readable label; the
# platform-specific status signal is then installed via #setup_status_signal.
#
# @return [Object] whatever #setup_status_signal returns
def setup_signal_handler
  # Trap name => label passed to handle_shutdown.
  { "INT" => "SIGINT", "TERM" => "SIGTERM" }.each do |trap_name, label|
    Signal.trap(trap_name) { handle_shutdown(label) }
  end

  setup_status_signal
end

#setup_status_signal ⇒ Object

Sets up platform-specific status monitoring signal



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/fractor/supervisor.rb', line 145

# Installs a handler that prints the supervisor status on a platform-specific
# signal: BREAK (Ctrl+Break) on Windows, USR1 everywhere else.
#
# If the chosen signal is not supported by this Ruby/platform, Signal.trap
# raises ArgumentError; that is ignored and status monitoring is simply
# unavailable.
#
# @return [Object, nil] the value returned by Signal.trap, or nil when the
#   signal is unsupported
def setup_status_signal
  status_signal = Gem.win_platform? ? "BREAK" : "USR1"
  Signal.trap(status_signal) { print_status }
rescue ArgumentError
  # Signal not supported here; nothing to install.
  nil
end

#start_workers ⇒ Object

Starts the worker Ractors for all worker pools.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fractor/supervisor.rb', line 79

# Spawns the control ("wakeup") Ractor and one WrappedRactor per configured
# worker slot, recording each underlying Ractor in @ractors_map.
#
# After this call:
# - @wakeup_ractor is mapped to the marker :wakeup in @ractors_map;
# - each pool's :workers holds the WrappedRactor instances created for it;
# - @workers is the flattened list of all of them.
#
# @return [nil]
def start_workers
  # The wakeup Ractor echoes :wakeup / :shutdown messages back out so that a
  # blocking Ractor.select over the map's keys can be released on demand.
  @wakeup_ractor = Ractor.new do
    puts "Wakeup Ractor started" if ENV["FRACTOR_DEBUG"]
    loop do
      signal = Ractor.receive
      puts "Wakeup Ractor received: #{signal.inspect}" if ENV["FRACTOR_DEBUG"]
      next unless %i[wakeup shutdown].include?(signal)

      Ractor.yield({ type: :wakeup, message: signal })
      break if signal == :shutdown
    end
    puts "Wakeup Ractor shutting down" if ENV["FRACTOR_DEBUG"]
  end

  # Registered under a symbol marker rather than a WrappedRactor.
  @ractors_map[@wakeup_ractor] = :wakeup

  @worker_pools.each do |pool|
    klass = pool[:worker_class]

    spawned = 1.upto(pool[:num_workers]).map do |index|
      WrappedRactor.new("worker #{klass}:#{index}", klass).tap do |wrapped|
        wrapped.start # Start the underlying Ractor
        # Only workers that actually expose a Ractor are selectable.
        @ractors_map[wrapped.ractor] = wrapped if wrapped.ractor
      end
    end

    pool[:workers] = spawned.compact
  end

  # Flat view across all pools for easier access.
  @workers = @worker_pools.flat_map { |pool| pool[:workers] }
  @ractors_map.compact! # Drops entries whose value is nil
  puts "Workers started: #{@workers.size} active across #{@worker_pools.size} pools." if ENV["FRACTOR_DEBUG"]
end

#stop ⇒ Object

Stop the supervisor (for continuous mode)



361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/fractor/supervisor.rb', line 361

# Stops the supervisor (intended for continuous mode).
#
# Clears @running, waits up to one second for the timer thread (if any) to
# finish, sends :shutdown to the wakeup ractor so that a blocked
# Ractor.select in #run returns, then sends :shutdown to every worker.
# Send failures are best-effort: they never abort the shutdown sequence, but
# they are reported when FRACTOR_DEBUG is set.
#
# @return [Array] the @workers collection
def stop
  @running = false
  puts "Stopping supervisor..." if ENV["FRACTOR_DEBUG"]

  # Wait for timer thread to finish if it exists
  if @timer_thread&.alive?
    @timer_thread.join(1) # Wait up to 1 second
    puts "Timer thread stopped" if ENV["FRACTOR_DEBUG"]
  end

  # Signal the wakeup ractor before the workers to unblock Ractor.select
  if @wakeup_ractor
    begin
      @wakeup_ractor.send(:shutdown)
      puts "Sent shutdown signal to wakeup ractor" if ENV["FRACTOR_DEBUG"]
    rescue StandardError => e
      puts "Error sending shutdown to wakeup ractor: #{e.message}" if ENV["FRACTOR_DEBUG"]
    end
  end

  # Send shutdown signal to all workers. The success message is only printed
  # when the send actually went through; a failure is logged instead of being
  # silently discarded, and the remaining workers are still notified.
  @workers.each do |w|
    w.send(:shutdown)
    puts "Sent shutdown signal to #{w.name}" if ENV["FRACTOR_DEBUG"]
  rescue StandardError => e
    puts "Error sending shutdown to #{w.name}: #{e.message}" if ENV["FRACTOR_DEBUG"]
  end
end