Class: IO::Event::Selector::Select

Inherits:
Object
  • Object
show all
Defined in:
lib/io/event/selector/select.rb

Defined Under Namespace

Classes: Optional, Waiter

Constant Summary collapse

EAGAIN =
Errno::EAGAIN::Errno

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Select

Returns a new instance of Select.



26
27
28
29
30
31
32
33
34
35
# File 'lib/io/event/selector/select.rb', line 26

def initialize(loop)
	@loop = loop
	
	@waiting = Hash.new.compare_by_identity
	
	@blocked = false
	
	@ready = Queue.new
	@interrupt = Interrupt.attach(self)
end

Instance Attribute Details

#loopObject (readonly)

Returns the value of attribute loop.



37
38
39
# File 'lib/io/event/selector/select.rb', line 37

def loop
  @loop
end

Instance Method Details

#closeObject



50
51
52
53
54
55
# File 'lib/io/event/selector/select.rb', line 50

def close
	@interrupt.close
	
	@loop = nil
	@waiting = nil
end

#io_read(fiber, io, buffer, length) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/io/event/selector/select.rb', line 154

def io_read(fiber, io, buffer, length)
	offset = 0
	
	while true
		maximum_size = buffer.size - offset
		
		case result = blocking{io.read_nonblock(maximum_size, exception: false)}
		when :wait_readable
			if length > 0
				self.io_wait(fiber, io, IO::READABLE)
			else
				return -EAGAIN
			end
		when :wait_writable
			if length > 0
				self.io_wait(fiber, io, IO::WRITABLE)
			else
				return -EAGAIN
			end
		when nil
			break
		else
			buffer.set_string(result, offset)
			
			size = result.bytesize
			offset += size
			break if size >= length
			length -= size
		end
	end
	
	return offset
end

#io_wait(fiber, io, events) ⇒ Object



143
144
145
146
147
148
149
# File 'lib/io/event/selector/select.rb', line 143

def io_wait(fiber, io, events)
	waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])
	
	@loop.transfer
ensure
	waiter&.invalidate
end

#io_write(fiber, io, buffer, length) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/io/event/selector/select.rb', line 188

def io_write(fiber, io, buffer, length)
	offset = 0
	
	while true
		maximum_size = buffer.size - offset
		
		chunk = buffer.get_string(offset, maximum_size)
		case result = blocking{io.write_nonblock(chunk, exception: false)}
		when :wait_readable
			if length > 0
				self.io_wait(fiber, io, IO::READABLE)
			else
				return -EAGAIN
			end
		when :wait_writable
			if length > 0
				self.io_wait(fiber, io, IO::WRITABLE)
			else
				return -EAGAIN
			end
		else
			offset += result
			break if result >= length
			length -= result
		end
	end
	
	return offset
end

#process_wait(fiber, pid, flags) ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/io/event/selector/select.rb', line 219

def process_wait(fiber, pid, flags)
	r, w = IO.pipe
	
	thread = Thread.new do
		Process::Status.wait(pid, flags)
	ensure
		w.close
	end
	
	self.io_wait(fiber, r, IO::READABLE)
	
	return thread.value
ensure
	r.close
	w.close
	thread&.kill
end

#push(fiber) ⇒ Object

Append the given fiber into the ready list.



97
98
99
# File 'lib/io/event/selector/select.rb', line 97

def push(fiber)
	@ready.push(fiber)
end

#raise(fiber, *arguments) ⇒ Object

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.



102
103
104
105
106
107
108
109
# File 'lib/io/event/selector/select.rb', line 102

def raise(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.raise(*arguments)
ensure
	optional.nullify
end

#ready?Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/io/event/selector/select.rb', line 111

def ready?
	!@ready.empty?
end

#resume(fiber, *arguments) ⇒ Object

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.



77
78
79
80
81
82
83
84
# File 'lib/io/event/selector/select.rb', line 77

def resume(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.transfer(*arguments)
ensure
	optional.nullify
end

#select(duration = nil) ⇒ Object



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/io/event/selector/select.rb', line 250

def select(duration = nil)
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	readable = Array.new
	writable = Array.new
	
	@waiting.each do |io, waiter|
		waiter.each do |fiber, events|
			if (events & IO::READABLE) > 0
				readable << io
			end
			
			if (events & IO::WRITABLE) > 0
				writable << io
			end
		end
	end
	
	@blocked = true
	duration = 0 unless @ready.empty?
	readable, writable, _ = ::IO.select(readable, writable, nil, duration)
	@blocked = false
	
	ready = Hash.new(0)
	
	readable&.each do |io|
		ready[io] |= IO::READABLE
	end
	
	writable&.each do |io|
		ready[io] |= IO::WRITABLE
	end
	
	ready.each do |io, events|
		@waiting.delete(io).transfer(events)
	end
	
	return ready.size
end

#transferObject

Transfer from the current fiber to the event loop.



72
73
74
# File 'lib/io/event/selector/select.rb', line 72

def transfer
	@loop.transfer
end

#wakeupObject

If the event loop is currently blocked,



40
41
42
43
44
45
46
47
48
# File 'lib/io/event/selector/select.rb', line 40

def wakeup
	if @blocked
		@interrupt.signal
		
		return true
	end
	
	return false
end

#yieldObject

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.



87
88
89
90
91
92
93
94
# File 'lib/io/event/selector/select.rb', line 87

def yield
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	@loop.transfer
ensure
	optional.nullify
end