Class: Event::Selector::Select
- Inherits:
-
Object
- Object
- Event::Selector::Select
- Defined in:
- lib/event/selector/select.rb
Defined Under Namespace
Classes: Queue
Instance Method Summary
- #close ⇒ Object
-
#initialize(loop) ⇒ Select
constructor
A new instance of Select.
- #io_read(fiber, io, buffer, length) ⇒ Object
- #io_wait(fiber, io, events) ⇒ Object
- #io_write(fiber, io, buffer, length) ⇒ Object
- #process_wait(fiber, pid, flags) ⇒ Object
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception.
- #ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
- #select(duration = nil) ⇒ Object
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Returns a new instance of Select.
24 25 26 27 28 29 30 31 |
# File 'lib/event/selector/select.rb', line 24

# Set up a selector with empty wait tables and an empty ready list.
#
# @param loop [Object] stored in @loop; #transfer and #io_wait call
#   `transfer` on it.
def initialize(loop)
  @loop = loop

  # Ready list; appended to by #push, #resume and #raise.
  @ready = []

  # io => fiber tables, filled by #io_wait and drained by #select.
  @readable = {}
  @writable = {}
end
Instance Method Details
#close ⇒ Object
33 34 35 36 37 |
# File 'lib/event/selector/select.rb', line 33

# Drop the references to the loop and to both wait tables.
# The ready list (@ready) is not touched.
#
# @return [nil]
def close
  @loop = @readable = @writable = nil
end
#io_read(fiber, io, buffer, length) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/event/selector/select.rb', line 117

# Read from +io+ into +buffer+ without blocking, waiting via #io_wait
# whenever the IO is not ready.
#
# The loop keeps going while +length+ (decremented by the bytes received)
# is still positive, and stops early at end of file or when a read yields
# no data.
#
# @param fiber [Object] passed through to #io_wait.
# @param io [IO] must support `read_nonblock(size, exception: false)`.
# @param buffer [Object] must respond to `size` and `copy(string, offset)`.
# @param length [Integer] number of bytes still wanted.
# @return [Integer] total number of bytes copied into +buffer+.
def io_read(fiber, io, buffer, length)
  offset = 0

  while length > 0
    # The maximum size we can read:
    maximum_size = buffer.size - offset

    case result = io.read_nonblock(maximum_size, exception: false)
    when :wait_readable
      io_wait(fiber, io, READABLE)
    when :wait_writable
      io_wait(fiber, io, WRITABLE)
    when nil
      # With `exception: false`, read_nonblock signals EOF by returning nil
      # rather than raising EOFError; there is nothing more to read.
      break
    else
      break if result.empty?

      buffer.copy(result, offset)

      offset += result.bytesize
      length -= result.bytesize
    end
  end

  offset
end
#io_wait(fiber, io, events) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/event/selector/select.rb', line 97

# Register +fiber+ as waiting on +io+ for the requested +events+, then
# transfer to the loop. The registrations made here are removed again
# when this method unwinds, whether normally or by an exception.
#
# @param fiber [Object] stored in the wait tables under +io+.
# @param io [IO] key used in @readable / @writable.
# @param events [Integer] bit mask tested against READABLE, PRIORITY and
#   WRITABLE.
# @return [Object] whatever `@loop.transfer` returns.
def io_wait(fiber, io, events)
  registered_readable = false
  registered_writable = false

  # PRIORITY interest is tracked in the same table as READABLE.
  if (events & READABLE).positive? || (events & PRIORITY).positive?
    @readable[io] = fiber
    registered_readable = true
  end

  if (events & WRITABLE).positive?
    @writable[io] = fiber
    registered_writable = true
  end

  @loop.transfer
ensure
  # Only undo what this call actually registered.
  @readable.delete(io) if registered_readable
  @writable.delete(io) if registered_writable
end
#io_write(fiber, io, buffer, length) ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/event/selector/select.rb', line 142

# Write +length+ bytes from +buffer+ to +io+ without blocking, waiting via
# #io_wait whenever the IO is not ready.
#
# @param fiber [Object] passed through to #io_wait.
# @param io [IO] must support `write_nonblock(string, exception: false)`.
# @param buffer [Object] must respond to `to_str(offset, length)`.
# @param length [Integer] number of bytes to write.
# @return [Integer] total number of bytes written.
def io_write(fiber, io, buffer, length)
  written = 0
  remaining = length

  while remaining > 0
    # Everything from the current position that is still unwritten:
    pending = buffer.to_str(written, remaining)
    outcome = io.write_nonblock(pending, exception: false)

    if outcome == :wait_readable
      io_wait(fiber, io, READABLE)
    elsif outcome == :wait_writable
      io_wait(fiber, io, WRITABLE)
    else
      # A byte count: advance past what was accepted.
      written += outcome
      remaining -= outcome
    end
  end

  written
end
#process_wait(fiber, pid, flags) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/event/selector/select.rb', line 163

# Wait for a child process without blocking the calling fiber.
#
# A helper thread performs the blocking `Process::Status.wait`; it closes
# the write end of a pipe when done, and the calling fiber waits on the
# read end through #io_wait.
#
# @param fiber [Object] passed through to #io_wait.
# @param pid [Integer] passed to `Process::Status.wait`.
# @param flags [Integer] passed to `Process::Status.wait`.
# @return [Object] the helper thread's value, i.e. the result of
#   `Process::Status.wait(pid, flags)`.
def process_wait(fiber, pid, flags)
  r, w = IO.pipe

  thread = Thread.new do
    Process::Status.wait(pid, flags)
  ensure
    # Closing the write end is the wake-up signal for the read end.
    w.close
  end

  io_wait(fiber, r, READABLE)

  thread.value
ensure
  # Every local may still be nil if setup failed part-way (for example
  # IO.pipe raising); guard each one so cleanup never masks that error.
  r&.close
  w&.close
  thread&.kill
end
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
79 80 81 |
# File 'lib/event/selector/select.rb', line 79

# Append the given fiber to the ready list.
#
# @param fiber [Object] the entry to append.
# @return [Array] the ready list itself.
def push(fiber)
  @ready << fiber
end
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
84 85 86 87 88 89 90 91 |
# File 'lib/event/selector/select.rb', line 84

# Raise an exception in the given fiber, first placing the current fiber
# on the ready list (wrapped in a Queue entry).
#
# @param fiber [Fiber] the fiber that receives the exception.
# @param arguments [Array] forwarded unchanged to `fiber.raise`.
# @return [Object] whatever `fiber.raise` returns.
def raise(fiber, *arguments)
  waiting = Queue.new(Fiber.current)
  @ready << waiting

  fiber.raise(*arguments)
ensure
  # Runs once control is back here: clear the queued entry via #nullify.
  waiting.nullify
end
#ready? ⇒ Boolean
93 94 95 |
# File 'lib/event/selector/select.rb', line 93

# Whether the ready list holds anything to run.
#
# @return [Boolean] true when the ready list contains at least one truthy
#   entry, false otherwise (including when it is empty).
def ready?
  @ready.any?
end
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
59 60 61 62 63 64 65 66 |
# File 'lib/event/selector/select.rb', line 59

# Transfer from the current fiber to the given fiber, first placing the
# current fiber on the ready list (wrapped in a Queue entry).
#
# @param fiber [Fiber] the fiber to transfer to.
# @param arguments [Array] forwarded unchanged to `fiber.transfer`.
# @return [Object] whatever `fiber.transfer` returns.
def resume(fiber, *arguments)
  waiting = Queue.new(Fiber.current)
  @ready << waiting

  fiber.transfer(*arguments)
ensure
  # Runs once control is back here: clear the queued entry via #nullify.
  waiting.nullify
end
#select(duration = nil) ⇒ Object
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/event/selector/select.rb', line 194

# Run one pass of the selector: wait for IO readiness on every registered
# IO and transfer to each fiber whose IO became ready.
#
# @param duration [Numeric, nil] timeout handed to `IO.select`; forced to 0
#   when `pop_ready` returns a truthy value.
# @return [Hash] the fiber => events table built during this pass.
def select(duration = nil)
  # Do not wait in IO.select when pop_ready reported something.
  duration = 0 if pop_ready

  readable, writable, _ = ::IO.select(@readable.keys, @writable.keys, nil, duration)

  # fiber => accumulated event mask; a fiber ready for both directions
  # gets a single transfer with both bits set.
  ready = Hash.new(0)

  # IO.select returns nil on timeout, hence the safe navigation.
  readable&.each do |io|
    waiter = @readable.delete(io)
    ready[waiter] |= READABLE
  end

  writable&.each do |io|
    waiter = @writable.delete(io)
    ready[waiter] |= WRITABLE
  end

  ready.each do |waiter, events|
    waiter.transfer(events) if waiter.alive?
  end
end
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
54 55 56 |
# File 'lib/event/selector/select.rb', line 54

# Transfer from the current fiber to the event loop.
#
# @return [Object] whatever `@loop.transfer` returns.
def transfer
  @loop.transfer
end