Class: Async::Scheduler
Overview
Handles scheduling of fibers. Implements the fiber scheduler interface.
Direct Known Subclasses
Defined Under Namespace
Classes: ClosedError
Instance Attribute Summary
Attributes inherited from Node
#annotation, #children, #head, #parent, #tail
Class Method Summary
- .supported? ⇒ Boolean
  Whether the fiber scheduler is supported.
Instance Method Summary
- #address_resolve(hostname) ⇒ Object
- #async(*arguments, **options, &block) ⇒ Object (deprecated)
  Deprecated. With no replacement.
- #block(blocker, timeout) ⇒ Object
  Invoked when a fiber tries to perform a blocking operation which cannot continue.
- #close ⇒ Object
- #closed? ⇒ Boolean
- #fiber ⇒ Object
- #initialize(parent = nil, selector: nil) ⇒ Scheduler (constructor)
  A new instance of Scheduler.
- #interrupt ⇒ Object
  Interrupt the event loop and cause it to exit.
- #io_read(io, buffer, length, offset = 0) ⇒ Object
- #io_wait(io, events, timeout = nil) ⇒ Object
- #io_write(io, buffer, length, offset = 0) ⇒ Object
- #kernel_sleep(duration = nil) ⇒ Object
- #process_wait(pid, flags) ⇒ Object
  Wait for the specified process ID to exit.
- #push(fiber) ⇒ Object
  Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
- #raise(*arguments) ⇒ Object
- #resume(fiber, *arguments) ⇒ Object
- #run ⇒ Object
  Run the reactor until all tasks are finished.
- #run_once(timeout = nil) ⇒ Object
  Run one iteration of the event loop.
- #scheduler_close ⇒ Object
- #timeout_after(duration, exception, message, &block) ⇒ Object
- #to_s ⇒ Object
- #transfer ⇒ Object
  Transfer from the calling fiber to the event loop.
- #unblock(blocker, fiber) ⇒ Object
- #with_timeout(duration, exception = TimeoutError, message = "execution expired", &block) ⇒ Object
  Invoke the block, but after the specified timeout, raise TimeoutError in any currently blocking operation.
- #yield ⇒ Object
  Yield the current fiber and resume it on the next iteration of the event loop.
Methods inherited from Node
#parent=, #annotate, #backtrace, #children?, #consume, #description, #finished?, #print_hierarchy, #root, #stop, #stopped?, #terminate, #transient?, #traverse
Constructor Details
#initialize(parent = nil, selector: nil) ⇒ Scheduler
Returns a new instance of Scheduler.
# File 'lib/async/scheduler.rb', line 32
def initialize(parent = nil, selector: nil)
  super(parent)

  @selector = selector || ::IO::Event::Selector.new(Fiber.current)
  @interrupted = false

  @blocked = 0

  @timers = ::Timers::Group.new
end
Class Method Details
.supported? ⇒ Boolean
Whether the fiber scheduler is supported.
# File 'lib/async/scheduler.rb', line 28
def self.supported?
  true
end
Instance Method Details
#address_resolve(hostname) ⇒ Object
# File 'lib/async/scheduler.rb', line 161
def address_resolve(hostname)
  # On some platforms, hostnames may contain a device-specific suffix (e.g. %en0). We need to strip this before resolving.
  # See <https://github.com/socketry/async/issues/180> for more details.
  hostname = hostname.split("%", 2).first
  ::Resolv.getaddresses(hostname)
end
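The suffix handling in the listing above can be tried in isolation. The following is a minimal sketch using only core String methods; the helper name `sketch_strip_scope` is hypothetical and is not part of Async::Scheduler, and no actual name resolution is performed.

```ruby
# Hypothetical helper (not part of Async::Scheduler): drops a
# device-specific suffix such as "%en0" before a hostname is resolved.
def sketch_strip_scope(hostname)
  # Split on the first "%" only; the part before it is the bare address.
  hostname.split("%", 2).first
end

sketch_strip_scope("fe80::1%en0")  # suffix removed
sketch_strip_scope("example.com")  # unchanged, there is no suffix
```

Hostnames without a `%` pass through unchanged, so the same code path can serve both cases.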
#async(*arguments, **options, &block) ⇒ Object
Deprecated. With no replacement.
Start an asynchronous task within the specified reactor. The task will be executed until the first blocking call, at which point it will yield and this method will return.
This is the main entry point for scheduling asynchronous tasks.
# File 'lib/async/scheduler.rb', line 297
def async(*arguments, **options, &block)
  Kernel::raise ClosedError if @selector.nil?

  task = Task.new(Task.current? || self, **options, &block)

  # I want to take a moment to explain the logic of this.
  # When calling an async block, we deterministically execute it until the
  # first blocking operation. We don't *have* to do this - we could schedule
  # it for later execution, but it's useful to:
  # - Fail at the point of the method call where possible.
  # - Execute deterministically where possible.
  # - Avoid scheduler overhead if no blocking operation is performed.
  task.run(*arguments)

  # Console.logger.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
  return task
end
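The "run until the first blocking call" behaviour described for #async can be illustrated with plain fibers. This is only a sketch under stated assumptions: `Fiber.yield` stands in for a blocking operation, the method name `sketch_eager_start` is hypothetical, and neither Task nor the selector is involved.

```ruby
# Hypothetical illustration of eager task start using core Fiber only.
def sketch_eager_start
  events = []

  task = Fiber.new do
    events << :task_started
    Fiber.yield              # stands in for the first blocking call
    events << :task_resumed
  end

  task.resume                # runs the body up to the "blocking" point
  events << :caller_continues
  task.resume                # a later loop iteration would do this

  events
end
```

The task body starts before the caller continues, which is why an error raised before the first blocking operation surfaces at the call site.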
#block(blocker, timeout) ⇒ Object
Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call to #unblock must be performed to allow this fiber to continue.
# File 'lib/async/scheduler.rb', line 118
def block(blocker, timeout)
  # $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
  fiber = Fiber.current

  if timeout
    timer = @timers.after(timeout) do
      if fiber.alive?
        fiber.transfer(false)
      end
    end
  end

  begin
    @blocked += 1
    @selector.transfer
  ensure
    @blocked -= 1
  end
ensure
  timer&.cancel
end
#close ⇒ Object
# File 'lib/async/scheduler.rb', line 53
def close
  # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
  until self.terminate
    self.run_once!
  end

  Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0

  # We depend on GVL for consistency:
  # @guard.synchronize do

  # We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
  selector = @selector
  @selector = nil

  selector&.close

  # end

  consume
end
#closed? ⇒ Boolean
# File 'lib/async/scheduler.rb', line 77
def closed?
  @selector.nil?
end
#fiber ⇒ Object
# File 'lib/async/scheduler.rb', line 315
def fiber(...)
  return async(...).fiber
end
#interrupt ⇒ Object
Interrupt the event loop and cause it to exit.
# File 'lib/async/scheduler.rb', line 87
def interrupt
  @interrupted = true
  @selector&.wakeup
end
#io_read(io, buffer, length, offset = 0) ⇒ Object
# File 'lib/async/scheduler.rb', line 184
def io_read(io, buffer, length, offset = 0)
  @selector.io_read(Fiber.current, io, buffer, length, offset)
end
#io_wait(io, events, timeout = nil) ⇒ Object
# File 'lib/async/scheduler.rb', line 169
def io_wait(io, events, timeout = nil)
  fiber = Fiber.current

  if timeout
    timer = @timers.after(timeout) do
      fiber.transfer
    end
  end

  return @selector.io_wait(fiber, io, events)
ensure
  timer&.cancel
end
#io_write(io, buffer, length, offset = 0) ⇒ Object
# File 'lib/async/scheduler.rb', line 189
def io_write(io, buffer, length, offset = 0)
  @selector.io_write(Fiber.current, io, buffer, length, offset)
end
#kernel_sleep(duration = nil) ⇒ Object
# File 'lib/async/scheduler.rb', line 152
def kernel_sleep(duration = nil)
  if duration
    self.block(nil, duration)
  else
    self.transfer
  end
end
#process_wait(pid, flags) ⇒ Object
Wait for the specified process ID to exit.
# File 'lib/async/scheduler.rb', line 200
def process_wait(pid, flags)
  return @selector.process_wait(Fiber.current, pid, flags)
end
#push(fiber) ⇒ Object
Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
# File 'lib/async/scheduler.rb', line 104
def push(fiber)
  @selector.push(fiber)
end
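The idea of resuming a pushed fiber on the next loop iteration can be pictured with a plain array acting as a ready queue. This is a hedged sketch only: the real work is delegated to the selector, whose internals are not shown on this page, and `sketch_ready_queue` is a hypothetical name.

```ruby
# Hypothetical ready-queue illustration; not the selector's implementation.
def sketch_ready_queue
  events = []
  ready = []                 # fibers waiting for the next iteration

  worker = Fiber.new do
    events << :worker_resumed
  end

  ready.push(worker)         # analogous to scheduling the fiber for later
  events << :pushed          # the caller keeps running first

  # One loop iteration: resume everything that was ready when it began.
  pending = ready.shift(ready.size)
  pending.each { |fiber| fiber.resume if fiber.alive? }

  events
end
```

Pushing does not run the fiber immediately; it only runs once the loop drains the queue.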
#raise(*arguments) ⇒ Object
# File 'lib/async/scheduler.rb', line 108
def raise(*arguments)
  @selector.raise(*arguments)
end
#resume(fiber, *arguments) ⇒ Object
# File 'lib/async/scheduler.rb', line 112
def resume(fiber, *arguments)
  @selector.resume(fiber, *arguments)
end
#run ⇒ Object
Run the reactor until all tasks are finished. Proxies arguments to #async immediately before entering the loop, if a block is provided.
# File 'lib/async/scheduler.rb', line 267
def run(...)
  Kernel::raise ClosedError if @selector.nil?

  initial_task = self.async(...) if block_given?

  # In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this.
  Thread.handle_interrupt(SignalException => :never) do
    while true
      # If we are interrupted, we need to exit:
      break if self.interrupted?

      # If we are finished, we need to exit:
      break unless self.run_once
    end
  end

  return initial_task
ensure
  Console.logger.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end
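The two exit conditions of the run loop (an interrupt, or no work left) can be seen in a toy model. The class `SketchLoop` and the two helper methods below are hypothetical stand-ins written for illustration; they use a simple array of callables instead of tasks and a selector, and they omit signal handling.

```ruby
# Hypothetical stand-in for a scheduler; not the Async::Scheduler implementation.
class SketchLoop
  attr_reader :iterations

  def initialize(work)
    @work = work             # callables still waiting to run
    @interrupted = false
    @iterations = 0
  end

  def interrupt
    @interrupted = true
  end

  # Runs one unit of work; returns false when nothing is left.
  def run_once
    return false if @work.empty?
    @iterations += 1
    @work.shift.call(self)
    true
  end

  def run
    loop do
      break if @interrupted  # exit when interrupted
      break unless run_once  # exit when finished
    end
  end
end

def sketch_run_until_finished
  scheduler = SketchLoop.new([->(_) {}, ->(_) {}, ->(_) {}])
  scheduler.run
  scheduler.iterations
end

def sketch_run_until_interrupted
  scheduler = SketchLoop.new([->(_) {}, ->(s) { s.interrupt }, ->(_) {}])
  scheduler.run
  scheduler.iterations
end
```

In the interrupted case the third callable never runs, because the interrupt flag is checked before each iteration.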
#run_once(timeout = nil) ⇒ Object
Run one iteration of the event loop. Does not handle interrupts.
# File 'lib/async/scheduler.rb', line 208
def run_once(timeout = nil)
  Kernel::raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?

  # If we are finished, we stop the task tree and exit:
  if self.finished?
    return false
  end

  return run_once!(timeout)
end
#scheduler_close ⇒ Object
# File 'lib/async/scheduler.rb', line 43
def scheduler_close
  # If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
  unless $!
    self.run
  end
ensure
  self.close
end
#timeout_after(duration, exception, message, &block) ⇒ Object
# File 'lib/async/scheduler.rb', line 335
def timeout_after(duration, exception, message, &block)
  with_timeout(duration, exception, message) do |timer|
    yield duration
  end
end
#to_s ⇒ Object
# File 'lib/async/scheduler.rb', line 81
def to_s
  "\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>"
end
#transfer ⇒ Object
Transfer from the calling fiber to the event loop.
# File 'lib/async/scheduler.rb', line 93
def transfer
  @selector.transfer
end
#unblock(blocker, fiber) ⇒ Object
# File 'lib/async/scheduler.rb', line 141
def unblock(blocker, fiber)
  # $stderr.puts "unblock(#{blocker}, #{fiber})"

  # This operation is protected by the GVL:
  if selector = @selector
    selector.push(fiber)
    selector.wakeup
  end
end
#with_timeout(duration, exception = TimeoutError, message = "execution expired", &block) ⇒ Object
Invoke the block, but after the specified timeout, raise TimeoutError in any currently blocking operation. If the block runs to completion before the timeout occurs, or there are no blocking operations after the timeout expires, the code will complete without any exception.
# File 'lib/async/scheduler.rb', line 321
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
  fiber = Fiber.current

  timer = @timers.after(duration) do
    if fiber.alive?
      fiber.raise(exception, message)
    end
  end

  yield timer
ensure
  timer.cancel if timer
end
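The effect of a timeout firing while a fiber is suspended can be reproduced with core `Fiber#raise`. This is a minimal sketch under assumptions: `Fiber.yield` stands in for a blocking operation, the caller plays the role of the timer callback, and the names `SketchTimeoutError` and `sketch_timeout` are hypothetical.

```ruby
# Hypothetical error class standing in for the timeout exception.
class SketchTimeoutError < StandardError; end

# Hypothetical illustration of a timeout interrupting a suspended fiber.
def sketch_timeout(expire:)
  outcome = nil

  fiber = Fiber.new do
    begin
      Fiber.yield            # stands in for a blocking operation
      outcome = :completed
    rescue SketchTimeoutError => error
      outcome = error.message
    end
  end

  fiber.resume               # run up to the "blocking" point

  if expire
    # What a timer callback would do once the duration has elapsed:
    fiber.raise(SketchTimeoutError, "execution expired") if fiber.alive?
  else
    fiber.resume             # the operation finished in time
  end

  outcome
end
```

The exception surfaces at the point where the fiber was suspended, so a block that never suspends would not observe it.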
#yield ⇒ Object
Yield the current fiber and resume it on the next iteration of the event loop.
# File 'lib/async/scheduler.rb', line 98
def yield
  @selector.yield
end