Class: Core::Async::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Is::Inspectable
Defined in:
lib/core/async/scheduler.rb

Overview

public

The fiber scheduler.

Direct Known Subclasses

Reactor

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Scheduler

Returns a new instance of Scheduler.



39
40
41
42
43
44
45
# File 'lib/core/async/scheduler.rb', line 39

# Sets up an idle scheduler: neither running nor closed, with no fibers
# registered. The selector is created for whichever fiber is current at
# construction time.
def initialize
  # Lifecycle flags; both start out false.
  @closed = @running = false

  @selector = IO::Event::Selector.new(Fiber.current)
  @timers = Timers::Group.new

  # Registry of scheduled fibers (fiber => filament), filled in by `fiber`.
  @fibers = {}
end

Instance Method Details

#block(blocker, timeout = nil) ⇒ Object

Scheduler Interface



143
144
145
146
147
148
149
150
151
# File 'lib/core/async/scheduler.rb', line 143

# Suspends the calling fiber by transferring control to the selector.
#
# When `timeout` is truthy a timer is armed through `create_timer` before
# suspending; it is always cancelled on the way out, whether the fiber resumes
# normally or an exception unwinds through this method.
# NOTE(review): `create_timer` is defined elsewhere; presumably it wakes this
# fiber once the timeout elapses — confirm.
#
# `blocker` is accepted but not used here.
# Returns whatever `@selector.transfer` returns.
def block(blocker, timeout = nil)
  wakeup = create_timer(timeout) if timeout

  @selector.transfer
ensure
  wakeup&.cancel
end

#cancel(fiber = Thread.current[:__corerb_async_current_fiber__]) ⇒ Object

public

Cancel the given fiber.



117
118
119
# File 'lib/core/async/scheduler.rb', line 117

# Cancels the given fiber by raising `Cancel` in it through this object's own
# `raise` method.
#
# `fiber` defaults to the value stored under
# `Thread.current[:__corerb_async_current_fiber__]`. Does nothing and returns
# nil when `fiber` is nil or false.
# NOTE(review): `raise` is resolved on the scheduler itself; presumably it is
# delegated to the selector — confirm.
def cancel(fiber = Thread.current[:__corerb_async_current_fiber__])
  return unless fiber

  self.raise(fiber, Cancel)
end

#close ⇒ Object



153
154
155
156
157
158
159
160
161
# File 'lib/core/async/scheduler.rb', line 153

# Runs the scheduler to completion, then — on the first call only — marks it
# closed and clears the scheduler via `Fiber.set_scheduler(nil)`.
#
# The cleanup sits in `ensure`, so it also happens when `run` raises.
# Returns whatever `run` returns.
def close
  run
ensure
  first_close = !@closed

  # The flag is set before the scheduler is cleared, so a nested `close`
  # triggered by clearing it sees the scheduler as already closed.
  @closed = true

  Fiber.set_scheduler(nil) if first_close
end

#fiber(&block) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/core/async/scheduler.rb', line 163

# Creates a non-blocking fiber for `block`, registers it with the scheduler
# and starts it immediately through the selector.
#
# The block receives a new `Filament`. Inside the fiber that filament is also
# stored under `Thread.current[:__corerb_async_current_fiber__]` and pointed
# at the fiber through `filament.object=`.
#
# When the block finishes, the fiber is removed from the registry and the
# filament is resolved with:
#   * the block's return value on success,
#   * nil when `Cancel` was raised,
#   * a `Failure` wrapping the error for any other StandardError.
#
# If the calling fiber is itself registered, the new filament is added as a
# child of the caller's filament.
#
# Returns the new Fiber.
def fiber(&block)
  filament = Filament.new

  spawned = Fiber.new(blocking: false) do
    Thread.current[:__corerb_async_current_fiber__] = filament
    filament.object = spawned
    outcome = block.call(filament)
  rescue Cancel
    outcome = nil
  rescue => failure
    outcome = Failure.new(failure)
  ensure
    # Unregister first, then publish the outcome.
    @fibers.delete(spawned)
    filament.resolve(outcome)
  end

  @fibers[spawned] = filament

  parent = @fibers[Fiber.current]
  parent.add_child(filament) if parent

  @selector.resume(spawned)

  spawned
end

#io_wait(io, events, timeout = nil) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/core/async/scheduler.rb', line 192

# Waits on `io` for `events` by delegating to the selector on behalf of the
# calling fiber.
#
# When `timeout` is truthy, a timer is scheduled that raises `Timeout` in the
# waiting fiber; that exception is rescued here and turned into `false`. The
# timer is cancelled on every exit path.
#
# Returns the selector's `io_wait` result, or false when `Timeout` was raised.
def io_wait(io, events, timeout = nil)
  waiter = Fiber.current

  deadline = @timers.after(timeout) { waiter.raise(Timeout) } if timeout

  @selector.io_wait(waiter, io, events)
rescue Timeout
  false
ensure
  deadline&.cancel
end

#kernel_sleep(duration = nil) ⇒ Object



208
209
210
211
212
213
214
215
216
# File 'lib/core/async/scheduler.rb', line 208

# Puts the calling fiber to sleep by transferring control to the selector.
#
# When `duration` is truthy a timer is armed through `create_timer` first and
# is always cancelled on the way out. Without a duration no timer is created,
# so the fiber stays suspended until something else resumes it.
# NOTE(review): `create_timer` is defined elsewhere; presumably it resumes this
# fiber after `duration` — confirm.
#
# Returns whatever `@selector.transfer` returns.
def kernel_sleep(duration = nil)
  alarm = create_timer(duration) if duration

  @selector.transfer
ensure
  alarm&.cancel
end

#observe ⇒ Object

public

Run until there’s no more work to do, yielding before each tick.



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/core/async/scheduler.rb', line 67

# Runs ticks until `tick` reports `:finished`, yielding the scheduler to the
# given block before every tick.
#
# The running flag is true for the duration of the loop and is reset to false
# afterwards, even if the block or a tick raises. Returns nil.
def observe
  @running = true

  outcome = nil
  while outcome != :finished
    yield self
    outcome = tick
  end
ensure
  @running = false
end

#process_wait(...) ⇒ Object



218
219
220
# File 'lib/core/async/scheduler.rb', line 218

# Forwards every argument to the selector's `process_wait`, passing the
# calling fiber as the first argument. Returns the selector's result.
def process_wait(...)
  waiter = Fiber.current
  @selector.process_wait(waiter, ...)
end

#run ⇒ Object

public

Run until there’s no more work to do.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/core/async/scheduler.rb', line 49

# Runs ticks until `tick` reports `:finished` or the thread has a pending
# interrupt.
#
# If a block is given it is yielded the scheduler once, before the first tick.
# `Interrupt` is deferred while the loop runs (`Interrupt => :never`), and the
# loop checks `Thread.pending_interrupt?` between ticks so it can stop.
# The running flag is reset to false on every exit path. Returns nil.
def run
  @running = true

  yield self if block_given?

  Thread.handle_interrupt(Interrupt => :never) do
    outcome = nil

    while !Thread.pending_interrupt? && outcome != :finished
      outcome = tick
    end
  end
ensure
  @running = false
end

#running? ⇒ Boolean

public

Returns `true` if the scheduler is running.

Returns:

  • (Boolean)


81
82
83
# File 'lib/core/async/scheduler.rb', line 81

# Reports whether the scheduler is running: true only while the running flag
# is exactly `true`, false for any other value.
def running?
  @running.equal?(true)
end

#stop ⇒ Object

public

Stop the scheduler, canceling each scheduled fiber.



107
108
109
110
111
112
113
# File 'lib/core/async/scheduler.rb', line 107

# Stops the scheduler by raising one shared `Cancel` instance in every
# registered fiber. Returns the fiber registry.
#
# The loop walks a snapshot of the registry rather than the live hash. A
# fiber's cleanup removes it from `@fibers`, and starting a fiber adds to it;
# if the selector switches into the fiber as soon as the exception is raised,
# both can happen while this loop is still running. Adding a key to a Hash
# that is being iterated raises a RuntimeError, which would abort the stop
# part-way through.
#
# Fibers registered while stopping are not part of the snapshot and are
# therefore not cancelled by this call.
def stop
  signal = Cancel.new

  @fibers.keys.each do |fiber|
    # Skip fibers that were unregistered while an earlier one was unwinding.
    next unless @fibers.key?(fiber)

    @selector.raise(fiber, signal)
  end

  @fibers
end

#tick ⇒ Object

public

Run one tick.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/core/async/scheduler.rb', line 87

# Runs one tick: waits on the selector for at most the timers' wait interval,
# then fires the timers.
#
# Returns `:finished` without touching the selector when the timer group
# reports no wait interval (nil) and no fibers are registered. Otherwise
# returns the result of `@timers.fire`, or nil when the wait was interrupted
# with `Errno::EINTR`.
def tick
  wait = @timers.wait_interval
  return :finished if wait.nil? && @fibers.empty?

  # A negative interval is clamped to zero before being handed to the selector.
  wait = 0 if !wait.nil? && wait < 0

  @selector.select(wait)

  @timers.fire
rescue Errno::EINTR
  # An interrupted wait is not an error; the tick simply ends here.
  nil
end

#timeout(seconds) ⇒ Object

public

Yields to the given block, timing out after the given seconds.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/core/async/scheduler.rb', line 123

# Yields to the given block, arranging for `Timeout` to be raised in the
# calling fiber after `seconds`.
#
# No timer is created when `seconds` is nil, false, zero or negative. The
# timer callback only raises if the fiber is still alive, and the timer is
# cancelled on every exit path. Returns the block's value.
def timeout(seconds)
  deadline = nil

  if seconds && seconds > 0
    waiter = Fiber.current

    deadline = @timers.after(seconds) do
      waiter.raise(Timeout) if waiter.alive?
    end
  end

  yield
ensure
  deadline&.cancel
end

#unblock(blocker, fiber) ⇒ Object



222
223
224
# File 'lib/core/async/scheduler.rb', line 222

# Hands `fiber` back to the selector by pushing it, so it can be resumed.
# `blocker` is accepted but not used here. Returns the selector's `push`
# result.
#
# NOTE(review): only a `push` happens here. If this can be called from another
# thread while the selector is waiting, confirm whether the selector also
# needs to be woken.
def unblock(blocker, fiber)
  selector = @selector
  selector.push(fiber)
end