Class: IO::Event::Selector::Select

Inherits:
Object
  • Object
show all
Defined in:
lib/io/event/selector/select.rb

Defined Under Namespace

Classes: Optional

Constant Summary collapse

EAGAIN =
Errno::EAGAIN::Errno

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Select

Returns a new instance of Select.



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/io/event/selector/select.rb', line 26

# Set up the selector state for the given event loop.
#
# @parameter loop [Object] The object that waiting fibers transfer control to (via `transfer`).
def initialize(loop)
	@loop = loop
	
	# Waiting fibers, keyed by the identity of the IO object they wait on:
	@readable = {}.compare_by_identity
	@writable = {}.compare_by_identity
	
	# Fibers (or placeholders for them) that are ready to run again:
	@ready = Queue.new
	
	# Set to true only while `select` is inside `IO.select`:
	@blocked = false
	
	# Attached last: it receives this selector, so the state above must already exist.
	@interrupt = Interrupt.attach(self)
end

Instance Attribute Details

#loop ⇒ Object (readonly)

Returns the value of attribute loop.



38
39
40
# File 'lib/io/event/selector/select.rb', line 38

# Reader for the event loop object stored in `@loop`.
#
# @returns [Object | nil] Whatever `@loop` currently holds.
def loop
  return @loop
end

Instance Method Details

#close ⇒ Object



51
52
53
54
55
56
57
# File 'lib/io/event/selector/select.rb', line 51

# Close the interrupt and drop the references to the loop and both wait tables.
#
# @returns [nil]
def close
	@interrupt.close
	
	# Clear all three references in one chained assignment:
	@loop = @readable = @writable = nil
end

#io_read(fiber, io, buffer, length) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/io/event/selector/select.rb', line 139

# Read from `io` into `buffer`, waiting via `io_wait` whenever the operation would block.
#
# Reading continues until `length` bytes have been transferred, the stream reports
# end-of-file, or the buffer is full.
#
# @parameter fiber [Fiber] The fiber to register while waiting for readiness.
# @parameter io [IO] The stream to read from.
# @parameter buffer [Object] The destination; must respond to `size` and `copy(string, offset)`.
# @parameter length [Integer] The number of bytes wanted. When it is not positive, a would-block result returns `-EAGAIN` instead of waiting.
# @returns [Integer] The total number of bytes copied into the buffer, or `-EAGAIN`.
def io_read(fiber, io, buffer, length)
	offset = 0
	
	# NOTE: `while true` is deliberate; `loop do` would call this class's own `loop` reader.
	while true
		maximum_size = buffer.size - offset
		
		case result = blocking{io.read_nonblock(maximum_size, exception: false)}
		when :wait_readable, :wait_writable
			# Nothing more was requested, so report "would block" rather than suspending:
			return -EAGAIN unless length > 0
			
			events = (result == :wait_readable) ? IO::READABLE : IO::WRITABLE
			self.io_wait(fiber, io, events)
		else
			# A nil result means end-of-file:
			break unless result
			
			buffer.copy(result, offset)
			
			size = result.bytesize
			offset += size
			
			# Stop when the request is satisfied, or when the buffer is full. Without the
			# second check a zero-sized read returns "" forever and this loop never ends.
			break if size >= length || offset >= buffer.size
			
			length -= size
		end
	end
	
	return offset
end

#io_wait(fiber, io, events) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/io/event/selector/select.rb', line 117

# Register `fiber` as waiting on `io` for the given events, then transfer to the loop.
#
# The registrations made here are removed again when control comes back, whether
# normally or through an exception.
#
# @parameter fiber [Fiber] The fiber to record in the wait tables.
# @parameter io [IO] The IO object used as the table key.
# @parameter events [Integer] A bit mask of `IO::READABLE`, `IO::PRIORITY` and/or `IO::WRITABLE`.
# @returns [Object] Whatever `@loop.transfer` returns.
def io_wait(fiber, io, events)
	registered_read = false
	registered_write = false
	
	# Priority events are tracked in the same table as readable events:
	if (events & (IO::READABLE | IO::PRIORITY)).positive?
		@readable[io] = fiber
		registered_read = true
	end
	
	if (events & IO::WRITABLE).positive?
		@writable[io] = fiber
		registered_write = true
	end
	
	@loop.transfer
ensure
	# Only undo what this call actually registered:
	@readable.delete(io) if registered_read
	@writable.delete(io) if registered_write
end

#io_write(fiber, io, buffer, length) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/io/event/selector/select.rb', line 173

# Write the contents of `buffer` to `io`, waiting via `io_wait` whenever the operation would block.
#
# Writing continues until `length` bytes have been transferred or the buffer is exhausted.
#
# @parameter fiber [Fiber] The fiber to register while waiting for readiness.
# @parameter io [IO] The stream to write to.
# @parameter buffer [Object] The source; must respond to `size` and `to_str(offset, length)`.
# @parameter length [Integer] The number of bytes wanted. When it is not positive, a would-block result returns `-EAGAIN` instead of waiting.
# @returns [Integer] The total number of bytes written, or `-EAGAIN`.
def io_write(fiber, io, buffer, length)
	offset = 0
	
	# NOTE: `while true` is deliberate; `loop do` would call this class's own `loop` reader.
	while true
		maximum_size = buffer.size - offset
		
		chunk = buffer.to_str(offset, maximum_size)
		case result = blocking{io.write_nonblock(chunk, exception: false)}
		when :wait_readable, :wait_writable
			# Nothing more was requested, so report "would block" rather than suspending:
			return -EAGAIN unless length > 0
			
			events = (result == :wait_readable) ? IO::READABLE : IO::WRITABLE
			self.io_wait(fiber, io, events)
		else
			offset += result
			
			# Stop when the request is satisfied, or when the buffer is exhausted. Without
			# the second check an empty write returns 0 forever and this loop never ends.
			break if result >= length || offset >= buffer.size
			
			length -= result
		end
	end
	
	return offset
end

#process_wait(fiber, pid, flags) ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/io/event/selector/select.rb', line 204

# Wait for a child process without blocking the event loop.
#
# A helper thread performs the blocking `Process::Status.wait` and closes the write
# end of a pipe when it finishes. The calling fiber waits for the read end to become
# readable via `io_wait`, then collects the thread's result.
#
# @parameter fiber [Fiber] The fiber to register while waiting.
# @parameter pid [Integer] Passed through to `Process::Status.wait`.
# @parameter flags [Integer] Passed through to `Process::Status.wait`.
# @returns [Object] The value returned by `Process::Status.wait` in the helper thread.
def process_wait(fiber, pid, flags)
	reader, writer = IO.pipe
	
	thread = Thread.new do
		Process::Status.wait(pid, flags)
	ensure
		# Closing the write end makes the read end readable (end-of-file):
		writer.close
	end
	
	self.io_wait(fiber, reader, IO::READABLE)
	
	return thread.value
ensure
	# Safe navigation: if `IO.pipe` itself raised, none of these were assigned, and
	# calling `close` on nil would mask the original error with a NoMethodError.
	reader&.close
	writer&.close
	thread&.kill
end

#push(fiber) ⇒ Object

Append the given fiber into the ready list.



99
100
101
# File 'lib/io/event/selector/select.rb', line 99

# Append the given fiber to the ready list.
#
# @parameter fiber [Fiber] The fiber to enqueue.
# @returns [Queue] The ready list itself.
def push(fiber)
	@ready << fiber
end

#raise(fiber, *arguments) ⇒ Object

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.



104
105
106
107
108
109
110
111
# File 'lib/io/event/selector/select.rb', line 104

# Raise an exception in the given fiber, first placing the current fiber on the ready list.
#
# The current fiber is wrapped in an `Optional`, and that entry is nullified once
# control returns here.
#
# @parameter fiber [Fiber] The fiber to raise the exception in.
# @parameter arguments [Array] Forwarded unchanged to `fiber.raise`.
# @returns [Object] Whatever `fiber.raise` returns.
def raise(fiber, *arguments)
	placeholder = Optional.new(Fiber.current)
	@ready << placeholder
	
	fiber.raise(*arguments)
ensure
	placeholder.nullify
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/io/event/selector/select.rb', line 113

# Whether any entries are waiting on the ready list.
#
# @returns [Boolean]
def ready?
	return false if @ready.empty?
	
	true
end

#resume(fiber, *arguments) ⇒ Object

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.



79
80
81
82
83
84
85
86
# File 'lib/io/event/selector/select.rb', line 79

# Transfer from the current fiber to the given fiber, first placing the current fiber on the ready list.
#
# The current fiber is wrapped in an `Optional`, and that entry is nullified once
# control returns here.
#
# @parameter fiber [Fiber] The fiber to transfer to.
# @parameter arguments [Array] Forwarded unchanged to `fiber.transfer`.
# @returns [Object] Whatever `fiber.transfer` returns.
def resume(fiber, *arguments)
	placeholder = Optional.new(Fiber.current)
	@ready << placeholder
	
	fiber.transfer(*arguments)
ensure
	placeholder.nullify
end

#select(duration = nil) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/io/event/selector/select.rb', line 235

# Run one iteration of the selector: wait for registered IO objects to become ready
# and transfer to the fibers waiting on them.
#
# @parameter duration [Numeric | nil] The timeout passed to `IO.select`. It is forced to 0 when ready fibers were processed or are pending.
# @returns [Integer] The number of distinct fibers that had at least one ready IO.
def select(duration = nil)
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	begin
		# NOTE(review): the flag is set before re-checking the ready list, presumably so
		# that a concurrent `push` followed by `wakeup` is never missed - confirm.
		@blocked = true
		duration = 0 unless @ready.empty?
		readable, writable, _ = ::IO.select(@readable.keys, @writable.keys, nil, duration)
	ensure
		# Reset even when `IO.select` raises (e.g. on a closed IO); otherwise `wakeup`
		# would keep signalling a loop that is no longer blocked.
		@blocked = false
	end
	
	# Combine readable/writable readiness per fiber so that each fiber is resumed once:
	ready = Hash.new(0)
	
	# `IO.select` returns nil on timeout, hence the safe navigation:
	readable&.each do |io|
		fiber = @readable.delete(io)
		ready[fiber] |= IO::READABLE
	end
	
	writable&.each do |io|
		fiber = @writable.delete(io)
		ready[fiber] |= IO::WRITABLE
	end
	
	ready.each do |fiber, events|
		fiber.transfer(events) if fiber.alive?
	end
	
	return ready.size
end

#transfer ⇒ Object

Transfer from the current fiber to the event loop.



74
75
76
# File 'lib/io/event/selector/select.rb', line 74

# Transfer from the current fiber to the event loop.
#
# @returns [Object] Whatever `@loop.transfer` returns.
def transfer
	return @loop.transfer
end

#wakeup ⇒ Object

If the event loop is currently blocked, signal the interrupt to wake it up. Returns true if a signal was sent, false otherwise.



41
42
43
44
45
46
47
48
49
# File 'lib/io/event/selector/select.rb', line 41

# Signal the interrupt if the selector is currently blocked.
#
# @returns [Boolean] true if the interrupt was signalled, false if the selector was not blocked.
def wakeup
	# Nothing to interrupt unless `@blocked` is set:
	return false unless @blocked
	
	@interrupt.signal
	
	true
end

#yield ⇒ Object

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.



89
90
91
92
93
94
95
96
# File 'lib/io/event/selector/select.rb', line 89

# Yield from the current fiber back to the event loop, first placing the current fiber on the ready list.
#
# The current fiber is wrapped in an `Optional`, and that entry is nullified once
# control returns here.
#
# @returns [Object] Whatever `@loop.transfer` returns.
def yield
	placeholder = Optional.new(Fiber.current)
	@ready << placeholder
	
	@loop.transfer
ensure
	placeholder.nullify
end