Class: IO::Event::Selector::Select

Inherits:
Object
  • Object
show all
Defined in:
lib/io/event/selector/select.rb

Defined Under Namespace

Classes: Optional, Waiter

Constant Summary collapse

EAGAIN =
-Errno::EAGAIN::Errno
EWOULDBLOCK =
-Errno::EWOULDBLOCK::Errno

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Select

Returns a new instance of Select.



12
13
14
15
16
17
18
19
20
21
# File 'lib/io/event/selector/select.rb', line 12

def initialize(loop)
	@loop = loop
	
	@waiting = Hash.new.compare_by_identity
	
	@blocked = false
	
	@ready = Queue.new
	@interrupt = Interrupt.attach(self)
end

Instance Attribute Details

#loopObject (readonly)

Returns the value of attribute loop.



23
24
25
# File 'lib/io/event/selector/select.rb', line 23

def loop
  @loop
end

Instance Method Details

#again?(errno) ⇒ Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/io/event/selector/select.rb', line 147

def again?(errno)
	errno == EAGAIN or errno == EWOULDBLOCK
end

#blocking(&block) ⇒ Object



273
274
275
276
# File 'lib/io/event/selector/select.rb', line 273

def blocking(&block)
	fiber = Fiber.new(blocking: true, &block)
	return fiber.resume(fiber)
end

#closeObject



36
37
38
39
40
41
# File 'lib/io/event/selector/select.rb', line 36

def close
	@interrupt.close
	
	@loop = nil
	@waiting = nil
end

#io_read(fiber, _io, buffer, length, offset = 0) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/io/event/selector/select.rb', line 151

def io_read(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			maximum_size = buffer.size - offset
			result = Fiber.blocking{buffer.read(io, maximum_size, offset)}
			
			if again?(result)
				if length > 0
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result < 0
				return result
			else
				total += result
				offset += result
				break if total >= length
			end
		end
	end
	
	return total
end

#io_select(readable, writable, priority, timeout) ⇒ Object



137
138
139
140
141
# File 'lib/io/event/selector/select.rb', line 137

def io_select(readable, writable, priority, timeout)
	Thread.new do
		IO.select(readable, writable, priority, timeout)
	end.value
end

#io_wait(fiber, io, events) ⇒ Object



129
130
131
132
133
134
135
# File 'lib/io/event/selector/select.rb', line 129

def io_wait(fiber, io, events)
	waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])
	
	@loop.transfer
ensure
	waiter&.invalidate
end

#io_write(fiber, _io, buffer, length, offset = 0) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/io/event/selector/select.rb', line 178

def io_write(fiber, io, buffer, length, offset = 0)
	total = 0
	
	Selector.nonblock(io) do
		while true
			maximum_size = buffer.size - offset
			result = Fiber.blocking{buffer.write(io, maximum_size, offset)}
			
			if again?(result)
				if length > 0
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result < 0
				return result
			else
				total += result
				offset += result
				break if total >= length
			end
		end
	end
	
	return total
end

#process_wait(fiber, pid, flags) ⇒ Object



279
280
281
282
283
# File 'lib/io/event/selector/select.rb', line 279

def process_wait(fiber, pid, flags)
	Thread.new do
		Process::Status.wait(pid, flags)
	end.value
end

#push(fiber) ⇒ Object

Append the given fiber into the ready list.



83
84
85
# File 'lib/io/event/selector/select.rb', line 83

def push(fiber)
	@ready.push(fiber)
end

#raise(fiber, *arguments) ⇒ Object

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.



88
89
90
91
92
93
94
95
# File 'lib/io/event/selector/select.rb', line 88

def raise(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.raise(*arguments)
ensure
	optional.nullify
end

#ready?Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/io/event/selector/select.rb', line 97

def ready?
	!@ready.empty?
end

#resume(fiber, *arguments) ⇒ Object

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.



63
64
65
66
67
68
69
70
# File 'lib/io/event/selector/select.rb', line 63

def resume(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.transfer(*arguments)
ensure
	optional.nullify
end

#select(duration = nil) ⇒ Object



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/io/event/selector/select.rb', line 298

def select(duration = nil)
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	readable = Array.new
	writable = Array.new
	priority = Array.new
	
	@waiting.each do |io, waiter|
		waiter.each do |fiber, events|
			if (events & IO::READABLE) > 0
				readable << io
			end
			
			if (events & IO::WRITABLE) > 0
				writable << io
			end
			
			if (events & IO::PRIORITY) > 0
				priority << io
			end
		end
	end
	
	@blocked = true
	duration = 0 unless @ready.empty?
	readable, writable, priority = ::IO.select(readable, writable, priority, duration)
	@blocked = false
	
	ready = Hash.new(0)
	
	readable&.each do |io|
		ready[io] |= IO::READABLE
	end
	
	writable&.each do |io|
		ready[io] |= IO::WRITABLE
	end
	
	priority&.each do |io|
		ready[io] |= IO::PRIORITY
	end
	
	ready.each do |io, events|
		@waiting.delete(io).transfer(events)
	end
	
	return ready.size
end

#transferObject

Transfer from the current fiber to the event loop.



58
59
60
# File 'lib/io/event/selector/select.rb', line 58

def transfer
	@loop.transfer
end

#wakeupObject

If the event loop is currently sleeping, wake it up.



26
27
28
29
30
31
32
33
34
# File 'lib/io/event/selector/select.rb', line 26

def wakeup
	if @blocked
		@interrupt.signal
		
		return true
	end
	
	return false
end

#yieldObject

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.



73
74
75
76
77
78
79
80
# File 'lib/io/event/selector/select.rb', line 73

def yield
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	@loop.transfer
ensure
	optional.nullify
end