Class: Itsi::Scheduler
- Inherits:
-
Object
- Object
- Itsi::Scheduler
- Defined in:
- lib/itsi/scheduler.rb,
lib/itsi/scheduler/version.rb
Defined Under Namespace
Classes: Error
Constant Summary collapse
- VERSION =
"0.2.20"
Class Method Summary collapse
Instance Method Summary collapse
- #block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token) ⇒ Object
-
#close ⇒ Object
Hook invoked at the end of the thread.
- #closed? ⇒ Boolean
-
#fiber(&blk) ⇒ Object
Spin up a new fiber and immediately resume it.
-
#initialize ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#io_wait(io, events, duration) ⇒ Object
Register an IO waiter.
- #kernel_sleep(duration) ⇒ Object
-
#process_wait(pid, flags) ⇒ Object
Need to defer to Process::Status rather than our extension as we don’t have a means of creating our own Process::Status.
- #resume_blocked(fiber) ⇒ Object
- #resume_fiber(token) ⇒ Object
- #resume_fiber_with_readiness((token, readiness)) ⇒ Object
-
#run ⇒ Object
Run until no more work needs doing.
- #switch_unblock_batch ⇒ Object
- #tick ⇒ Object
- #unblock(_blocker, fiber) ⇒ Object
-
#work? ⇒ Boolean
Keep running until we’ve got no timers we’re awaiting, no pending IO, no temporary yields, no pending unblocks.
-
#yield ⇒ Object
Yields upwards to the scheduler, with an intention to resume the fiber that yielded ASAP.
Constructor Details
#initialize ⇒ Scheduler
Returns a new instance of Scheduler.
16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/itsi/scheduler.rb', line 16
# Sets up an empty scheduler: identity-keyed bookkeeping tables, a pair of
# unblock batches guarded by a mutex, and cached procs for the resume methods.
#
# @return [Scheduler] a new instance of Scheduler
def initialize
  # Two batches of unblocked fibers: #unblock appends to the batch selected by
  # @unblock_idx, and #switch_unblock_batch returns that batch and flips the index.
  @unblocked = Array.new(2) { [] }
  @unblock_idx = 0
  @unblocked_mux = Mutex.new

  # All lookup tables compare keys by object identity.
  @resume_tokens = {}.compare_by_identity # token => fiber
  @token_map = {}.compare_by_identity     # fiber => token
  @join_waiters = {}.compare_by_identity  # fiber => true while inside #block

  # Procs built once here so #tick can pass them straight to #each.
  @resume_blocked = method(:resume_blocked).to_proc
  @resume_fiber = method(:resume_fiber).to_proc
  @resume_fiber_with_readiness = method(:resume_fiber_with_readiness).to_proc
end
Class Method Details
.resume_token ⇒ Object
11 12 13 14 |
# File 'lib/itsi/scheduler.rb', line 11
# Hands out the next resume token from a counter kept on the receiver.
#
# @return [Integer] 1 on the first call, then one more on every later call
def self.resume_token
  # The counter is created lazily; a missing value counts as 0.
  @resume_token = (@resume_token || 0) + 1
end
Instance Method Details
#block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/itsi/scheduler.rb', line 28
# Parks +fiber+ under +token+ and yields control until something resumes it.
#
# @param _ ignored
# @param timeout when truthy, passed to start_timer together with the token
# @param fiber [Fiber] fiber recorded as the waiter (defaults to the current one)
# @param token token under which the fiber is registered
# @return [Object] whatever value the fiber is resumed with
def block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token)
  begin
    @join_waiters[fiber] = true
    start_timer(timeout, token) if timeout
    @resume_tokens.store(token, fiber)
    @token_map.store(fiber, token)
    Fiber.yield
  ensure
    # Always drop the bookkeeping entries, whether the fiber was resumed
    # normally or left via an exception.
    @join_waiters.delete(fiber)
    @token_map.delete(fiber)
    @resume_tokens.delete(token)
  end
end
#close ⇒ Object
Hook invoked at the end of the thread. Runs the scheduler’s reactor loop until no work remains, then marks the scheduler as closed and freezes it.
126 127 128 129 130 131 |
# File 'lib/itsi/scheduler.rb', line 126
# Drains the scheduler via #run, then marks it closed and freezes it.
# The closing steps execute even when #run raises.
def close
  begin
    run
  ensure
    # `||=` performs no write once @closed is already truthy; presumably this
    # keeps a second #close on the frozen scheduler from raising FrozenError.
    @closed ||= true
    freeze
  end
end
#closed? ⇒ Boolean
144 145 146 |
# File 'lib/itsi/scheduler.rb', line 144
# Reports whether #close has already run on this scheduler.
#
# @return [Boolean] true once closed, false otherwise (never nil)
def closed?
  # @closed is unset until #close runs; coerce so callers get a strict boolean.
  !!@closed
end
#fiber(&blk) ⇒ Object
Spin up a new fiber and immediately resume it.
149 150 151 |
# File 'lib/itsi/scheduler.rb', line 149
# Creates a non-blocking fiber around the given block and resumes it once
# straight away.
#
# @yield body of the new fiber
# @return [Fiber] the fiber that was created
def fiber(&blk)
  spawned = Fiber.new(blocking: false, &blk)
  spawned.resume
  spawned
end
#io_wait(io, events, duration) ⇒ Object
Register an IO waiter. The waiting fiber is resumed by the scheduler’s #tick once fetch_due_events reports the IO as ready.
44 45 46 47 48 49 50 51 |
# File 'lib/itsi/scheduler.rb', line 44
# Registers interest in +events+ on +io+ and, unless the registration already
# yields a readiness value, parks the current fiber until it is resumed.
#
# @param io [IO] object whose file descriptor is registered
# @param events event mask forwarded to register_io_wait
# @param duration timeout forwarded to register_io_wait and #block
# @return [Object] the readiness value from register_io_wait or from #block
def io_wait(io, events, duration)
  waiter = Fiber.current
  token = Scheduler.resume_token
  immediate = register_io_wait(io.fileno, events, duration, token)
  # Only block when the registration did not report readiness right away.
  readiness = immediate || block(nil, duration, waiter, token)
  clear_timer(token)
  readiness
end
#kernel_sleep(duration) ⇒ Object
60 61 62 |
# File 'lib/itsi/scheduler.rb', line 60
# Suspends the current fiber by blocking it with +duration+ as the timeout.
#
# @param duration timeout handed to #block
# @return [Object] the value returned by #block
def kernel_sleep(duration)
  block(nil, duration)
end
#process_wait(pid, flags) ⇒ Object
Need to defer to Process::Status rather than our extension as we don’t have a means of creating our own Process::Status.
135 136 137 138 139 140 141 142 |
# File 'lib/itsi/scheduler.rb', line 135
# Waits for a child process on a helper thread and returns its status.
#
# @param pid [Integer] process id passed to Process::Status.wait
# @param flags [Integer] flags passed to Process::Status.wait
# @return [Process::Status] result of Process::Status.wait
def process_wait(pid, flags)
  # Thread#value joins the helper thread and returns the block's result.
  Thread.new { Process::Status.wait(pid, flags) }.value
end
#resume_blocked(fiber) ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/itsi/scheduler.rb', line 90
# Resumes a fiber that was handed to #unblock.
#
# A fiber with a registered token is resumed through #resume_fiber; otherwise
# it is resumed directly, provided it is still alive.
#
# @param fiber [Fiber] fiber to wake
# @return [Object, nil] result of the resume, or nil when nothing was resumed
def resume_blocked(fiber)
  token = @token_map[fiber]
  return resume_fiber(token) if token

  fiber.resume if fiber.alive?
end
#resume_fiber(token) ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/itsi/scheduler.rb', line 74
# Removes +token+ from the resume table and resumes the fiber stored under it.
# A StandardError raised while resuming is reported with +warn+, not re-raised.
#
# @param token token whose fiber should be resumed
# @return [Object, nil] result of the resume; nil for an unknown token or
#   after a rescued error
def resume_fiber(token)
  fiber = @resume_tokens.delete(token)
  fiber.resume if fiber
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end
#resume_fiber_with_readiness((token, readiness)) ⇒ Object
82 83 84 85 86 87 88 |
# File 'lib/itsi/scheduler.rb', line 82
# Takes a single +[token, readiness]+ pair, removes the token from the resume
# table and resumes its fiber with +readiness+. A StandardError raised while
# resuming is reported with +warn+, not re-raised.
#
# @return [Object, nil] result of the resume; nil for an unknown token or
#   after a rescued error
def resume_fiber_with_readiness((token, readiness))
  fiber = @resume_tokens.delete(token)
  fiber.resume(readiness) if fiber
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end
#run ⇒ Object
Run until no more work needs doing.
119 120 121 122 |
# File 'lib/itsi/scheduler.rb', line 119
# Drives the scheduler: calls #tick for as long as #work? reports outstanding
# work, then logs that the scheduler is exiting.
def run
  while work?
    tick
  end
  debug "Exit Scheduler"
end
#switch_unblock_batch ⇒ Object
98 99 100 101 102 103 104 |
# File 'lib/itsi/scheduler.rb', line 98
# Returns the batch of unblocked fibers currently being filled and flips the
# index so that later #unblock calls append to the other batch.
#
# @return [Array] the batch that was active before the switch
def switch_unblock_batch
  @unblocked_mux.synchronize do
    # tap yields after the active batch has been looked up, so the index flip
    # does not affect which batch is returned.
    @unblocked[@unblock_idx].tap { @unblock_idx = (@unblock_idx + 1) % 2 }
  end
end
#tick ⇒ Object
64 65 66 67 68 69 70 71 72 |
# File 'lib/itsi/scheduler.rb', line 64
# One scheduler iteration: collects due IO events, due timers and the current
# batch of unblocked fibers, then resumes them in that order of priority
# (events, unblocked fibers, timers).
def tick
  due_events = fetch_due_events
  due_timers = fetch_due_timers
  woken = switch_unblock_batch

  # Either fetch may yield nil, hence the safe navigation.
  due_events&.each(&@resume_fiber_with_readiness)

  woken.each(&@resume_blocked)
  # Empty the batch so it is clean the next time the index flips back to it.
  woken.clear

  due_timers&.each(&@resume_fiber)
end
#unblock(_blocker, fiber) ⇒ Object
53 54 55 56 57 58 |
# File 'lib/itsi/scheduler.rb', line 53
# Queues +fiber+ on the active unblock batch and then calls +wake+.
#
# @param _blocker ignored
# @param fiber [Fiber] fiber to be resumed on a later #tick
def unblock(_blocker, fiber)
  # The mutex guards the batch and index that #switch_unblock_batch also touches.
  @unblocked_mux.synchronize { @unblocked[@unblock_idx].push(fiber) }
  wake
end
#work? ⇒ Boolean
Keep running until we’ve got no timers we’re awaiting, no pending IO, no temporary yields, no pending unblocks.
114 115 116 |
# File 'lib/itsi/scheduler.rb', line 114
# Tells whether the scheduler still has something to do: fibers queued on the
# active unblock batch, fibers parked in #block, or pending IO.
#
# @return [Boolean]
def work?
  return true unless @unblocked[@unblock_idx].empty?
  return true unless @join_waiters.empty?

  # Consulted last, exactly as in a short-circuiting `||` chain.
  has_pending_io?
end
#yield ⇒ Object
Yields upwards to the scheduler, with an intention to resume the fiber that yielded ASAP.
108 109 110 |
# File 'lib/itsi/scheduler.rb', line 108
# Gives control back to the scheduler via a zero-duration sleep, but only when
# #work? reports outstanding work; otherwise does nothing.
def yield
  return unless work?

  kernel_sleep(0)
end