Class: IO::Event::Selector::Select
- Inherits:
-
Object
- Object
- IO::Event::Selector::Select
- Defined in:
- lib/io/event/selector/select.rb
Defined Under Namespace
Classes: Optional
Instance Attribute Summary collapse
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(loop) ⇒ Select
constructor
A new instance of Select.
- #io_read(fiber, io, buffer, length) ⇒ Object
- #io_wait(fiber, io, events) ⇒ Object
- #io_write(fiber, io, buffer, length) ⇒ Object
- #process_wait(fiber, pid, flags) ⇒ Object
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception.
- #ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
- #select(duration = nil) ⇒ Object
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#wakeup ⇒ Object
If the event loop is currently blocked,.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Returns a new instance of Select.
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/io/event/selector/select.rb', line 26 def initialize(loop) @loop = loop @readable = Hash.new.compare_by_identity @writable = Hash.new.compare_by_identity @blocked = false @ready = Queue.new @interrupt = Interrupt.attach(self) end |
Instance Attribute Details
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
38 39 40 |
# File 'lib/io/event/selector/select.rb', line 38 def loop @loop end |
Instance Method Details
#close ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/io/event/selector/select.rb', line 51 def close @interrupt.close @loop = nil @readable = nil @writable = nil end |
#io_read(fiber, io, buffer, length) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/io/event/selector/select.rb', line 137 def io_read(fiber, io, buffer, length) offset = 0 while length > 0 # The maximum size we can read: maximum_size = buffer.size - offset case result = io.read_nonblock(maximum_size, exception: false) when :wait_readable self.io_wait(fiber, io, IO::READABLE) when :wait_writable self.io_wait(fiber, io, IO::WRITABLE) else break unless result buffer.copy(result, offset) offset += result.bytesize length -= result.bytesize end end return offset end |
#io_wait(fiber, io, events) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/io/event/selector/select.rb', line 117 def io_wait(fiber, io, events) remove_readable = remove_writable = false if (events & IO::READABLE) > 0 or (events & IO::PRIORITY) > 0 @readable[io] = fiber remove_readable = true end if (events & IO::WRITABLE) > 0 @writable[io] = fiber remove_writable = true end @loop.transfer ensure @readable.delete(io) if remove_readable @writable.delete(io) if remove_writable end |
#io_write(fiber, io, buffer, length) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/io/event/selector/select.rb', line 162 def io_write(fiber, io, buffer, length) offset = 0 while length > 0 # From offset until the end: chunk = buffer.to_str(offset, length) case result = io.write_nonblock(chunk, exception: false) when :wait_readable self.io_wait(fiber, io, IO::READABLE) when :wait_writable self.io_wait(fiber, io, IO::WRITABLE) else offset += result length -= result end end return offset end |
#process_wait(fiber, pid, flags) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/io/event/selector/select.rb', line 183 def process_wait(fiber, pid, flags) r, w = IO.pipe thread = Thread.new do Process::Status.wait(pid, flags) ensure w.close end self.io_wait(fiber, r, IO::READABLE) return thread.value ensure r.close w.close thread&.kill end |
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
99 100 101 |
# File 'lib/io/event/selector/select.rb', line 99 def push(fiber) @ready.push(fiber) end |
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
104 105 106 107 108 109 110 111 |
# File 'lib/io/event/selector/select.rb', line 104 def raise(fiber, *arguments) optional = Optional.new(Fiber.current) @ready.push(optional) fiber.raise(*arguments) ensure optional.nullify end |
#ready? ⇒ Boolean
113 114 115 |
# File 'lib/io/event/selector/select.rb', line 113 def ready? !@ready.empty? end |
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
79 80 81 82 83 84 85 86 |
# File 'lib/io/event/selector/select.rb', line 79 def resume(fiber, *arguments) optional = Optional.new(Fiber.current) @ready.push(optional) fiber.transfer(*arguments) ensure optional.nullify end |
#select(duration = nil) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/io/event/selector/select.rb', line 214 def select(duration = nil) if pop_ready # If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop: duration = 0 end @blocked = true duration = 0 unless @ready.empty? readable, writable, _ = ::IO.select(@readable.keys, @writable.keys, nil, duration) @blocked = false ready = Hash.new(0) readable&.each do |io| fiber = @readable.delete(io) ready[fiber] |= IO::READABLE end writable&.each do |io| fiber = @writable.delete(io) ready[fiber] |= IO::WRITABLE end ready.each do |fiber, events| fiber.transfer(events) if fiber.alive? end return ready.size end |
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
74 75 76 |
# File 'lib/io/event/selector/select.rb', line 74 def transfer @loop.transfer end |
#wakeup ⇒ Object
If the event loop is currently blocked,
41 42 43 44 45 46 47 48 49 |
# File 'lib/io/event/selector/select.rb', line 41 def wakeup if @blocked @interrupt.signal return true end return false end |