Class: AsyncScheduler::Scheduler
- Inherits:
-
Object
- Object
- AsyncScheduler::Scheduler
- Includes:
- CrossThreadUsageDetector
- Defined in:
- lib/async_scheduler/scheduler.rb
Overview
This class implements Fiber::SchedulerInterface. See ruby-doc.org/core-3.1.0/Fiber/SchedulerInterface.html for details.
Instance Method Summary collapse
-
#address_resolve(hostname) ⇒ Object
Invoked by any method that performs a non-reverse DNS lookup.
-
#block(blocker, timeout = nil) ⇒ Object
Invoked by methods like Thread.join, and by Mutex, to signify that current Fiber is blocked until further notice (e.g. unblock) or until timeout has elapsed.
-
#close ⇒ Object
Called when the current thread exits.
-
#fiber(&block) ⇒ Object
Implementation of the Fiber.schedule.
-
#initialize ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#io_read(io, buffer, length) ⇒ Object
Suggested implementation should try to read from io in a non-blocking manner and call io_wait if the io is not ready (which will yield control to other fibers).
-
#io_wait(io, events, _timeout) ⇒ Object
Suggested implementation should register which Fiber is waiting for which resources and immediately calling Fiber.yield to pass control to other fibers.
-
#io_write(io, buffer, length) ⇒ Object
Specifying a length of 0 is valid and means try writing at least once, as much data as possible.
-
#kernel_sleep(duration = nil) ⇒ Object
Invoked by Kernel#sleep and Mutex#sleep and is expected to provide an implementation of sleeping in a non-blocking way.
- #soonest_timeout ⇒ Object
-
#timeout_after(duration, exception_class, *exception_arguments) {|duration| ... } ⇒ Object
Invoked by Timeout.timeout to execute the given block within the given duration.
-
#unblock(blocker, fiber) ⇒ Object
Invoked to wake up Fiber previously blocked with block (for example, Mutex#lock calls block and Mutex#unlock calls unblock).
Methods included from CrossThreadUsageDetector
#set_belonging_thread!, #validate_used_in_original_thread!
Constructor Details
#initialize ⇒ Scheduler
Returns a new instance of Scheduler.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/async_scheduler/scheduler.rb', line 11 def initialize set_belonging_thread! # (key, value) = (Fiber object, timeout<not nil>) @waitings = {} # (key, value) = (blocking io, Fiber object) @input_waitings = {} @output_waitings = {} # Fibers which are blocking and whose timeouts are not determined. # e.g. Fiber which includes sleep() @blockings = Set.new() # (key, value) = (socket, Hash{:blocked_fiber => <Fiber object>, :timeout => <timeout>}) @blocking_sockets = {} # NOTE: When either of the sockets(value) is ready, the fiber(key) can be resumed. @fiber_to_all_blocker_sockets = Hash.new{|h, fiber| h[fiber] = Set.new} end |
Instance Method Details
#address_resolve(hostname) ⇒ Object
Invoked by any method that performs a non-reverse DNS lookup. (e.g. Addrinfo.getaddrinfo) The method is expected to return an array of strings corresponding to ip addresses the hostname is resolved to, or nil if it can not be resolved.
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/async_scheduler/scheduler.rb', line 296 def address_resolve(hostname) # NOTE: # Asynchronous DNS lookup is slower than sequential DNS lookup in a single thread in my experiment. # Remove #address_resolve when this scheduler is used in a performance critical application. # Run $ `bundle exec rspec spec/blockings/address_resolve_spec.rb` to confirm it. validate_used_in_original_thread! fiber = ::ResolvFiber.getaddresses_fiber(hostname) # Fiber.yield inside of this fiber is located in the loop and may be called multiple times. # So here in the caller, the fiber has to be resumed multiple times till the fiber becomes terminated. loop do result = fiber.resume return result unless fiber.alive? socks, timeout = result # In my experiment, socks here are: # - socket to connect to the DNS server which has IPv4 address # - socket to connect to the DNS server which has IPv6 address # When either of these is ready, DNS resolution is done. socks.each do |sock| # Fiber.current is blocked by multiple sockets in this way: # @blocking_sockets = { # first_socket: { blocked_fiber: Fiber.current, timeout: 1111 }, # second_socket: { blocked_fiber: Fiber.current, timeout: 1111 }, # } # If first_socket is ready, both of `first_socket` and `second_socket` must be removed from @blocking_sockets before the fiber gets resumed. # (@fiber_to_all_blocker_sockets is used to realize this.) # Otherwise, when second_socket is ready, `Fiber.current` will be resumed again unexpectedly. # Same thing can be said if second_socket is ready first. @blocking_sockets[sock] = { blocked_fiber: Fiber.current, timeout: timeout, } @fiber_to_all_blocker_sockets[Fiber.current] << sock end Fiber.yield end end |
#block(blocker, timeout = nil) ⇒ Object
Invoked by methods like Thread.join, and by Mutex, to signify that current Fiber is blocked until further notice (e.g. unblock) or until timeout has elapsed. blocker is what we are waiting on, informational only (for debugging and logging). There are no guarantee about its value. Expected to return boolean, specifying whether the blocking operation was successful or not.
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/async_scheduler/scheduler.rb', line 42 def block(blocker, timeout = nil) validate_used_in_original_thread! # TODO: Make use of blocker. if timeout @waitings[Fiber.current] = timeout else @blockings << Fiber.current end true end |
#close ⇒ Object
Called when the current thread exits. The scheduler is expected to implement this method in order to allow all waiting fibers to finalize their execution. The suggested pattern is to implement the main event loop in the close method.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/async_scheduler/scheduler.rb', line 105 def close validate_used_in_original_thread! while !@waitings.empty? || !@blockings.empty? || !@input_waitings.empty? || !@output_waitings.empty? || !@blocking_sockets.empty? # For blocking I/Os... while !@input_waitings.empty? || !@output_waitings.empty? || !@blocking_sockets.empty? soonest_timeout_ = self.soonest_timeout select_duration = if soonest_timeout_.nil? nil else duration = soonest_timeout_ - Process.clock_gettime(Process::CLOCK_MONOTONIC) # duration here should be very close to 0 even if it is negative. [duration, 0].max end # NOTE: IO.select will keep blocking until timeout even if any new event is added to @waitings. # TODO: Don't wait for the input ready when the corresponding fiber gets terminated, and when it is the only one in @input_waitings. # TODO: Don't wait for the output ready when the corresponding fiber gets terminated, and when it is the only one in @output_waitings. inputs_ready, outputs_ready = IO.select( @input_waitings.keys + @blocking_sockets.keys, @output_waitings.keys, [], select_duration ) inputs_ready&.each do |input| if @input_waitings[input] fiber_non_blocking = @input_waitings.delete(input) fiber_non_blocking.resume if fiber_non_blocking.alive? elsif @blocking_sockets[input] fiber = @blocking_sockets.delete(input).fetch(:blocked_fiber) # ref. comment in #address_resolve @fiber_to_all_blocker_sockets.fetch(fiber).each do |socket| @blocking_sockets.delete(socket) end fiber.resume else raise end end current_clock_monotonic_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) timeout_sockets = @blocking_sockets.select{|socket, blocked| blocked.fetch(:timeout) && (blocked.fetch(:timeout) <= current_clock_monotonic_time)}.keys # NOTE: timeout_sockets is nil when there is no rejected element. @blocking_sockets.reject!{|socket| timeout_sockets&.include? socket} timeout_sockets&.each do |socket| @fiber_to_all_blocker_sockets.fetch(fiber).delete(socket) fiber.resume if @fiber_to_all_blocker_sockets.fetch(fiber).empty? && fiber.alive? end outputs_ready&.each do |output| fiber_non_blocking = @output_waitings.delete(output) fiber_non_blocking.resume if fiber_non_blocking.alive? end end unless @waitings.empty? # TODO: Use a min heap for @waitings resumable_fibers = @waitings.select{|_fiber, timeout| timeout <= Process.clock_gettime(Process::CLOCK_MONOTONIC)} .map{|fiber, _timeout| fiber} .to_set resumable_fibers.each{|fiber| fiber.resume if fiber.alive?} @waitings.reject!{|fiber, _timeout| resumable_fibers.include? fiber} end # Unfortunately, current scheduler is unfair to @blockings. Even if any of @blockings is ready, current scheduler has no way to notice it. @blockings.select!{|fiber| fiber.alive?} end end |
#fiber(&block) ⇒ Object
Implementation of the Fiber.schedule. The method is expected to immediately run the given block of code in a separate non-blocking fiber, and to return that Fiber.
31 32 33 34 35 36 37 |
# File 'lib/async_scheduler/scheduler.rb', line 31 def fiber(&block) validate_used_in_original_thread! fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end |
#io_read(io, buffer, length) ⇒ Object
Suggested implementation should try to read from io in a non-blocking manner and call io_wait if the io is not ready (which will yield control to other fibers). See IO::Buffer for an interface available to return data. Expected to return number of bytes read, or, in case of an error, -errno (negated number corresponding to system’s error code).
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/async_scheduler/scheduler.rb', line 220 def io_read(io, buffer, length) # return length or -errno validate_used_in_original_thread! read_string = "" offset = 0 while offset < length || length == 0 read_nonblock = Fiber.new(blocking: true) do # AsyncScheduler::Scheduler#io_read is hooked to IO#read_nonblock. # To avoid an infinite call loop, IO#read_nonblock is called inside a Fiber whose blocking=true. # ref. https://docs.ruby-lang.org/ja/latest/method/IO/i/read_nonblock.html io.read_nonblock(buffer.size-offset, read_string, exception: false) end begin # This fiber is resumed only here. result = read_nonblock.resume rescue SystemCallError => e return -e.errno end case result when :wait_readable io_wait(io, IO::READABLE, nil) when nil # when reaching EOF # TODO: Investigate if it is expected to break here. break else offset += buffer.set_string(read_string, offset) # this does not work with `#set_string(result)` break if length == 0 end end return offset end |
#io_wait(io, events, _timeout) ⇒ Object
Suggested implementation should register which Fiber is waiting for which resources and immediately calling Fiber.yield to pass control to other fibers. Then, in the close method, the scheduler might dispatch all the I/O resources to fibers waiting for it. Expected to return the subset of events that are ready immediately.
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/async_scheduler/scheduler.rb', line 195 def io_wait(io, events, _timeout) validate_used_in_original_thread! # TODO: use timeout parameter # TODO?: Expected to return the subset of events that are ready immediately. if events & IO::READABLE == IO::READABLE @input_waitings[io] = Fiber.current end if events & IO::WRITABLE == IO::WRITABLE @output_waitings[io] = Fiber.current end Fiber.yield end |
#io_write(io, buffer, length) ⇒ Object
Specifying a length of 0 is valid and means try writing at least once, as much data as possible. Suggested implementation should try to write to io in a non-blocking manner and call io_wait if the io is not ready (which will yield control to other fibers). See IO::Buffer for an interface available to get data from buffer efficiently. Expected to return number of bytes written, or, in case of an error, -errno (negated number corresponding to system’s error code).
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/async_scheduler/scheduler.rb', line 263 def io_write(io, buffer, length) # returns: written length or -errnoclick to toggle source validate_used_in_original_thread! offset = 0 while offset < length || length == 0 write_nonblock = Fiber.new(blocking: true) do # TODO: Investigate if this #write_nonblock method call should be in a non-blocking fiber. # IO#read_nonblock is hooked to Scheduler#io_wait, so it has to be wrapped. # If IO#read_nonblock is hooked to Scheduler#io_read, this method call has to be wrapped too. # ref. https://docs.ruby-lang.org/ja/latest/class/IO.html#I_WRITE_NONBLOCK io.write_nonblock(buffer.get_string(offset), exception: false) end begin result = write_nonblock.resume rescue SystemCallError => e return -e.errno end case result when :wait_writable io_wait(io, IO::WRITABLE, nil) else offset += result break if length == 0 # Specification says it tries writing at least once if length == 0 end end return offset end |
#kernel_sleep(duration = nil) ⇒ Object
Invoked by Kernel#sleep and Mutex#sleep and is expected to provide an implementation of sleeping in a non-blocking way. Implementation might register the current fiber in some list of “which fiber wait until what moment”, call Fiber.yield to pass control, and then in close resume the fibers whose wait period has elapsed.
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/async_scheduler/scheduler.rb', line 70 def kernel_sleep(duration = nil) validate_used_in_original_thread! timeout = duration ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration : nil if block(:kernel_sleep, timeout) Fiber.yield else raise RuntimeError.new("Failed to sleep") end end |
#soonest_timeout ⇒ Object
177 178 179 180 181 182 183 184 185 186 |
# File 'lib/async_scheduler/scheduler.rb', line 177 def soonest_timeout waitings_earliest_timeout = @waitings.empty? ? nil : @waitings.min_by{|fiber, timeout| timeout}[1] blocking_socket_earliest_timeout = @blocking_sockets.select{|_socket, blocked| blocked.fetch(:timeout)}.values.min_by{|blocked| blocked.fetch(:timeout)}[0]&.fetch(:timeout) if waitings_earliest_timeout && blocking_socket_earliest_timeout return [waitings_earliest_timeout, blocking_socket_earliest_timeout].min end waitings_earliest_timeout || blocking_socket_earliest_timeout end |
#timeout_after(duration, exception_class, *exception_arguments) {|duration| ... } ⇒ Object
Invoked by Timeout.timeout to execute the given block within the given duration. It can also be invoked directly by the scheduler or user code.
This implementation will only interrupt non-blocking operations. If the block is executed successfully, its result will be returned.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/async_scheduler/scheduler.rb', line 86 def timeout_after(duration, exception_class, *exception_arguments, &block) # → result of block validate_used_in_original_thread! current_fiber = Fiber.current if duration self.fiber() do sleep(duration) if current_fiber.alive? current_fiber.raise(exception_class, *exception_arguments) end end end yield duration end |
#unblock(blocker, fiber) ⇒ Object
Invoked to wake up Fiber previously blocked with block (for example, Mutex#lock calls block and Mutex#unlock calls unblock). The scheduler should use the fiber parameter to understand which fiber is unblocked. blocker is what was awaited for, but it is informational only (for debugging and logging), and it is not guaranteed to be the same value as the blocker for block.
59 60 61 62 63 64 65 |
# File 'lib/async_scheduler/scheduler.rb', line 59 def unblock(blocker, fiber) validate_used_in_original_thread! # TODO: Make use of blocker. @blockings.delete fiber fiber.resume end |