Class: IO::Event::Selector::Select

Inherits:
Object
  • Object
show all
Defined in:
lib/io/event/selector/select.rb

Defined Under Namespace

Classes: Optional

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Select

Returns a new instance of Select.



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/io/event/selector/select.rb', line 26

def initialize(loop)
	@loop = loop
	
	@readable = Hash.new.compare_by_identity
	@writable = Hash.new.compare_by_identity
	
	@blocked = false
	
	@ready = Queue.new
	@interrupt = Interrupt.attach(self)
end

Instance Attribute Details

#loop ⇒ Object (readonly)

Returns the value of attribute loop.



38
39
40
# File 'lib/io/event/selector/select.rb', line 38

# The event loop object handed to the constructor.
#
# @return [Object, nil] the current value of @loop (#close resets it to nil).
def loop
	return @loop
end

Instance Method Details

#close ⇒ Object



51
52
53
54
55
56
57
# File 'lib/io/event/selector/select.rb', line 51

# Close the interrupt and drop the references held by this selector.
#
# @return [nil]
def close
	@interrupt.close
	
	# Release the event loop and both wait tables in one go:
	@loop = @readable = @writable = nil
end

#io_read(fiber, io, buffer, length) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/io/event/selector/select.rb', line 137

# Read from +io+ into +buffer+ until at least +length+ bytes have been read, the buffer is full, or the stream reports end-of-file.
#
# @param fiber [Object] passed through to #io_wait when the read would block.
# @param io [Object] must respond to +read_nonblock(size, exception: false)+.
# @param buffer [Object] must respond to +size+ and +copy(string, offset)+.
# @param length [Integer] the number of bytes wanted before returning.
# @return [Integer] the number of bytes copied into the buffer.
def io_read(fiber, io, buffer, length)
	offset = 0
	
	while length > 0
		# The maximum size we can read:
		maximum_size = buffer.size - offset
		
		# The buffer is full. A zero-byte read makes no progress, so looping on it would never terminate:
		break unless maximum_size > 0
		
		case result = io.read_nonblock(maximum_size, exception: false)
		when :wait_readable
			self.io_wait(fiber, io, IO::READABLE)
		when :wait_writable
			self.io_wait(fiber, io, IO::WRITABLE)
		else
			# A nil result means end-of-file:
			break unless result
			
			buffer.copy(result, offset)
			
			offset += result.bytesize
			length -= result.bytesize
		end
	end
	
	return offset
end

#io_wait(fiber, io, events) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/io/event/selector/select.rb', line 117

# Register +fiber+ as waiting on +io+ for the requested events, then transfer to the event loop.
#
# The registrations made here are removed again when control returns to this method, whether normally or through an exception.
#
# @param fiber [Object] the fiber to record against +io+.
# @param io [Object] key for the readable and writable tables.
# @param events [Integer] bit mask of IO::READABLE, IO::PRIORITY and IO::WRITABLE.
# @return [Object] whatever the transfer to the event loop returns.
def io_wait(fiber, io, events)
	registered_read = registered_write = false
	
	# Priority events are tracked in the same table as readable ones:
	if events.anybits?(IO::READABLE | IO::PRIORITY)
		@readable[io] = fiber
		registered_read = true
	end
	
	if events.anybits?(IO::WRITABLE)
		@writable[io] = fiber
		registered_write = true
	end
	
	@loop.transfer
ensure
	@readable.delete(io) if registered_read
	@writable.delete(io) if registered_write
end

#io_write(fiber, io, buffer, length) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/io/event/selector/select.rb', line 162

# Write +length+ bytes from the start of +buffer+ to +io+, waiting via #io_wait whenever the write would block.
#
# @param fiber [Object] passed through to #io_wait.
# @param io [Object] must respond to +write_nonblock(string, exception: false)+.
# @param buffer [Object] must respond to +to_str(offset, length)+.
# @param length [Integer] the number of bytes to write.
# @return [Integer] the number of bytes written.
def io_write(fiber, io, buffer, length)
	written = 0
	remaining = length
	
	until remaining <= 0
		# Everything that has not been written yet:
		pending = buffer.to_str(written, remaining)
		outcome = io.write_nonblock(pending, exception: false)
		
		if outcome == :wait_readable
			self.io_wait(fiber, io, IO::READABLE)
		elsif outcome == :wait_writable
			self.io_wait(fiber, io, IO::WRITABLE)
		else
			# A partial write: advance past the bytes that were accepted.
			written += outcome
			remaining -= outcome
		end
	end
	
	return written
end

#process_wait(fiber, pid, flags) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/io/event/selector/select.rb', line 183

# Wait for a child process without blocking the calling thread.
#
# A helper thread performs the blocking Process::Status.wait and closes the write end of a pipe when it finishes. The calling fiber meanwhile waits, via #io_wait, for the read end to become readable.
#
# @param fiber [Object] passed through to #io_wait.
# @param pid [Integer] forwarded to Process::Status.wait.
# @param flags [Integer] forwarded to Process::Status.wait.
# @return [Object] the value of the helper thread, i.e. the result of Process::Status.wait.
def process_wait(fiber, pid, flags)
	r, w = IO.pipe
	
	thread = Thread.new do
		Process::Status.wait(pid, flags)
	ensure
		# Closing the write end makes the read end readable (end-of-file):
		w.close
	end
	
	self.io_wait(fiber, r, IO::READABLE)
	
	return thread.value
ensure
	# Any of these may still be nil if an earlier step raised, so guard each one. Otherwise the cleanup would replace the real error with a NoMethodError.
	r&.close
	w&.close
	thread&.kill
end

#push(fiber) ⇒ Object

Append the given fiber into the ready list.



99
100
101
# File 'lib/io/event/selector/select.rb', line 99

# Append the given fiber to the ready list.
#
# @param fiber [Object] the entry to enqueue.
# @return [Object] the ready list itself.
def push(fiber)
	@ready << fiber
end

#raise(fiber, *arguments) ⇒ Object

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.



104
105
106
107
108
109
110
111
# File 'lib/io/event/selector/select.rb', line 104

# Raise an exception in the given fiber, after putting the current fiber on the ready list.
#
# @param fiber [Fiber] the fiber that receives the exception.
# @param arguments [Array] forwarded to Fiber#raise.
# @return [Object] whatever Fiber#raise returns once control comes back here.
def raise(fiber, *arguments)
	entry = Optional.new(Fiber.current)
	@ready << entry
	
	fiber.raise(*arguments)
ensure
	# Control is back in this fiber, so invalidate its ready-list entry.
	# NOTE(review): presumably a nullified entry is skipped when the ready list is drained; confirm against Optional.
	entry.nullify
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/io/event/selector/select.rb', line 113

# Whether the ready list currently holds any entries.
#
# @return [Boolean] true when the ready list is not empty.
def ready?
	return false if @ready.empty?
	
	true
end

#resume(fiber, *arguments) ⇒ Object

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.



79
80
81
82
83
84
85
86
# File 'lib/io/event/selector/select.rb', line 79

# Transfer from the current fiber to the given fiber, after putting the current fiber on the ready list.
#
# @param fiber [Fiber] the fiber to transfer to.
# @param arguments [Array] forwarded to Fiber#transfer.
# @return [Object] whatever Fiber#transfer returns once control comes back here.
def resume(fiber, *arguments)
	entry = Optional.new(Fiber.current)
	@ready << entry
	
	fiber.transfer(*arguments)
ensure
	# Control is back in this fiber, so invalidate its ready-list entry.
	# NOTE(review): presumably a nullified entry is skipped when the ready list is drained; confirm against Optional.
	entry.nullify
end

#select(duration = nil) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/io/event/selector/select.rb', line 214

# Run one iteration of the selector: wait for registered IO objects to become ready, then transfer to each fiber that was waiting on them.
#
# @param duration [Numeric, nil] the timeout handed to IO.select; forced to 0 when there is ready work pending.
# @return [Integer] the number of distinct fibers that had at least one ready IO.
# @raise [Exception] anything raised by IO.select propagates; the blocked flag is cleared first.
def select(duration = nil)
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	@blocked = true
	begin
		duration = 0 unless @ready.empty?
		readable, writable, _ = ::IO.select(@readable.keys, @writable.keys, nil, duration)
	ensure
		# Clear the flag even when IO.select raises, otherwise #wakeup would keep signalling a loop that is no longer blocked:
		@blocked = false
	end
	
	# Maps each waiting fiber to the combined events that became ready for it:
	ready = Hash.new(0)
	
	# IO.select returns nil on timeout, hence the safe navigation:
	readable&.each do |io|
		fiber = @readable.delete(io)
		ready[fiber] |= IO::READABLE
	end
	
	writable&.each do |io|
		fiber = @writable.delete(io)
		ready[fiber] |= IO::WRITABLE
	end
	
	ready.each do |fiber, events|
		fiber.transfer(events) if fiber.alive?
	end
	
	return ready.size
end

#transfer ⇒ Object

Transfer from the current fiber to the event loop.



74
75
76
# File 'lib/io/event/selector/select.rb', line 74

# Transfer from the current fiber to the event loop.
#
# @return [Object] whatever the event loop's #transfer returns.
def transfer
	event_loop = @loop
	event_loop.transfer
end

#wakeup ⇒ Object

If the event loop is currently blocked, signal the interrupt to wake it up. Returns true if the interrupt was signalled, false otherwise.



41
42
43
44
45
46
47
48
49
# File 'lib/io/event/selector/select.rb', line 41

# Signal the interrupt if the event loop is currently blocked.
#
# @return [Boolean] true if the interrupt was signalled, false if the loop was not blocked.
def wakeup
	# Nothing to interrupt unless #select is currently blocked:
	return false unless @blocked
	
	@interrupt.signal
	true
end

#yield ⇒ Object

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.



89
90
91
92
93
94
95
96
# File 'lib/io/event/selector/select.rb', line 89

# Yield from the current fiber back to the event loop, after putting the current fiber on the ready list.
#
# @return [Object] whatever the event loop's #transfer returns once control comes back here.
def yield
	entry = Optional.new(Fiber.current)
	@ready << entry
	
	@loop.transfer
ensure
	# Control is back in this fiber, so invalidate its ready-list entry.
	# NOTE(review): presumably a nullified entry is skipped when the ready list is drained; confirm against Optional.
	entry.nullify
end