Class: Backports::Ractor

Inherits:
Object show all
Defined in:
lib/backports/ractor/cloner.rb,
lib/backports/ractor/errors.rb,
lib/backports/ractor/queues.rb,
lib/backports/ractor/ractor.rb,
lib/backports/ractor/sharing.rb

Defined Under Namespace

Classes: ClosedError, Error, RemoteError

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args, &block) ⇒ Ractor

Implementation notes

Uses one ‘Thread` for each `Ractor`, as well as queues for communication

The incoming queue is strict: contrary to standard queue, you can’t pop from an empty closed queue. Since standard queues return ‘nil` is those conditions, we wrap/unwrap `nil` values and consider all `nil` values to be results of closed queues. `ClosedQueueError` are re-raised as `Ractor::ClosedError`

The outgoing queue is strict and blocking. Same wrapping / raising as incoming, with an extra queue to acknowledge when a value has been read (or if the port is closed while waiting).

The last result is a bit tricky as it needs to be pushed on the outgoing queue but can not be blocking. For this, we “soft close” the outgoing port.

Raises:

  • (::ArgumentError)


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/backports/ractor/ractor.rb', line 30

def initialize(*args, &block)
  @ractor_incoming_queue = IncomingQueue.new
  @ractor_outgoing_queue = OutgoingQueue.new
  raise ::ArgumentError, 'must be called with a block' unless block

  kw = args.last
  if kw.is_a?(::Hash) && kw.size == 1 && kw.key?(:name)
    args.pop
    name = kw[:name]
  end
  @ractor_name = name && Backports.coerce_to_str(name)

  @id = Ractor.ractor_next_id
  if Ractor.main == nil # then initializing main Ractor
    @ractor_thread = ::Thread.current
    @ractor_origin = nil
    @ractor_thread.thread_variable_set(:backports_ractor, self)
  else
    @ractor_origin = caller(1, 1).first.split(':in `').first

    args.map! { |a| Ractor.ractor_isolate(a, false) }
    ractor_thread_start(args, block)
  end
end

Class Attribute Details

.mainObject (readonly)

Returns the value of attribute main.



248
249
250
# File 'lib/backports/ractor/ractor.rb', line 248

def main
  @main
end

Instance Attribute Details

#ractor_incoming_queueObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



268
269
270
# File 'lib/backports/ractor/ractor.rb', line 268

def ractor_incoming_queue
  @ractor_incoming_queue
end

#ractor_outgoing_queueObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



268
269
270
# File 'lib/backports/ractor/ractor.rb', line 268

def ractor_outgoing_queue
  @ractor_outgoing_queue
end

#ractor_threadObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



268
269
270
# File 'lib/backports/ractor/ractor.rb', line 268

def ractor_thread
  @ractor_thread
end

Class Method Details

.countObject



226
227
228
# File 'lib/backports/ractor/ractor.rb', line 226

def count
  ::ObjectSpace.each_object(Ractor).count(&:ractor_live?)
end

.currentObject



221
222
223
224
# File 'lib/backports/ractor/ractor.rb', line 221

def current
  ::Thread.current.thread_variable_get(:backports_ractor) ||
    ::Thread.current.thread_variable_set(:backports_ractor, ractor_find_current)
end

.make_shareable(obj) ⇒ Object

Raises:



211
212
213
214
215
# File 'lib/backports/ractor/ractor.rb', line 211

def make_shareable(obj)
  return obj if ractor_check_shareability?(obj, true)

  raise Ractor::Error, '#freeze does not freeze object correctly'
end

.ractor_isolate(val, move = false) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



7
8
9
10
11
# File 'lib/backports/ractor/sharing.rb', line 7

def ractor_isolate(val, move = false)
  return val if move

  Cloner.deep_clone(val)
end

.ractor_mark_set_shareable(visited) ⇒ Object



49
50
51
52
53
# File 'lib/backports/ractor/sharing.rb', line 49

def ractor_mark_set_shareable(visited)
  visited.each do |key|
    @ractor_shareable[key] = Ractor
  end
end

.ractor_next_idObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



243
244
245
246
# File 'lib/backports/ractor/ractor.rb', line 243

def ractor_next_id
  @id ||= 0
  @id += 1
end

.ractor_resetObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



231
232
233
234
235
236
237
238
239
240
# File 'lib/backports/ractor/ractor.rb', line 231

def ractor_reset
  ::ObjectSpace.each_object(Ractor).each do |r|
    next if r == Ractor.current
    next unless (th = r.ractor_thread)

    th.kill
    th.join
  end
  Ractor.current.ractor_incoming_queue.clear
end

.ractor_shareable_self?(obj, freeze_all) ⇒ Boolean

yield if shareability can’t be determined without looking at its parts

Returns:

  • (Boolean)


26
27
28
29
30
31
32
33
34
# File 'lib/backports/ractor/sharing.rb', line 26

def ractor_shareable_self?(obj, freeze_all)
  return true if @ractor_shareable.key?(obj)
  return true if ractor_shareable_by_nature?(obj, freeze_all)
  if obj.frozen? || (freeze_all && obj.freeze)
    yield
  else
    false
  end
end

.receiveObject Also known as: recv



171
172
173
# File 'lib/backports/ractor/ractor.rb', line 171

def receive
  current.__send__(:receive)
end

.receive_if(&block) ⇒ Object



176
177
178
# File 'lib/backports/ractor/ractor.rb', line 176

def receive_if(&block)
  current.__send__(:receive_if, &block)
end

.select(*ractors, yield_value: not_given = true, move: false) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/backports/ractor/ractor.rb', line 180

def select(*ractors, yield_value: not_given = true, move: false)
  cur = Ractor.current
  queues = ractors.map do |r|
    r == cur ? r.ractor_incoming_queue : r.ractor_outgoing_queue
  end
  if !not_given
    out = current.ractor_outgoing_queue
    yield_value = ractor_isolate(yield_value, move)
  elsif ractors.empty?
    raise ::ArgumentError, 'specify at least one ractor or `yield_value`'
  end

  while true # rubocop:disable Style/InfiniteLoop
              # Don't `loop`, in case of `ClosedError` (not that there should be any)
    queues.each_with_index do |q, i|
      q.pop_non_blocking do |val|
        r = ractors[i]
        return [r == cur ? :receive : r, val]
      end
    end

    if out && out.num_waiting > 0
      # Not quite atomic...
      out.push(yield_value, ack: true)
      return [:yield, nil]
    end

    sleep(0.001)
  end
end

.shareable?(obj) ⇒ Boolean

Returns:

  • (Boolean)


217
218
219
# File 'lib/backports/ractor/ractor.rb', line 217

def shareable?(obj)
  ractor_check_shareability?(obj, false)
end

.yield(value, move: false) ⇒ Object



164
165
166
167
168
169
# File 'lib/backports/ractor/ractor.rb', line 164

def yield(value, move: false)
  value = ractor_isolate(value, move)
  current.ractor_outgoing_queue.push(value, ack: true)
rescue ::ClosedQueueError
  raise ClosedError, 'The outgoing-port is already closed'
end

Instance Method Details

#[](key) ⇒ Object



150
151
152
# File 'lib/backports/ractor/ractor.rb', line 150

def [](key)
  Ractor.current.ractor_locals[key]
end

#[]=(key, value) ⇒ Object



154
155
156
# File 'lib/backports/ractor/ractor.rb', line 154

def []=(key, value)
  Ractor.current.ractor_locals[key] = value
end

#close_incomingObject



129
130
131
132
133
# File 'lib/backports/ractor/ractor.rb', line 129

def close_incoming
  r = ractor_incoming_queue.closed?
  ractor_incoming_queue.close
  r
end

#close_outgoingObject



135
136
137
138
139
# File 'lib/backports/ractor/ractor.rb', line 135

def close_outgoing
  r = ractor_outgoing_queue.closed?
  ractor_outgoing_queue.close
  r
end

#inspectObject



117
118
119
120
121
122
123
124
125
126
127
# File 'lib/backports/ractor/ractor.rb', line 117

def inspect
  state = RACTOR_STATE[@ractor_thread ? @ractor_thread.status : 'run']
  info = [
    "Ractor:##{@id}",
    name,
    @ractor_origin,
    state,
  ].compact.join(' ')

  "#<#{info}>"
end

#nameObject



104
105
106
# File 'lib/backports/ractor/ractor.rb', line 104

def name
  @ractor_name
end

#ractor_live?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


262
263
264
265
# File 'lib/backports/ractor/ractor.rb', line 262

def ractor_live?
  !defined?(@ractor_thread) || # May happen if `count` is called from another thread before `initialize` has completed
    @ractor_thread.status
end

#ractor_localsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



159
160
161
# File 'lib/backports/ractor/ractor.rb', line 159

def ractor_locals
  @ractor_locals ||= {}.compare_by_identity
end

#send(obj, move: false) ⇒ Object Also known as: <<



92
93
94
95
96
97
# File 'lib/backports/ractor/ractor.rb', line 92

def send(obj, move: false)
  ractor_incoming_queue << Ractor.ractor_isolate(obj, move)
  self
rescue ::ClosedQueueError
  raise ClosedError, 'The incoming-port is already closed'
end

#takeObject



100
101
102
# File 'lib/backports/ractor/ractor.rb', line 100

def take
  ractor_outgoing_queue.pop(ack: true)
end