Class: IO::Event::Selector::Select

Inherits:
Object
  • Object
show all
Defined in:
lib/io/event/selector/select.rb

Defined Under Namespace

Classes: Optional, Waiter

Constant Summary collapse

# Negated errno value for EAGAIN; #again? compares negative results against it.
EAGAIN =
-Errno::EAGAIN::Errno
# Negated errno value for EWOULDBLOCK; #again? compares negative results against it.
EWOULDBLOCK =
-Errno::EWOULDBLOCK::Errno

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Select

Returns a new instance of Select.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/io/event/selector/select.rb', line 13

# Create a selector bound to the given event loop.
#
# `loop` is stored as-is and later used via `@loop.transfer`, so it is
# presumably the event loop's fiber (confirm against the caller).
def initialize(loop)
	@loop = loop
	
	# Nothing has been selected yet, so the selector is not blocked:
	@blocked = false
	
	# Fibers (or placeholders for them) waiting to be resumed:
	@ready = Queue.new
	
	# Waiters are looked up by the IO object's identity rather than `#hash`/`#eql?`:
	@waiting = {}.compare_by_identity
	
	@interrupt = Interrupt.attach(self)
	
	# No select call has run, so no idle time has been measured:
	@idle_duration = 0.0
end

Instance Attribute Details

#idle_duration ⇒ Object (readonly)

This is the amount of time the event loop was idle during the last select call.



29
30
31
# File 'lib/io/event/selector/select.rb', line 29

# The amount of time the event loop was idle during the last select call.
def idle_duration
  return @idle_duration
end

#loop ⇒ Object (readonly)

Returns the value of attribute loop.



26
27
28
# File 'lib/io/event/selector/select.rb', line 26

# The event loop object this selector was constructed with.
def loop
  return @loop
end

Instance Method Details

#again?(errno) ⇒ Boolean

Returns:

  • (Boolean)


164
165
166
# File 'lib/io/event/selector/select.rb', line 164

# Whether the given (negated) errno means the operation should be retried
# later, i.e. it equals EAGAIN or EWOULDBLOCK.
def again?(errno)
	return true if errno == EAGAIN
	
	errno == EWOULDBLOCK
end

#blocking(&block) ⇒ Object



369
370
371
372
# File 'lib/io/event/selector/select.rb', line 369

# Run the given block in a newly created blocking fiber.
#
# The new fiber is passed to the block as its argument, and the value of the
# fiber's first `resume` is returned.
def blocking(&block)
	blocking_fiber = Fiber.new(blocking: true, &block)
	
	blocking_fiber.resume(blocking_fiber)
end

#close ⇒ Object



42
43
44
45
46
47
# File 'lib/io/event/selector/select.rb', line 42

# Close the interrupt and drop the references to the event loop and the
# waiting list.
def close
	@interrupt.close
	
	# Assigned right-to-left: @loop is cleared first, then @waiting.
	@waiting = @loop = nil
end

#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object

Ruby <= 3.1, limited IO::Buffer support.



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/io/event/selector/select.rb', line 173

# Read from `io` into `buffer` starting at `offset`, until at least `length`
# bytes have been read, the read reports 0 bytes, or an error occurs.
#
# Returns the total number of bytes read, or the negative result reported by
# `buffer.read` when it is not a "try again" condition.
def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		# `while true` rather than `loop do`: this class defines its own `#loop` reader.
		while true
			result = Fiber.blocking{buffer.read(io, 0, offset)}
			
			if result > 0
				# Made progress; stop once enough has been read, otherwise advance:
				total += result
				break if total >= length
				offset += result
			elsif result == 0
				# Nothing more was read:
				break
			elsif again?(result)
				# The read would block, so wait until the IO is readable and retry:
				self.io_wait(fiber, io, IO::READABLE)
			else
				# Any other negative result is handed back to the caller as-is:
				return result
			end
		end
	end
	
	return total
end

#io_select(readable, writable, priority, timeout) ⇒ Object



155
156
157
158
159
# File 'lib/io/event/selector/select.rb', line 155

# Perform `IO.select` on a separate thread and return its result (the arrays
# of ready IOs, or nil if the timeout elapsed).
def io_select(readable, writable, priority, timeout)
	worker = Thread.new { IO.select(readable, writable, priority, timeout) }
	
	worker.value
end

#io_wait(fiber, io, events) ⇒ Object



147
148
149
150
151
152
153
# File 'lib/io/event/selector/select.rb', line 147

# Register the fiber as waiting for `events` on `io`, then transfer control
# to the event loop. Returns whatever the event loop transfers back.
#
# Any waiter already registered for the same IO becomes the third argument
# of the new waiter, so several waiters can be chained on one IO.
def io_wait(fiber, io, events)
	previous = @waiting[io]
	waiter = (@waiting[io] = Waiter.new(fiber, events, previous))
	
	@loop.transfer
ensure
	# However we leave (normally or via an exception), mark this waiter as no longer valid:
	waiter&.invalidate
end

#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/io/event/selector/select.rb', line 201

# Write from `buffer` (starting at `offset`) to `io`, until at least `length`
# bytes have been written, the write reports 0 bytes, or an error occurs.
#
# Returns the total number of bytes written, or the negative result reported
# by `buffer.write` when it is not a "try again" condition.
def io_write(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		# `while true` rather than `loop do`: this class defines its own `#loop` reader.
		while true
			result = Fiber.blocking{buffer.write(io, 0, offset)}
			
			if result < 0
				if again?(result)
					# The write would block, so wait until the IO can accept more data.
					# Waiting for readability here could stall until the peer happened to send something.
					self.io_wait(fiber, io, IO::WRITABLE)
				else
					# Any other negative result is handed back to the caller as-is:
					return result
				end
			elsif result == 0
				# Nothing was written; stop and report what has been written so far:
				break
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

#process_wait(fiber, pid, flags) ⇒ Object



375
376
377
378
379
# File 'lib/io/event/selector/select.rb', line 375

# Wait for the given process on a separate thread and return the result of
# `Process::Status.wait(pid, flags)`.
def process_wait(fiber, pid, flags)
	worker = Thread.new { Process::Status.wait(pid, flags) }
	
	worker.value
end

#push(fiber) ⇒ Object

Append the given fiber into the ready list.



89
90
91
# File 'lib/io/event/selector/select.rb', line 89

# Append the given fiber to the ready list.
def push(fiber)
	@ready << fiber
end

#raise(fiber, *arguments) ⇒ Object

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.



94
95
96
97
98
99
100
101
# File 'lib/io/event/selector/select.rb', line 94

# Raise an exception on the given fiber (the arguments are forwarded to
# `fiber.raise`), after putting the current fiber into the ready list.
def raise(fiber, *arguments)
	current = Optional.new(Fiber.current)
	@ready << current
	
	fiber.raise(*arguments)
ensure
	# Control is back in this fiber (or we failed), so the queued entry is cleared:
	current.nullify
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/io/event/selector/select.rb', line 103

# Whether the ready list currently holds any entries.
def ready?
	return false if @ready.empty?
	
	true
end

#resume(fiber, *arguments) ⇒ Object

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.



69
70
71
72
73
74
75
76
# File 'lib/io/event/selector/select.rb', line 69

# Transfer from the current fiber to the given fiber, forwarding the
# arguments, after putting the current fiber into the ready list.
def resume(fiber, *arguments)
	current = Optional.new(Fiber.current)
	@ready << current
	
	fiber.transfer(*arguments)
ensure
	# Control is back in this fiber (or we failed), so the queued entry is cleared:
	current.nullify
end

#select(duration = nil) ⇒ Object



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/io/event/selector/select.rb', line 394

# Run one iteration of the selector: wait (via `::IO.select`) for any of the
# waiting IOs to become ready and dispatch the resulting events to their waiters.
#
# `duration` is passed to `::IO.select` as its timeout; it is forced to 0 when
# there is ready work to process.
#
# Returns the number of distinct IO objects that reported events, or 0 if an
# exception interrupted the wait (the exception is re-raised on the current thread).
def select(duration = nil)
	# NOTE(review): `pop_ready` is defined outside this listing; presumably it processes the ready list and reports whether anything was handled.
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	readable = Array.new
	writable = Array.new
	priority = Array.new
	
	# Build the interest lists for `::IO.select` from every waiter registered on each IO:
	@waiting.each do |io, waiter|
		waiter.each do |fiber, events|
			if (events & IO::READABLE) > 0
				readable << io
			end
			
			if (events & IO::WRITABLE) > 0
				writable << io
			end
			
			if (events & IO::PRIORITY) > 0
				priority << io
			end
		end
	end
	
	# If the ready list is (still) non-empty, don't block in the select call:
	duration = 0 unless @ready.empty?
	error = nil
	
	# Idle time is only measured for a positive timeout; otherwise (including a nil duration) it is reset to zero:
	if duration && duration > 0.0
		start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
	else
		@idle_duration = 0.0
	end
	
	# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
	Thread.handle_interrupt(::Exception => :on_blocking) do
		# `@blocked` is what #wakeup checks before signalling the interrupt:
		@blocked = true
		readable, writable, priority = ::IO.select(readable, writable, priority, duration)
	rescue ::Exception => error
		# Requeue below...
	ensure
		@blocked = false
		if start_time
			end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
			@idle_duration = end_time - start_time
		end
	end
	
	if error
		# Requeue the error into the pending exception queue:
		Thread.current.raise(error)
		return 0
	end
	
	# Combine the per-IO results into one event mask per IO object (keyed by identity, defaulting to 0):
	ready = Hash.new(0).compare_by_identity
	
	# The result arrays are nil when `::IO.select` timed out, hence the safe navigation:
	readable&.each do |io|
		ready[io] |= IO::READABLE
	end
	
	writable&.each do |io|
		ready[io] |= IO::WRITABLE
	end
	
	priority&.each do |io|
		ready[io] |= IO::PRIORITY
	end
	
	ready.each do |io, events|
		# Remove the waiter chain for this IO and dispatch the events to it.
		# NOTE(review): `Waiter#dispatch` is defined elsewhere; it appears to yield the waiters that must keep waiting.
		@waiting.delete(io).dispatch(events) do |waiter|
			# Re-schedule the waiting IO:
			waiter.tail = @waiting[io]
			@waiting[io] = waiter
		end
	end
	
	return ready.size
end

#transfer ⇒ Object

Transfer from the current fiber to the event loop.



64
65
66
# File 'lib/io/event/selector/select.rb', line 64

# Transfer from the current fiber to the event loop and return whatever is
# transferred back.
def transfer
	return @loop.transfer
end

#wakeup ⇒ Object

If the event loop is currently sleeping, wake it up.



32
33
34
35
36
37
38
39
40
# File 'lib/io/event/selector/select.rb', line 32

# If the event loop is currently blocked in a select call, signal the
# interrupt to wake it up.
#
# Returns true when a signal was sent, false otherwise.
def wakeup
	return false unless @blocked
	
	@interrupt.signal
	
	true
end

#yield ⇒ Object

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.



79
80
81
82
83
84
85
86
# File 'lib/io/event/selector/select.rb', line 79

# Yield from the current fiber back to the event loop, after putting the
# current fiber into the ready list.
def yield
	current = Optional.new(Fiber.current)
	@ready << current
	
	@loop.transfer
ensure
	# Control is back in this fiber (or we failed), so the queued entry is cleared:
	current.nullify
end