Class: Goru::Reactor
- Inherits:
-
Object
- Object
- Goru::Reactor
- Defined in:
- lib/goru/reactor.rb
Overview
- public
Instance Attribute Summary collapse
-
#status ⇒ Object
readonly
[public].
Instance Method Summary collapse
-
#adopt_routine(routine) ⇒ Object
[public].
-
#finished? ⇒ Boolean
[public].
-
#initialize(queue:, scheduler:) ⇒ Reactor
constructor
A new instance of Reactor.
-
#routine_asleep(routine, seconds) ⇒ Object
[public].
-
#routine_finished(routine) ⇒ Object
[public].
-
#run ⇒ Object
[public].
-
#signal ⇒ Object
[public].
-
#stop ⇒ Object
[public].
-
#wakeup ⇒ Object
[public].
Constructor Details
#initialize(queue:, scheduler:) ⇒ Reactor
Returns a new instance of Reactor.
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/goru/reactor.rb', line 15 def initialize(queue:, scheduler:) @queue = queue @scheduler = scheduler @routines = Set.new @bridges = Set.new @timers = Timers::Group.new @selector = NIO::Selector.new @stopped = false @status = nil @mutex = Mutex.new end |
Instance Attribute Details
#status ⇒ Object (readonly)
- public
29 30 31 |
# File 'lib/goru/reactor.rb', line 29 def status @status end |
Instance Method Details
#adopt_routine(routine) ⇒ Object
- public
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/goru/reactor.rb', line 157 def adopt_routine(routine) case routine when Routines::IO monitor = @selector.register(routine.io, routine.intent) monitor.value = routine routine.monitor = monitor routine.reactor = self when Routines::Bridge routine.reactor = self @bridges << routine when Routine routine.reactor = self @routines << routine end end |
#finished? ⇒ Boolean
- public
118 119 120 121 122 |
# File 'lib/goru/reactor.rb', line 118 def finished? @mutex.synchronize do @status == :idle || @status == :stopped end end |
#routine_asleep(routine, seconds) ⇒ Object
- public
149 150 151 152 153 |
# File 'lib/goru/reactor.rb', line 149 def routine_asleep(routine, seconds) @timers.after(seconds) { routine.wake } end |
#routine_finished(routine) ⇒ Object
- public
175 176 177 178 179 180 181 182 183 184 |
# File 'lib/goru/reactor.rb', line 175 def routine_finished(routine) case routine when Routines::Bridge @bridges.delete(routine) when Routines::IO @selector.deregister(routine.io) else @routines.delete(routine) end end |
#run ⇒ Object
- public
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/goru/reactor.rb', line 33 def run until @stopped set_status(:running) @routines.each do |routine| call_routine(routine) end begin wait_for_routine(block: false) rescue ThreadError interval = @timers.wait_interval if interval.nil? if @routines.empty? if @selector.empty? become_idle else wait_for_bridge do wait_for_selector end end else wait_for_bridge do wait_for_selector(0) end end elsif interval > 0 if @selector.empty? wait_for_interval(interval) else wait_for_bridge do wait_for_selector(interval) end end end @timers.fire end end ensure @selector.close set_status(:finished) end |
#signal ⇒ Object
- public
126 127 128 129 130 |
# File 'lib/goru/reactor.rb', line 126 def signal unless @selector.empty? @selector.wakeup end end |
#stop ⇒ Object
- public
141 142 143 144 145 |
# File 'lib/goru/reactor.rb', line 141 def stop @stopped = true @selector.wakeup rescue IOError end |
#wakeup ⇒ Object
- public
134 135 136 137 |
# File 'lib/goru/reactor.rb', line 134 def wakeup signal @queue << :wakeup end |