Module: CrystalRuby::Reactor
- Defined in:
- lib/crystalruby/reactor.rb
Overview
The Reactor represents a singleton Thread responsible for running all Ruby/crystal interop code. Crystal’s Fiber scheduler and GC assume all code is run on a single thread. This class is responsible for multiplexing Ruby and Crystal code onto a single thread. Functions annotated with async: true, are executed using callbacks to allow these to be interleaved without blocking multiple Ruby threads.
Defined Under Namespace
Classes: SingleThreadViolation, StopReactor
Constant Summary collapse
- REACTOR_QUEUE =
Queue.new
- GC_OP_THRESHOLD =
Invoke GC every 100 ops
ENV.fetch("CRYSTAL_GC_OP_THRESHOLD", 100).to_i
- GC_INTERVAL =
Or every 0.05 seconds
ENV.fetch("CRYSTAL_GC_INTERVAL", 0.05).to_f
- GC_BYTES_SEEN_THRESHOLD =
Or if we’ve gotten hold of a reference to at least 100KB or more of fresh memory since last GC
ENV.fetch("CRYSTAL_GC_BYTES_SEEN_THRESHOLD", 100 * 1024).to_i
- THREAD_MAP =
We maintain a map of threads, each with a mutex, condition variable, and result
Hash.new do |h, tid_or_thread, tid = tid_or_thread| if tid_or_thread.is_a?(Thread) ObjectSpace.define_finalizer(tid_or_thread) do THREAD_MAP.delete(tid_or_thread) THREAD_MAP.delete(tid_or_thread.object_id) end tid = tid_or_thread.object_id end h[tid] = { mux: Mutex.new, cond: ConditionVariable.new, result: nil, thread_id: tid } h[tid_or_thread] = h[tid] if tid_or_thread.is_a?(Thread) end
- CALLBACKS_MAP =
We memoize callbacks, once per return type
Hash.new do |h, rt| h[rt] = FFI::Function.new(:void, [:int, *((rt == :void) ? [] : [rt])]) do |tid, ret| THREAD_MAP[tid][:error] = nil THREAD_MAP[tid][:result] = ret THREAD_MAP[tid][:cond].signal end end
- ERROR_CALLBACK =
FFI::Function.new(:void, %i[string string string int]) do |error_type, , backtrace, tid| error_type = error_type.to_sym is_exception_type = Object.const_defined?(error_type) && Object.const_get(error_type).ancestors.include?(Exception) error_type = is_exception_type ? Object.const_get(error_type) : RuntimeError error = error_type.new() error.set_backtrace(JSON.parse(backtrace)) raise error unless THREAD_MAP.key?(tid) THREAD_MAP[tid][:error] = error THREAD_MAP[tid][:result] = nil THREAD_MAP[tid][:cond].signal end
Class Method Summary collapse
- .await_result! ⇒ Object
- .gc_due? ⇒ Boolean
- .halt_loop! ⇒ Object
- .init_single_thread_mode! ⇒ Object
- .invoke_async!(receiver, op_name, *args, thread_id, callback, lib) ⇒ Object
- .invoke_await!(receiver, op_name, *args, lib) ⇒ Object
- .invoke_blocking!(receiver, op_name, *args, tvars, _lib) ⇒ Object
- .invoke_gc_if_due!(lib) ⇒ Object
- .running? ⇒ Boolean
- .schedule_work!(receiver, op_name, *args, return_type, blocking: true, async: true, lib: nil) ⇒ Object
- .start! ⇒ Object
- .start_gc_thread!(lib) ⇒ Object
- .stop! ⇒ Object
- .thread_conditions ⇒ Object
- .thread_id ⇒ Object
- .yield!(lib: nil, time: 0.0) ⇒ Object
Class Method Details
.await_result! ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/crystalruby/reactor.rb', line 73 def await_result! mux, cond, result, err = thread_conditions.values_at(:mux, :cond, :result, :error) cond.wait(mux) unless result || err result, err, thread_conditions[:result], thread_conditions[:error] = thread_conditions.values_at(:result, :error) if err combined_backtrace = err.backtrace[0..(err.backtrace.index { |m| m.include?("call_blocking_function") } || 2) - 3] + caller[5..-1] err.set_backtrace(combined_backtrace) raise err end result end |
.gc_due? ⇒ Boolean
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/crystalruby/reactor.rb', line 124 def gc_due? now = Process.clock_gettime(Process::CLOCK_MONOTONIC) # Initialize state variables if not already set. @last_gc_time ||= now @op_count ||= 0 @last_gc_op_count ||= @op_count @last_mem_check_time ||= now # Calculate differences based on ops and time. ops_since_last_gc = @op_count - @last_gc_op_count time_since_last_gc = now - @last_gc_time # Start with our two “cheap” conditions. due = (ops_since_last_gc >= GC_OP_THRESHOLD) || (time_since_last_gc >= GC_INTERVAL) || Types::Allocator.gc_bytes_seen > GC_BYTES_SEEN_THRESHOLD if due # Update the baseline values after GC is scheduled. @last_gc_time = now # If we just did a memory check, use that value; otherwise, fetch one now. @last_gc_op_count = @op_count Types::Allocator.gc_hint_reset! true else false end end |
.halt_loop! ⇒ Object
88 89 90 |
# File 'lib/crystalruby/reactor.rb', line 88 def halt_loop! raise StopReactor end |
.init_single_thread_mode! ⇒ Object
221 222 223 224 225 226 |
# File 'lib/crystalruby/reactor.rb', line 221 def init_single_thread_mode! @single_thread_mode ||= begin @main_thread_id = Thread.current.object_id true end end |
.invoke_async!(receiver, op_name, *args, thread_id, callback, lib) ⇒ Object
170 171 172 173 |
# File 'lib/crystalruby/reactor.rb', line 170 def invoke_async!(receiver, op_name, *args, thread_id, callback, lib) receiver.send(op_name, *args, thread_id, callback) yield!(lib: lib, time: 0) end |
.invoke_await!(receiver, op_name, *args, lib) ⇒ Object
188 189 190 191 |
# File 'lib/crystalruby/reactor.rb', line 188 def invoke_await!(receiver, op_name, *args, lib) outstanding_jobs = receiver.send(op_name, *args) yield!(lib: lib, time: 0) unless outstanding_jobs == 0 end |
.invoke_blocking!(receiver, op_name, *args, tvars, _lib) ⇒ Object
175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/crystalruby/reactor.rb', line 175 def invoke_blocking!(receiver, op_name, *args, tvars, _lib) tvars[:error] = nil begin tvars[:result] = receiver.send(op_name, *args) rescue StopReactor tvars[:cond].signal raise rescue => e tvars[:error] = e end tvars[:cond].signal end |
.invoke_gc_if_due!(lib) ⇒ Object
120 121 122 |
# File 'lib/crystalruby/reactor.rb', line 120 def invoke_gc_if_due!(lib) schedule_work!(lib, :gc, :void, blocking: true, async: false, lib: lib) if lib && gc_due? end |
.running? ⇒ Boolean
217 218 219 |
# File 'lib/crystalruby/reactor.rb', line 217 def running? @main_loop&.alive? end |
.schedule_work!(receiver, op_name, *args, return_type, blocking: true, async: true, lib: nil) ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/crystalruby/reactor.rb', line 193 def schedule_work!(receiver, op_name, *args, return_type, blocking: true, async: true, lib: nil) if @single_thread_mode || (Thread.current.object_id == @main_thread_id && op_name != :yield) unless Thread.current.object_id == @main_thread_id raise SingleThreadViolation, "Single thread mode is enabled, cannot run in multi-threaded mode. " \ "Reactor was started from: #{@main_thread_id}, then called from #{Thread.current.object_id}" end invoke_gc_if_due!(lib) return receiver.send(op_name, *args) end tvars = thread_conditions tvars[:mux].synchronize do REACTOR_QUEUE.push( case true when async then [:invoke_async!, receiver, op_name, *args, tvars[:thread_id], CALLBACKS_MAP[return_type], lib] when blocking then [:invoke_blocking!, receiver, op_name, *args, tvars, lib] else [:invoke_await!, receiver, op_name, *args, lib] end ) return await_result! if blocking end end |
.start! ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/crystalruby/reactor.rb', line 101 def start! @op_count = 0 @main_loop ||= Thread.new do @main_thread_id = Thread.current.object_id CrystalRuby.log_debug("Starting reactor") CrystalRuby.log_debug("CrystalRuby initialized") while true handler, *args, lib = REACTOR_QUEUE.pop send(handler, *args, lib) @op_count += 1 invoke_gc_if_due!(lib) end rescue StopReactor rescue => e CrystalRuby.log_error "Error: #{e}" CrystalRuby.log_error e.backtrace end end |
.start_gc_thread!(lib) ⇒ Object
152 153 154 155 156 157 158 159 |
# File 'lib/crystalruby/reactor.rb', line 152 def start_gc_thread!(lib) Thread.new do loop do schedule_work!(lib, :gc, :void, blocking: true, async: false, lib: lib) if gc_due? sleep GC_INTERVAL end end end |
.stop! ⇒ Object
92 93 94 95 96 97 98 99 |
# File 'lib/crystalruby/reactor.rb', line 92 def stop! return unless @main_loop schedule_work!(self, :halt_loop!, :void, blocking: true, async: false) @main_loop.join @main_loop = nil CrystalRuby.log_info "Reactor loop stopped" end |
.thread_conditions ⇒ Object
69 70 71 |
# File 'lib/crystalruby/reactor.rb', line 69 def thread_conditions THREAD_MAP[Thread.current] end |
.thread_id ⇒ Object
161 162 163 |
# File 'lib/crystalruby/reactor.rb', line 161 def thread_id Thread.current.object_id end |
.yield!(lib: nil, time: 0.0) ⇒ Object
165 166 167 168 |
# File 'lib/crystalruby/reactor.rb', line 165 def yield!(lib: nil, time: 0.0) schedule_work!(lib, :yield, :int, async: false, blocking: false, lib: lib) if running? && lib nil end |