Class: Fractor::WrappedRactor

Inherits:
Object
  • Object
show all
Defined in:
lib/fractor/wrapped_ractor.rb

Overview

Wraps a Ruby Ractor to manage a worker instance. Handles communication and error propagation.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, worker_class) ⇒ WrappedRactor

Initializes the WrappedRactor with a name and the Worker class to instantiate. The worker_class parameter allows flexibility in specifying the worker type.



11
12
13
14
15
16
# File 'lib/fractor/wrapped_ractor.rb', line 11

# Records the name and worker class for later use; no Ractor is created here.
# @ractor stays nil until #start assigns one.
#
# @param name the label interpolated into debug output and passed to the worker
# @param worker_class the class that #start instantiates inside the Ractor
def initialize(name, worker_class)
  if ENV["FRACTOR_DEBUG"]
    puts "Creating Ractor #{name} with worker #{worker_class}"
  end

  @name = name
  @worker_class = worker_class
  # Nothing is running yet; #close and #closed? treat nil as "closed".
  @ractor = nil
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



7
8
9
# File 'lib/fractor/wrapped_ractor.rb', line 7

# Reader for the name given to the constructor.
def name = @name

#ractor ⇒ Object (readonly)

Returns the value of attribute ractor.



7
8
9
# File 'lib/fractor/wrapped_ractor.rb', line 7

# Reader for the wrapped Ractor; nil before #start and after the wrapper is closed.
def ractor = @ractor

Instance Method Details

#close ⇒ Object

Closes the Ractor. Ruby 3.0+ has different ways to terminate Ractors, so we try the available methods.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fractor/wrapped_ractor.rb', line 97

# Closes the wrapped Ractor on a best-effort basis and drops the reference.
#
# A nil message is sent first in case the Ractor is blocked waiting for input;
# then, if the object responds to #kill, that is called too. A StandardError
# raised by either step is ignored.
#
# NOTE(review): the receive loop in #start only treats :shutdown as a stop
# signal, so the nil sent here is handed to worker.process like ordinary
# work - confirm that this is intended.
#
# Only StandardError is rescued. Interrupt, SystemExit and other
# non-StandardError exceptions propagate to the caller instead of being
# silently swallowed; @ractor is still cleared in that case.
#
# @return [true] once the reference has been cleared (also when already closed)
def close
  return true if @ractor.nil?

  old_ractor = @ractor
  begin
    # Nudge a Ractor that may be blocked waiting for input.
    begin
      old_ractor.send(nil)
    rescue StandardError
      nil
    end

    # #kill is only attempted when this Ruby version provides it.
    if old_ractor.respond_to?(:kill)
      begin
        old_ractor.kill
      rescue StandardError
        nil
      end
    end
  rescue StandardError => e
    puts "Warning: Error closing Ractor #{@name}: #{e.message}" if ENV["FRACTOR_DEBUG"]
  ensure
    # Consider it closed even if something above failed.
    @ractor = nil
  end

  true
end

#closed? ⇒ Boolean

Checks if the Ractor is closed or unavailable.

Returns:

  • (Boolean)


136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/fractor/wrapped_ractor.rb', line 136

# Reports whether the wrapped Ractor is gone or has terminated.
#
# Termination is detected by looking for the word "terminated" in the text
# returned by @ractor.inspect. When that is found, or when inspecting raises a
# StandardError, the reference is dropped so later calls return true at once.
#
# Only StandardError is rescued: Interrupt, SystemExit and similar exceptions
# propagate instead of being misread as "the Ractor is terminated".
#
# @return [Boolean] true when there is no Ractor or it looks terminated
def closed?
  return true if @ractor.nil?

  begin
    terminated = @ractor.inspect.include?("terminated")
    # Drop the reference once the Ractor is known to be finished.
    @ractor = nil if terminated
    terminated
  rescue StandardError => e
    # Failing to inspect is taken to mean the Ractor is unusable.
    puts "Ractor #{@name} appears to be terminated: #{e.message}" if ENV["FRACTOR_DEBUG"]
    @ractor = nil
    true
  end
end

#send(work) ⇒ Object

Sends work to the Ractor if it’s active.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fractor/wrapped_ractor.rb', line 80

# Forwards a unit of work to the wrapped Ractor.
#
# This method shadows Object#send on instances of the class, so dynamic
# dispatch on a wrapper has to go through __send__ or public_send.
#
# Only StandardError is rescued: Interrupt, SystemExit and similar exceptions
# propagate instead of being reported as a failed send.
#
# @param work the message handed to @ractor.send
# @return [Boolean] true when the message was handed over; false when there
#   is no Ractor or the hand-over raised a StandardError
def send(work)
  unless @ractor
    puts "Warning: Attempted to send work to nil Ractor #{@name}" if ENV["FRACTOR_DEBUG"]
    return false
  end

  begin
    @ractor.send(work)
    true
  rescue StandardError => e
    puts "Warning: Error sending work to Ractor #{@name}: #{e.message}" if ENV["FRACTOR_DEBUG"]
    false
  end
end

#start ⇒ Object

Starts the underlying Ractor.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fractor/wrapped_ractor.rb', line 19

# Spawns the underlying Ractor and stores it in @ractor.
#
# @name and @worker_class are passed to Ractor.new as arguments, and the block
# works only with those arguments. Inside the Ractor:
#
#   1. { type: :initialize, processor: name } is yielded before the worker
#      object is built.
#   2. The worker is created with worker_class.new(name: name).
#   3. Messages are then received in a loop:
#      - :shutdown yields { type: :shutdown, processor: name } and ends the loop;
#      - anything else goes to worker.process. The return value is yielded
#        under type :result, wrapped in Fractor::WorkResult unless it already
#        is one. A StandardError raised while doing so is yielded under
#        type :error as a WorkResult carrying the error message and the work.
#
# Ractor::ClosedError and any other StandardError escaping the block body are
# only logged, and only when FRACTOR_DEBUG is set.
#
# @return [nil]
def start
  if ENV["FRACTOR_DEBUG"]
    puts "Starting Ractor #{@name}"
  end

  @ractor = Ractor.new(@name, @worker_class) do |label, klass|
    if ENV["FRACTOR_DEBUG"]
      puts "Ractor #{label} started with worker class #{klass}"
    end

    # Announce this Ractor before the worker object exists.
    Ractor.yield({ type: :initialize, processor: label })

    # The worker is built inside the Ractor.
    handler = klass.new(name: label)

    loop do
      if ENV["FRACTOR_DEBUG"]
        puts "Waiting for work in #{label}"
      end
      # Blocks until a message arrives.
      job = Ractor.receive

      if job == :shutdown
        if ENV["FRACTOR_DEBUG"]
          puts "Received shutdown message in Ractor #{label}, terminating..."
        end
        # Acknowledge the shutdown before leaving the loop.
        Ractor.yield({ type: :shutdown, processor: label })
        break
      end

      if ENV["FRACTOR_DEBUG"]
        puts "Received work #{job.inspect} in #{label}"
      end

      begin
        outcome = handler.process(job)
        if ENV["FRACTOR_DEBUG"]
          puts "Sending result #{outcome.inspect} from Ractor #{label}"
        end
        # Pass through an existing WorkResult; wrap anything else.
        wrapped = outcome.is_a?(Fractor::WorkResult) ? outcome : Fractor::WorkResult.new(result: outcome, work: job)
        Ractor.yield({ type: :result, result: wrapped, processor: label })
      rescue StandardError => failure
        if ENV["FRACTOR_DEBUG"]
          puts "Error processing work #{job.inspect} in Ractor #{label}: #{failure.message}\n#{failure.backtrace.join("\n")}"
        end
        # The error result carries the original work item.
        failed = Fractor::WorkResult.new(error: failure.message, work: job)
        Ractor.yield({ type: :error, result: failed, processor: label })
      end
    end
  rescue Ractor::ClosedError
    if ENV["FRACTOR_DEBUG"]
      puts "Ractor #{label} closed."
    end
  rescue StandardError => failure
    if ENV["FRACTOR_DEBUG"]
      puts "Unexpected error in Ractor #{label}: #{failure.message}\n#{failure.backtrace.join("\n")}"
    end
  ensure
    if ENV["FRACTOR_DEBUG"]
      puts "Ractor #{label} shutting down."
    end
  end

  if ENV["FRACTOR_DEBUG"]
    puts "Ractor #{@name} instance created: #{@ractor}"
  end
end