Class: Async::Scheduler

Inherits:
Node
  • Object
show all
Defined in:
lib/async/scheduler.rb

Overview

Handles scheduling of fibers. Implements the fiber scheduler interface.

Direct Known Subclasses

Reactor

Defined Under Namespace

Classes: ClosedError

Instance Attribute Summary

Attributes inherited from Node

#annotation, #children, #head, #parent, #tail

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Node

#parent=, #annotate, #backtrace, #children?, #consume, #description, #finished?, #print_hierarchy, #root, #stop, #stopped?, #transient?, #traverse

Constructor Details

#initialize(parent = nil, selector: nil) ⇒ Scheduler

Returns a new instance of Scheduler.



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/async/scheduler.rb', line 32

# Set up a new scheduler.
#
# @param parent [Object, nil] forwarded unchanged to the superclass constructor.
# @param selector [Object, nil] the event selector to use; when omitted, an
#   `IO::Event::Selector` is created for the current fiber.
def initialize(parent = nil, selector: nil)
	super(parent)
	
	# Prefer the selector supplied by the caller, otherwise create one bound to the calling fiber:
	@selector = selector
	@selector ||= ::IO::Event::Selector.new(Fiber.current)
	
	# Initially no interrupt has been requested and no fiber is blocked:
	@interrupted = false
	@blocked = 0
	
	# Time accumulators which #load turns into a load figure:
	@busy_time = 0.0
	@idle_time = 0.0
	
	# Timers backing every timeout handled by this scheduler:
	@timers = ::Timers::Group.new
end

Class Method Details

.supported?Boolean

Whether the fiber scheduler is supported.

Returns:

  • (Boolean)


28
29
30
# File 'lib/async/scheduler.rb', line 28

# Whether the fiber scheduler is supported.
#
# @return [Boolean] always `true` in this implementation.
def self.supported?
	true
end

Instance Method Details

#address_resolve(hostname) ⇒ Object



192
193
194
195
196
197
# File 'lib/async/scheduler.rb', line 192

# Resolve a hostname into its addresses using `Resolv`.
#
# @param hostname [String] the name to resolve, optionally carrying a `%device` suffix.
# @return [Array<String>] whatever `Resolv.getaddresses` reports for the name without its suffix.
def address_resolve(hostname)
	# Some platforms append a device-specific suffix (e.g. `%en0`) to the hostname, which has to be removed prior to resolution.
	# Background: <https://github.com/socketry/async/issues/180>.
	bare_hostname, _device = hostname.split("%", 2)
	
	::Resolv.getaddresses(bare_hostname)
end

#async(*arguments, **options, &block) ⇒ Object

Deprecated.

With no replacement.

Start an asynchronous task within the specified reactor. The task will be executed until the first blocking call, at which point it will yield and this method will return.

This is the main entry point for scheduling asynchronous tasks.



384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/async/scheduler.rb', line 384

# Create a task for the given block and run it immediately, up to its first blocking operation.
#
# @deprecated With no replacement.
# @param arguments [Array] passed through to the task when it is run.
# @param options [Hash] keyword options handed to `Task.new`.
# @return [Task] the task that was created and started.
# @raise [ClosedError] if the scheduler no longer has a selector.
def async(*arguments, **options, &block)
	Kernel::raise ClosedError if @selector.nil?
	
	# The new task is nested under the current task when there is one, otherwise directly under this scheduler:
	parent = Task.current? || self
	task = Task.new(parent, **options, &block)
	
	# Why the task is started eagerly rather than queued:
	# running the block synchronously up to its first blocking operation is not
	# strictly required (it could be scheduled for later), but it means that:
	# - failures surface at the point of the method call where possible,
	# - execution is deterministic where possible,
	# - no scheduler overhead is paid if no blocking operation is performed.
	task.run(*arguments)
	
	task
end

#block(blocker, timeout) ⇒ Object

Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call to #unblock must be performed to allow this fiber to continue.



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/async/scheduler.rb', line 149

# Suspend the current fiber by transferring to the selector until it is resumed again.
# Invoked when a fiber performs a blocking operation which cannot continue; a matching #unblock allows it to proceed.
#
# @param blocker [Object] the object being waited on (not used by this implementation).
# @param timeout [Numeric, nil] optional duration after which the fiber is transferred to with `false`.
# @return [Object] the value the fiber is resumed with.
def block(blocker, timeout)
	current = Fiber.current
	
	# With a timeout, arrange for the fiber to be resumed with `false` once it elapses (provided it is still alive):
	if timeout
		timer = @timers.after(timeout) do
			current.transfer(false) if current.alive?
		end
	end
	
	begin
		# Count this fiber as blocked for as long as it is suspended:
		@blocked += 1
		@selector.transfer
	ensure
		@blocked -= 1
	end
ensure
	# The timer must not fire once we are no longer waiting:
	timer.cancel if timer
end

#closeObject



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/async/scheduler.rb', line 84

# Terminate all tasks, release the selector and finally call `consume`.
#
# The selector is released from an `ensure` clause, so it is closed (and `@selector` reset to `nil`) even when termination fails or blocked operations remain. Previously those error paths left the selector open.
#
# @return [Object] the result of `consume`.
# @raise [RuntimeError] if operations are still blocked once all tasks have been terminated.
def close
	begin
		# It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
		until self.terminate
			self.run_once!
		end
		
		Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
	ensure
		# We depend on the GVL for consistency here rather than on an explicit lock.
		# `@selector = nil` must be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`: once the selector is closed, no fibers may be pushed to it.
		selector = @selector
		@selector = nil
		
		selector&.close
	end
	
	consume
end

#closed?Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/async/scheduler.rb', line 108

# Whether this scheduler has been closed, i.e. it no longer holds a selector.
#
# @return [Boolean] `true` when `@selector` is `nil`.
def closed?
	@selector.nil?
end

#fiberObject



402
403
404
# File 'lib/async/scheduler.rb', line 402

# Start a task via #async and hand back its fiber.
# Every argument (including a block) is forwarded to #async untouched.
#
# @return [Object] the `fiber` of the task returned by #async.
def fiber(...)
	task = async(...)
	
	task.fiber
end

#interruptObject

Interrupt the event loop and cause it to exit.



118
119
120
121
# File 'lib/async/scheduler.rb', line 118

# Interrupt the event loop and cause it to exit.
def interrupt
	# Record the request before waking the selector (presumably so the flag is already set when the loop next checks it):
	@interrupted = true
	# `@selector` is `nil` once the scheduler has been closed, hence the safe navigation:
	@selector&.wakeup
end

#io_read(io, buffer, length, offset = 0) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/async/scheduler.rb', line 232

# Read from `io` into `buffer` by delegating to the selector on behalf of the current fiber.
# If `get_timeout(io)` yields a timeout, an `IO::TimeoutError` is raised in the fiber once it elapses.
#
# @param io [IO] the io to read from.
# @param buffer [Object] the destination buffer, passed through to the selector.
# @param length [Integer] passed through to the selector.
# @param offset [Integer] passed through to the selector.
# @return [Object] the result of the selector's `io_read`.
def io_read(io, buffer, length, offset = 0)
	current = Fiber.current
	timeout = get_timeout(io)
	
	if timeout
		timer = @timers.after(timeout) do
			current.raise(::IO::TimeoutError, "Timeout while waiting for IO to become readable!")
		end
	end
	
	@selector.io_read(current, io, buffer, length, offset)
ensure
	# Whatever the outcome, the timer must not fire after the operation has ended:
	timer.cancel if timer
end

#io_wait(io, events, timeout = nil) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/async/scheduler.rb', line 211

# Wait until `io` is ready for the given events by delegating to the selector on behalf of the current fiber.
#
# @param io [IO] the io to wait on.
# @param events [Object] the events of interest, passed through to the selector.
# @param timeout [Numeric, nil] optional explicit timeout; without one, `get_timeout(io)` is consulted.
# @return [Object] the result of the selector's `io_wait`.
def io_wait(io, events, timeout = nil)
	current = Fiber.current
	
	timer =
		if timeout
			# An explicit timeout is expected to be handled by the caller, so the fiber is simply transferred to:
			@timers.after(timeout) {current.transfer}
		elsif (fallback = get_timeout(io))
			# Falling back to the io's own timeout is reported by raising an exception in the fiber:
			@timers.after(fallback) do
				current.raise(::IO::TimeoutError, "Timeout while waiting for IO to become ready!")
			end
		end
	
	@selector.io_wait(current, io, events)
ensure
	timer&.cancel
end

#io_write(io, buffer, length, offset = 0) ⇒ Object



247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/async/scheduler.rb', line 247

# Write from `buffer` to `io` by delegating to the selector on behalf of the current fiber.
# If `get_timeout(io)` yields a timeout, an `IO::TimeoutError` is raised in the fiber once it elapses.
#
# @param io [IO] the io to write to.
# @param buffer [Object] the source buffer, passed through to the selector.
# @param length [Integer] passed through to the selector.
# @param offset [Integer] passed through to the selector.
# @return [Object] the result of the selector's `io_write`.
def io_write(io, buffer, length, offset = 0)
	current = Fiber.current
	timeout = get_timeout(io)
	
	if timeout
		timer = @timers.after(timeout) do
			current.raise(::IO::TimeoutError, "Timeout while waiting for IO to become writable!")
		end
	end
	
	@selector.io_write(current, io, buffer, length, offset)
ensure
	# Whatever the outcome, the timer must not fire after the operation has ended:
	timer.cancel if timer
end

#kernel_sleep(duration = nil) ⇒ Object



183
184
185
186
187
188
189
# File 'lib/async/scheduler.rb', line 183

# Put the current fiber to sleep.
#
# @param duration [Numeric, nil] how long to sleep; without a duration control is simply transferred to the event loop.
# @return [Object] the result of #block or #transfer respectively.
def kernel_sleep(duration = nil)
	# Without a duration there is no timer to set up, so just hand over to the event loop:
	return self.transfer unless duration
	
	self.block(nil, duration)
end

#loadObject

Compute the scheduler load according to the busy and idle times that are updated by the run loop.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/async/scheduler.rb', line 48

# Compute the scheduler load from the busy and idle times that are updated by the run loop.
#
# As a side effect, once more than one second has been accumulated, both counters are scaled down so that together they span exactly one second.
#
# @return [Float] the busy fraction of the accumulated time; `0.0` when nothing has been accumulated.
def load
	elapsed = @busy_time + @idle_time
	
	# Nothing measured yet means no load:
	return 0.0 if elapsed.zero?
	
	# At most one second of history: report the plain busy fraction.
	return @busy_time / elapsed unless elapsed > 1.0
	
	# Otherwise squash both counters into a 1 second window:
	scale = 1.0 / elapsed
	@busy_time *= scale
	@idle_time *= scale
	
	# After normalisation the busy time already is the fraction of the window, no division required:
	@busy_time
end

#process_wait(pid, flags) ⇒ Object

Wait for the specified process ID to exit.



268
269
270
# File 'lib/async/scheduler.rb', line 268

# Wait for the specified process ID to exit.
#
# @param pid [Integer] the process to wait for.
# @param flags [Integer] passed through to the selector.
# @return [Object] the result of the selector's `process_wait`.
def process_wait(pid, flags)
	# The selector waits on behalf of the calling fiber:
	waiter = Fiber.current
	
	@selector.process_wait(waiter, pid, flags)
end

#push(fiber) ⇒ Object

Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.



135
136
137
# File 'lib/async/scheduler.rb', line 135

# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
# Delegates directly to the selector.
#
# @param fiber [Object] the fiber (or equivalent object) to schedule.
def push(fiber)
	@selector.push(fiber)
end

#raise(*arguments) ⇒ Object



139
140
141
# File 'lib/async/scheduler.rb', line 139

# Forward all arguments to the selector's `raise`.
#
# NOTE: inside this class the method takes precedence over `Kernel#raise` for unqualified calls, which is presumably why other methods here call `Kernel::raise` / `Kernel.raise` explicitly.
#
# @param arguments [Array] passed through to the selector unchanged.
def raise(*arguments)
	@selector.raise(*arguments)
end

#resume(fiber, *arguments) ⇒ Object



143
144
145
# File 'lib/async/scheduler.rb', line 143

# Forward a resume request for the given fiber to the selector.
#
# @param fiber [Object] the fiber to resume.
# @param arguments [Array] passed through to the selector unchanged.
def resume(fiber, *arguments)
	@selector.resume(fiber, *arguments)
end

#runObject

Run the reactor until all tasks are finished. Proxies arguments to #async immediately before entering the loop, if a block is provided.



346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/async/scheduler.rb', line 346

# Run the reactor until all tasks are finished (or the scheduler is interrupted).
# When a block is given, all arguments are proxied to #async immediately before entering the loop.
#
# @return [Object, nil] the task started from the block, or `nil` when no block was given.
# @raise [ClosedError] if the scheduler no longer has a selector.
def run(...)
	Kernel::raise ClosedError if @selector.nil?
	
	initial_task = nil
	initial_task = self.async(...) if block_given?
	
	begin
		# Deferring `Exception` would be a little bit safer in theory, but only `SignalException` has been shown to be a problem in practice, so this is deliberately not over-engineered.
		Thread.handle_interrupt(::SignalException => :never) do
			# Keep iterating until we are interrupted...
			until self.interrupted?
				# ...or until there is nothing left to run:
				break unless self.run_once
			end
		end
	rescue Interrupt
		# Stop the task tree without being interrupted again, then re-enter the loop so that the tasks get a chance to finish:
		Thread.handle_interrupt(::SignalException => :never) {self.stop}
		
		retry
	end
	
	initial_task
ensure
	Console.debug(self) {"Exiting run-loop because #{$! || 'finished'}."}
end

#run_once(timeout = nil) ⇒ Object

Run one iteration of the event loop. Does not handle interrupts.



276
277
278
279
280
281
282
283
284
285
# File 'lib/async/scheduler.rb', line 276

# Run one iteration of the event loop. Does not handle interrupts.
#
# @param timeout [Numeric, nil] passed through to `run_once!`.
# @return [Object] `false` if the scheduler has finished, otherwise the result of `run_once!`.
# @raise [RuntimeError] when called from a non-blocking fiber.
def run_once(timeout = nil)
	Kernel::raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?
	
	# Once everything has finished there is nothing left to run:
	return false if self.finished?
	
	run_once!(timeout)
end

#scheduler_closeObject



67
68
69
70
71
72
73
74
# File 'lib/async/scheduler.rb', line 67

# Run the event loop to completion (unless an exception is already in flight) and then close the scheduler.
#
# @return [Object, nil] the result of #run, or `nil` when the run loop was skipped.
def scheduler_close
	# When the execution context (thread) is already handling an exception, skip the run loop so that we exit as quickly as possible:
	self.run unless $!
ensure
	# Closing happens regardless of how the run loop ended:
	self.close
end

#terminateObject

Terminate the scheduler. We deliberately ignore interrupts here, as this code can be called from an interrupt, and we don’t want to be interrupted while cleaning up.



77
78
79
80
81
# File 'lib/async/scheduler.rb', line 77

# Terminate the scheduler by running the superclass implementation with interrupts deferred.
# Interrupts are deliberately ignored here, as this code can be called from an interrupt and must not be interrupted while cleaning up.
#
# @return [Object] whatever the superclass implementation returns.
def terminate
	# Defer `Interrupt` for the duration of the superclass call:
	Thread.handle_interrupt(::Interrupt => :never) do
		super
	end
end

#timeout_after(duration, exception, message, &block) ⇒ Object



422
423
424
425
426
# File 'lib/async/scheduler.rb', line 422

# Run the block under a timeout by delegating to #with_timeout.
# The block receives the duration, not the timer.
#
# @param duration [Numeric] the timeout, also yielded to the block.
# @param exception [Class] the exception to raise when the timeout elapses.
# @param message [String] the message for that exception.
# @return [Object] the result of the block.
def timeout_after(duration, exception, message, &block)
	# The timer yielded by #with_timeout is of no interest here:
	with_timeout(duration, exception, message) {yield duration}
end

#to_sObject



112
113
114
# File 'lib/async/scheduler.rb', line 112

def to_s
	"\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>"
end

#transferObject

Transfer from the calling fiber to the event loop.



124
125
126
# File 'lib/async/scheduler.rb', line 124

# Transfer from the calling fiber to the event loop.
# Delegates directly to the selector.
def transfer
	@selector.transfer
end

#unblock(blocker, fiber) ⇒ Object



172
173
174
175
176
177
178
179
180
# File 'lib/async/scheduler.rb', line 172

# Make a previously blocked fiber runnable again and wake up the selector.
# Does nothing when there is no selector.
#
# @param blocker [Object] the object that was being waited on (not used by this implementation).
# @param fiber [Fiber] the fiber to schedule.
# @return [Object, nil] the result of the selector's `wakeup`, or `nil` without a selector.
def unblock(blocker, fiber)
	# Taking a single snapshot of `@selector` is protected by the GVL:
	selector = @selector
	return unless selector
	
	selector.push(fiber)
	selector.wakeup
end

#with_timeout(duration, exception = TimeoutError, message = "execution expired", &block) ⇒ Object

Invoke the block, but after the specified timeout, raise TimeoutError in any currently blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.



408
409
410
411
412
413
414
415
416
417
418
419
420
# File 'lib/async/scheduler.rb', line 408

# Invoke the block, raising `exception` in the calling fiber if `duration` elapses before the block has completed.
# The timer is always cancelled once the block has finished, successfully or not.
#
# @param duration [Numeric] how long to wait before raising.
# @param exception [Class] the exception to raise on timeout.
# @param message [String] the message for that exception.
# @yield [timer] the timer guarding the block.
# @return [Object] the result of the block.
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
	current = Fiber.current
	
	# A fiber that has already finished must not be raised in:
	timer = @timers.after(duration) do
		current.raise(exception, message) if current.alive?
	end
	
	yield timer
ensure
	timer&.cancel
end

#yieldObject

Yield the current fiber and resume it on the next iteration of the event loop.



129
130
131
# File 'lib/async/scheduler.rb', line 129

# Yield the current fiber and resume it on the next iteration of the event loop.
# Delegates directly to the selector.
#
# NOTE: since `yield` is also a Ruby keyword, this method can only be invoked with an explicit receiver.
def yield
	@selector.yield
end