Class: IO::Event::Selector::Select
- Inherits:
-
Object
- Object
- IO::Event::Selector::Select
- Defined in:
- lib/io/event/selector/select.rb
Defined Under Namespace
Classes: Optional
Constant Summary collapse
- EAGAIN =
Errno::EAGAIN::Errno
Instance Attribute Summary collapse
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(loop) ⇒ Select
constructor
A new instance of Select.
- #io_read(fiber, io, buffer, length) ⇒ Object
- #io_wait(fiber, io, events) ⇒ Object
- #io_write(fiber, io, buffer, length) ⇒ Object
- #process_wait(fiber, pid, flags) ⇒ Object
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception.
- #ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
- #select(duration = nil) ⇒ Object
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#wakeup ⇒ Object
If the event loop is currently blocked,.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Returns a new instance of Select.
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/io/event/selector/select.rb', line 26 def initialize(loop) @loop = loop @readable = Hash.new.compare_by_identity @writable = Hash.new.compare_by_identity @blocked = false @ready = Queue.new @interrupt = Interrupt.attach(self) end |
Instance Attribute Details
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
38 39 40 |
# File 'lib/io/event/selector/select.rb', line 38 def loop @loop end |
Instance Method Details
#close ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/io/event/selector/select.rb', line 51 def close @interrupt.close @loop = nil @readable = nil @writable = nil end |
#io_read(fiber, io, buffer, length) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/io/event/selector/select.rb', line 139 def io_read(fiber, io, buffer, length) offset = 0 while true maximum_size = buffer.size - offset case result = blocking{io.read_nonblock(maximum_size, exception: false)} when :wait_readable if length > 0 self.io_wait(fiber, io, IO::READABLE) else return -EAGAIN end when :wait_writable if length > 0 self.io_wait(fiber, io, IO::WRITABLE) else return -EAGAIN end else break unless result buffer.copy(result, offset) size = result.bytesize offset += size break if size >= length length -= size end end return offset end |
#io_wait(fiber, io, events) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/io/event/selector/select.rb', line 117 def io_wait(fiber, io, events) remove_readable = remove_writable = false if (events & IO::READABLE) > 0 or (events & IO::PRIORITY) > 0 @readable[io] = fiber remove_readable = true end if (events & IO::WRITABLE) > 0 @writable[io] = fiber remove_writable = true end @loop.transfer ensure @readable.delete(io) if remove_readable @writable.delete(io) if remove_writable end |
#io_write(fiber, io, buffer, length) ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/io/event/selector/select.rb', line 173 def io_write(fiber, io, buffer, length) offset = 0 while true maximum_size = buffer.size - offset chunk = buffer.to_str(offset, maximum_size) case result = blocking{io.write_nonblock(chunk, exception: false)} when :wait_readable if length > 0 self.io_wait(fiber, io, IO::READABLE) else return -EAGAIN end when :wait_writable if length > 0 self.io_wait(fiber, io, IO::WRITABLE) else return -EAGAIN end else offset += result break if result >= length length -= result end end return offset end |
#process_wait(fiber, pid, flags) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/io/event/selector/select.rb', line 204 def process_wait(fiber, pid, flags) r, w = IO.pipe thread = Thread.new do Process::Status.wait(pid, flags) ensure w.close end self.io_wait(fiber, r, IO::READABLE) return thread.value ensure r.close w.close thread&.kill end |
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
99 100 101 |
# File 'lib/io/event/selector/select.rb', line 99 def push(fiber) @ready.push(fiber) end |
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
104 105 106 107 108 109 110 111 |
# File 'lib/io/event/selector/select.rb', line 104 def raise(fiber, *arguments) optional = Optional.new(Fiber.current) @ready.push(optional) fiber.raise(*arguments) ensure optional.nullify end |
#ready? ⇒ Boolean
113 114 115 |
# File 'lib/io/event/selector/select.rb', line 113 def ready? !@ready.empty? end |
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
79 80 81 82 83 84 85 86 |
# File 'lib/io/event/selector/select.rb', line 79 def resume(fiber, *arguments) optional = Optional.new(Fiber.current) @ready.push(optional) fiber.transfer(*arguments) ensure optional.nullify end |
#select(duration = nil) ⇒ Object
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/io/event/selector/select.rb', line 235 def select(duration = nil) if pop_ready # If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop: duration = 0 end @blocked = true duration = 0 unless @ready.empty? readable, writable, _ = ::IO.select(@readable.keys, @writable.keys, nil, duration) @blocked = false ready = Hash.new(0) readable&.each do |io| fiber = @readable.delete(io) ready[fiber] |= IO::READABLE end writable&.each do |io| fiber = @writable.delete(io) ready[fiber] |= IO::WRITABLE end ready.each do |fiber, events| fiber.transfer(events) if fiber.alive? end return ready.size end |
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
74 75 76 |
# File 'lib/io/event/selector/select.rb', line 74 def transfer @loop.transfer end |
#wakeup ⇒ Object
If the event loop is currently blocked,
41 42 43 44 45 46 47 48 49 |
# File 'lib/io/event/selector/select.rb', line 41 def wakeup if @blocked @interrupt.signal return true end return false end |