Class: Fractor::WrappedRactor
- Inherits:
-
Object
- Object
- Fractor::WrappedRactor
- Defined in:
- lib/fractor/wrapped_ractor.rb
Overview
Wraps a Ruby Ractor to manage a worker instance. Handles communication and error propagation.
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#ractor ⇒ Object
readonly
Returns the value of attribute ractor.
Instance Method Summary collapse
-
#close ⇒ Object
Closes the Ractor.
-
#closed? ⇒ Boolean
Checks if the Ractor is closed or unavailable.
-
#initialize(name, worker_class) ⇒ WrappedRactor
constructor
Initializes the WrappedRactor with a name and the Worker class to instantiate.
-
#send(work) ⇒ Object
Sends work to the Ractor if it’s active.
-
#start ⇒ Object
Starts the underlying Ractor.
Constructor Details
#initialize(name, worker_class) ⇒ WrappedRactor
Initializes the WrappedRactor with a name and the Worker class to instantiate. The worker_class parameter allows flexibility in specifying the worker type.
11 12 13 14 15 16 |
# File 'lib/fractor/wrapped_ractor.rb', line 11 def initialize(name, worker_class) puts "Creating Ractor #{name} with worker #{worker_class}" if ENV["FRACTOR_DEBUG"] @name = name @worker_class = worker_class # Store the worker class @ractor = nil # Initialize ractor as nil end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
7 8 9 |
# File 'lib/fractor/wrapped_ractor.rb', line 7 def name @name end |
#ractor ⇒ Object (readonly)
Returns the value of attribute ractor.
7 8 9 |
# File 'lib/fractor/wrapped_ractor.rb', line 7 def ractor @ractor end |
Instance Method Details
#close ⇒ Object
Closes the Ractor. Ruby 3.0+ has different ways to terminate Ractors, we try the available methods
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/fractor/wrapped_ractor.rb', line 97 def close return true if @ractor.nil? begin # Send a nil message to signal we're done - this might be processed # if the Ractor is waiting for input begin begin @ractor.send(nil) rescue StandardError nil end rescue StandardError # Ignore errors when sending nil end # Mark as closed in our object old_ractor = @ractor @ractor = nil # If available in this Ruby version, we'll try kill if old_ractor.respond_to?(:kill) begin old_ractor.kill rescue StandardError nil end end true rescue Exception => e puts "Warning: Error closing Ractor #{@name}: #{e.message}" if ENV["FRACTOR_DEBUG"] # Consider it closed even if there was an error @ractor = nil true end end |
#closed? ⇒ Boolean
Checks if the Ractor is closed or unavailable.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/fractor/wrapped_ractor.rb', line 136 def closed? return true if @ractor.nil? begin # Check if the Ractor is terminated using Ractor#inspect # This is safer than calling methods on the Ractor r_status = @ractor.inspect if r_status.include?("terminated") # If terminated, clean up our reference @ractor = nil return true end false rescue Exception => e # If we get an exception, the Ractor is likely terminated puts "Ractor #{@name} appears to be terminated: #{e.message}" if ENV["FRACTOR_DEBUG"] @ractor = nil true end end |
#send(work) ⇒ Object
Sends work to the Ractor if it’s active.
80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/fractor/wrapped_ractor.rb', line 80 def send(work) if @ractor begin @ractor.send(work) true rescue Exception => e puts "Warning: Error sending work to Ractor #{@name}: #{e.message}" if ENV["FRACTOR_DEBUG"] false end else puts "Warning: Attempted to send work to nil Ractor #{@name}" if ENV["FRACTOR_DEBUG"] false end end |
#start ⇒ Object
Starts the underlying Ractor.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fractor/wrapped_ractor.rb', line 19 def start puts "Starting Ractor #{@name}" if ENV["FRACTOR_DEBUG"] # Pass worker_class to the Ractor block @ractor = Ractor.new(@name, @worker_class) do |name, worker_cls| puts "Ractor #{name} started with worker class #{worker_cls}" if ENV["FRACTOR_DEBUG"] # Yield an initialization message Ractor.yield({ type: :initialize, processor: name }) # Instantiate the specific worker inside the Ractor worker = worker_cls.new(name: name) loop do # Ractor.receive will block until a message is received puts "Waiting for work in #{name}" if ENV["FRACTOR_DEBUG"] work = Ractor.receive # Handle shutdown message if work == :shutdown puts "Received shutdown message in Ractor #{name}, terminating..." if ENV["FRACTOR_DEBUG"] # Yield a shutdown acknowledgment before terminating Ractor.yield({ type: :shutdown, processor: name }) break end puts "Received work #{work.inspect} in #{name}" if ENV["FRACTOR_DEBUG"] begin # Process the work using the instantiated worker result = worker.process(work) puts "Sending result #{result.inspect} from Ractor #{name}" if ENV["FRACTOR_DEBUG"] # Wrap the result in a WorkResult object if not already wrapped work_result = if result.is_a?(Fractor::WorkResult) result else Fractor::WorkResult.new(result: result, work: work) end # Yield the result back Ractor.yield({ type: :result, result: work_result, processor: name }) rescue StandardError => e # Handle errors during processing puts "Error processing work #{work.inspect} in Ractor #{name}: #{e.message}\n#{e.backtrace.join("\n")}" if ENV["FRACTOR_DEBUG"] # Yield an error message back # Ensure the original work object is included in the error result error_result = Fractor::WorkResult.new(error: e., work: work) Ractor.yield({ type: :error, result: error_result, processor: name }) end end rescue Ractor::ClosedError puts "Ractor #{name} closed." if ENV["FRACTOR_DEBUG"] rescue StandardError => e puts "Unexpected error in Ractor #{name}: #{e.message}\n#{e.backtrace.join("\n")}" if ENV["FRACTOR_DEBUG"] # Optionally yield a critical error message if needed ensure puts "Ractor #{name} shutting down." if ENV["FRACTOR_DEBUG"] end puts "Ractor #{@name} instance created: #{@ractor}" if ENV["FRACTOR_DEBUG"] end |