Class: IO::Event::Selector::Select
- Inherits:
-
Object
- Object
- IO::Event::Selector::Select
- Defined in:
- lib/io/event/selector/select.rb
Defined Under Namespace
Constant Summary collapse
- EAGAIN =
Errno::EAGAIN::Errno
Instance Attribute Summary collapse
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(loop) ⇒ Select
constructor
A new instance of Select.
- #io_read(fiber, io, buffer, length) ⇒ Object
- #io_wait(fiber, io, events) ⇒ Object
- #io_write(fiber, io, buffer, length) ⇒ Object
- #process_wait(fiber, pid, flags) ⇒ Object
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception.
- #ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
- #select(duration = nil) ⇒ Object
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#wakeup ⇒ Object
If the event loop is currently blocked,.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Returns a new instance of Select.
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/io/event/selector/select.rb', line 26 def initialize(loop) @loop = loop @waiting = Hash.new.compare_by_identity @blocked = false @ready = Queue.new @interrupt = Interrupt.attach(self) end |
Instance Attribute Details
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
37 38 39 |
# File 'lib/io/event/selector/select.rb', line 37 def loop @loop end |
Instance Method Details
#close ⇒ Object
50 51 52 53 54 55 |
# File 'lib/io/event/selector/select.rb', line 50 def close @interrupt.close @loop = nil @waiting = nil end |
#io_read(fiber, io, buffer, length) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/io/event/selector/select.rb', line 154 def io_read(fiber, io, buffer, length) offset = 0 while true maximum_size = buffer.size - offset case result = blocking{io.read_nonblock(maximum_size, exception: false)} when :wait_readable if length > 0 self.io_wait(fiber, io, IO::READABLE) else return -EAGAIN end when :wait_writable if length > 0 self.io_wait(fiber, io, IO::WRITABLE) else return -EAGAIN end when nil break else buffer.set_string(result, offset) size = result.bytesize offset += size break if size >= length length -= size end end return offset end |
#io_wait(fiber, io, events) ⇒ Object
143 144 145 146 147 148 149 |
# File 'lib/io/event/selector/select.rb', line 143 def io_wait(fiber, io, events) waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io]) @loop.transfer ensure waiter&.invalidate end |
#io_write(fiber, io, buffer, length) ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/io/event/selector/select.rb', line 188 def io_write(fiber, io, buffer, length) offset = 0 while true maximum_size = buffer.size - offset chunk = buffer.get_string(offset, maximum_size) case result = blocking{io.write_nonblock(chunk, exception: false)} when :wait_readable if length > 0 self.io_wait(fiber, io, IO::READABLE) else return -EAGAIN end when :wait_writable if length > 0 self.io_wait(fiber, io, IO::WRITABLE) else return -EAGAIN end else offset += result break if result >= length length -= result end end return offset end |
#process_wait(fiber, pid, flags) ⇒ Object
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/io/event/selector/select.rb', line 219 def process_wait(fiber, pid, flags) r, w = IO.pipe thread = Thread.new do Process::Status.wait(pid, flags) ensure w.close end self.io_wait(fiber, r, IO::READABLE) return thread.value ensure r.close w.close thread&.kill end |
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
97 98 99 |
# File 'lib/io/event/selector/select.rb', line 97 def push(fiber) @ready.push(fiber) end |
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
102 103 104 105 106 107 108 109 |
# File 'lib/io/event/selector/select.rb', line 102 def raise(fiber, *arguments) optional = Optional.new(Fiber.current) @ready.push(optional) fiber.raise(*arguments) ensure optional.nullify end |
#ready? ⇒ Boolean
111 112 113 |
# File 'lib/io/event/selector/select.rb', line 111 def ready? !@ready.empty? end |
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
77 78 79 80 81 82 83 84 |
# File 'lib/io/event/selector/select.rb', line 77 def resume(fiber, *arguments) optional = Optional.new(Fiber.current) @ready.push(optional) fiber.transfer(*arguments) ensure optional.nullify end |
#select(duration = nil) ⇒ Object
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/io/event/selector/select.rb', line 250 def select(duration = nil) if pop_ready # If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop: duration = 0 end readable = Array.new writable = Array.new @waiting.each do |io, waiter| waiter.each do |fiber, events| if (events & IO::READABLE) > 0 readable << io end if (events & IO::WRITABLE) > 0 writable << io end end end @blocked = true duration = 0 unless @ready.empty? readable, writable, _ = ::IO.select(readable, writable, nil, duration) @blocked = false ready = Hash.new(0) readable&.each do |io| ready[io] |= IO::READABLE end writable&.each do |io| ready[io] |= IO::WRITABLE end ready.each do |io, events| @waiting.delete(io).transfer(events) end return ready.size end |
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
72 73 74 |
# File 'lib/io/event/selector/select.rb', line 72 def transfer @loop.transfer end |
#wakeup ⇒ Object
If the event loop is currently blocked,
40 41 42 43 44 45 46 47 48 |
# File 'lib/io/event/selector/select.rb', line 40 def wakeup if @blocked @interrupt.signal return true end return false end |