Class: Fractor::ContinuousServer

Inherits:
Object
Defined in:
lib/fractor/continuous_server.rb

Overview

High-level wrapper for running Fractor in continuous mode. Handles threading, signal handling, and results processing automatically.

Constructor Details

#initialize(worker_pools:, work_queue: nil, log_file: nil) ⇒ ContinuousServer

Initialize a continuous server

Parameters:

  • worker_pools (Array<Hash>)

    Worker pool configurations

  • work_queue (WorkQueue, nil) (defaults to: nil)

    Optional work queue to auto-register

  • log_file (String, nil) (defaults to: nil)

    Optional log file path



# File 'lib/fractor/continuous_server.rb', line 15

def initialize(worker_pools:, work_queue: nil, log_file: nil)
  @worker_pools = worker_pools
  @work_queue = work_queue
  @log_file_path = log_file
  @log_file = nil
  @result_callbacks = []
  @error_callbacks = []
  @supervisor = nil
  @supervisor_thread = nil
  @results_thread = nil
  @running = false
end
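
As a hedged illustration of the call shape, the sketch below uses a stand-in class, `StandInServer`, that copies only the keyword signature shown above so that it runs without the gem. The pool hash keys (`:worker_class`, `:num_workers`) and the `EchoWorker` class are assumptions made for illustration; this page only states that each pool configuration is a Hash.

```ruby
# Stand-in with the same keyword signature as the constructor above.
# It is NOT the real Fractor::ContinuousServer.
class StandInServer
  attr_reader :worker_pools, :work_queue, :log_file_path

  def initialize(worker_pools:, work_queue: nil, log_file: nil)
    @worker_pools = worker_pools
    @work_queue = work_queue
    @log_file_path = log_file
  end
end

# Hypothetical worker class; the hash keys used below are assumed.
class EchoWorker; end

server = StandInServer.new(
  worker_pools: [{ worker_class: EchoWorker, num_workers: 2 }]
)

# worker_pools: is a required keyword, so omitting it raises ArgumentError.
missing_pools_error =
  begin
    StandInServer.new
    nil
  rescue ArgumentError => e
    e
  end
```

With only `worker_pools:` supplied, the optional `work_queue:` and `log_file:` arguments fall back to `nil`.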

Instance Attribute Details

#supervisor ⇒ Object (readonly)

Returns the value of attribute supervisor.



# File 'lib/fractor/continuous_server.rb', line 9

def supervisor
  @supervisor
end

#work_queue ⇒ Object (readonly)

Returns the value of attribute work_queue.



# File 'lib/fractor/continuous_server.rb', line 9

def work_queue
  @work_queue
end

Instance Method Details

#on_error {|WorkResult| ... } ⇒ Object

Register a callback for errors

Yields:

  • (WorkResult)



# File 'lib/fractor/continuous_server.rb', line 36

def on_error(&block)
  @error_callbacks << block
end

#on_result {|WorkResult| ... } ⇒ Object

Register a callback for successful results

Yields:

  • (WorkResult)



# File 'lib/fractor/continuous_server.rb', line 30

def on_result(&block)
  @result_callbacks << block
end
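
Both `on_result` and `on_error` append the given block to an array, so several callbacks can be registered for the same event. The sketch below mirrors that registration in a stand-in class and adds a hypothetical `dispatch` step to show how such callbacks might be invoked. The `FakeResult` struct, its `success?` method, and `dispatch` are assumptions; the real results thread is not shown on this page.

```ruby
# Stand-in for a work result; `success?` is an assumed interface.
FakeResult = Struct.new(:value, :error) do
  def success?
    error.nil?
  end
end

# Stand-in registry mirroring on_result / on_error above.
class CallbackRegistry
  def initialize
    @result_callbacks = []
    @error_callbacks = []
  end

  def on_result(&block)
    @result_callbacks << block
  end

  def on_error(&block)
    @error_callbacks << block
  end

  # Hypothetical dispatch: choose the callback list by outcome,
  # then call each block in registration order.
  def dispatch(result)
    callbacks = result.success? ? @result_callbacks : @error_callbacks
    callbacks.each { |callback| callback.call(result) }
  end
end

registry = CallbackRegistry.new
seen = []
registry.on_result { |r| seen << [:ok, r.value] }
registry.on_result { |r| seen << [:audit, r.value] }
registry.on_error  { |r| seen << [:failed, r.error] }

registry.dispatch(FakeResult.new(42, nil))
registry.dispatch(FakeResult.new(nil, "boom"))
```

In this sketch the successful result reaches both result callbacks in the order they were registered, and the failed result reaches only the error callback.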

#run ⇒ Object

Start the server and block until shutdown. This method handles:

  • Opening log file if specified

  • Creating and starting supervisor

  • Starting results processing thread

  • Setting up signal handlers

  • Blocking until shutdown signal received



# File 'lib/fractor/continuous_server.rb', line 47

def run
  setup_log_file
  setup_supervisor
  start_supervisor_thread
  start_results_thread

  log_message("Continuous server started")
  log_message("Press Ctrl+C to stop")

  begin
    # Block until shutdown
    @supervisor_thread&.join
  rescue Interrupt
    log_message("Interrupt received, shutting down...")
  ensure
    cleanup
  end
end
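
The `begin` / `rescue Interrupt` / `ensure` shape in `run` means cleanup happens whether the blocking join returns normally or is cut short by Ctrl+C. A minimal sketch of that control flow follows, using a hypothetical helper `block_until_shutdown` whose block stands in for `@supervisor_thread&.join`; it is not part of Fractor.

```ruby
# Hypothetical helper mirroring the control flow of #run.
def block_until_shutdown(events)
  yield                    # stands in for @supervisor_thread&.join
rescue Interrupt
  events << :interrupted   # Ctrl+C (SIGINT) surfaces as Interrupt
ensure
  events << :cleanup       # runs after a normal exit and after an interrupt
end

# Normal exit: the joined thread simply finishes.
normal = []
block_until_shutdown(normal) { Thread.new { sleep 0.01 }.join }

# Interrupted exit: simulated by raising Interrupt inside the block.
interrupted = []
block_until_shutdown(interrupted) { raise Interrupt }
```

In both cases `:cleanup` is recorded last, which matches the role of the `ensure cleanup` clause in the listing above.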

#stop ⇒ Object

Stop the server programmatically



# File 'lib/fractor/continuous_server.rb', line 67

def stop
  return unless @running

  log_message("Stopping continuous server...")
  @running = false

  @supervisor&.stop

  # Wait for threads to finish
  [@supervisor_thread, @results_thread].compact.each do |thread|
    thread.join(2) if thread.alive?
  end

  log_message("Continuous server stopped")
end
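
Note that `stop` waits with `thread.join(2)`, a bounded join: a thread that has not finished within about two seconds is left running, and `stop` still returns. The sketch below shows the standard `Thread#join(limit)` behavior this relies on; the thread names are illustrative only.

```ruby
finished = Thread.new { :done }
stuck = Thread.new { sleep } # sleeps until killed

# join(limit) returns the thread itself once the thread has ended...
finished_join = finished.join(2)

# ...and returns nil if the thread is still running when the limit expires.
stuck_join = stuck.join(0.1)

# Tidy up the demo thread; #stop as listed above does not kill threads.
stuck.kill
stuck.join
```

A caller that needs a hard guarantee that worker threads have exited would have to check for that separately, since the listing above does not inspect the return value of `join`.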