Class: Itsi::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/itsi/scheduler.rb,
lib/itsi/scheduler/version.rb

Defined Under Namespace

Classes: Error

Constant Summary collapse

VERSION =
"0.2.20"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeScheduler

Returns a new instance of Scheduler.



16
17
18
19
20
21
22
23
24
25
26
# File 'lib/itsi/scheduler.rb', line 16

def initialize
  @join_waiters = {}.compare_by_identity
  @token_map = {}.compare_by_identity
  @resume_tokens = {}.compare_by_identity
  @unblocked = [[], []]
  @unblock_idx = 0
  @unblocked_mux = Mutex.new
  @resume_fiber = method(:resume_fiber).to_proc
  @resume_fiber_with_readiness = method(:resume_fiber_with_readiness).to_proc
  @resume_blocked = method(:resume_blocked).to_proc
end

Class Method Details

.resume_tokenObject



11
12
13
14
# File 'lib/itsi/scheduler.rb', line 11

def self.resume_token
  @resume_token ||= 0
  @resume_token += 1
end

Instance Method Details

#block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/itsi/scheduler.rb', line 28

def block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token)
  @join_waiters[fiber] = true

  start_timer(timeout, token) if timeout
  @resume_tokens[token] = fiber
  @token_map[fiber] = token
  Fiber.yield
ensure
  @resume_tokens.delete(token)
  @token_map.delete(fiber)
  @join_waiters.delete(fiber)
end

#closeObject

Hook invoked at the end of the thread. Will start our scheduler’s Reactor.



126
127
128
129
130
131
# File 'lib/itsi/scheduler.rb', line 126

def close
  run
ensure
  @closed ||= true
  freeze
end

#closed?Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/itsi/scheduler.rb', line 144

def closed?
  @closed
end

#fiber(&blk) ⇒ Object

Spin up a new fiber and immediately resume it.



149
150
151
# File 'lib/itsi/scheduler.rb', line 149

def fiber(&blk)
  Fiber.new(blocking: false, &blk).tap(&:resume)
end

#io_wait(io, events, duration) ⇒ Object

Register an IO waiter. This will get resumed by our scheduler inside the call to fetch_events.



44
45
46
47
48
49
50
51
# File 'lib/itsi/scheduler.rb', line 44

def io_wait(io, events, duration)
  fiber = Fiber.current
  token = Scheduler.resume_token
  readiness = register_io_wait(io.fileno, events, duration, token)
  readiness ||= block(nil, duration, fiber, token)
  clear_timer(token)
  readiness
end

#kernel_sleep(duration) ⇒ Object



60
61
62
# File 'lib/itsi/scheduler.rb', line 60

def kernel_sleep(duration)
  block nil, duration
end

#process_wait(pid, flags) ⇒ Object

Need to defer to Process::Status rather than our extension as we don’t have a means of creating our own Process::Status.



135
136
137
138
139
140
141
142
# File 'lib/itsi/scheduler.rb', line 135

def process_wait(pid, flags)
  result = nil
  thread = Thread.new do
    result = Process::Status.wait(pid, flags)
  end
  thread.join
  result
end

#resume_blocked(fiber) ⇒ Object



90
91
92
93
94
95
96
# File 'lib/itsi/scheduler.rb', line 90

def resume_blocked(fiber)
  if (token = @token_map[fiber])
    resume_fiber(token)
  elsif fiber.alive?
    fiber.resume
  end
end

#resume_fiber(token) ⇒ Object



74
75
76
77
78
79
80
# File 'lib/itsi/scheduler.rb', line 74

def resume_fiber(token)
  if (fiber = @resume_tokens.delete(token))
    fiber.resume
  end
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end

#resume_fiber_with_readiness(token, readiness) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/itsi/scheduler.rb', line 82

def resume_fiber_with_readiness((token, readiness))
  if (fiber = @resume_tokens.delete(token))
    fiber.resume(readiness)
  end
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end

#runObject

Run until no more work needs doing.



119
120
121
122
# File 'lib/itsi/scheduler.rb', line 119

def run
  tick while work?
  debug "Exit Scheduler"
end

#switch_unblock_batchObject



98
99
100
101
102
103
104
# File 'lib/itsi/scheduler.rb', line 98

def switch_unblock_batch
  @unblocked_mux.synchronize do
    current = @unblocked[@unblock_idx]
    @unblock_idx = (@unblock_idx + 1) % 2
    current
  end
end

#tickObject



64
65
66
67
68
69
70
71
72
# File 'lib/itsi/scheduler.rb', line 64

def tick
  events = fetch_due_events
  timers = fetch_due_timers
  unblocked = switch_unblock_batch
  events&.each(&@resume_fiber_with_readiness)
  unblocked.each(&@resume_blocked)
  unblocked.clear
  timers&.each(&@resume_fiber)
end

#unblock(_blocker, fiber) ⇒ Object



53
54
55
56
57
58
# File 'lib/itsi/scheduler.rb', line 53

def unblock(_blocker, fiber)
  @unblocked_mux.synchronize do
    @unblocked[@unblock_idx] << fiber
  end
  wake
end

#work?Boolean

Keep running until we’ve got no timers we’re awaiting, no pending IO, no temporary yields, no pending unblocks.

Returns:

  • (Boolean)


114
115
116
# File 'lib/itsi/scheduler.rb', line 114

def work?
  !@unblocked[@unblock_idx].empty? || !@join_waiters.empty? || has_pending_io?
end

#yieldObject

Yields upwards to the scheduler, with an intention to resume the fiber that yielded ASAP.



108
109
110
# File 'lib/itsi/scheduler.rb', line 108

def yield
  kernel_sleep(0) if work?
end