Class: IO::Event::Selector::Select
- Inherits:
-
Object
- Object
- IO::Event::Selector::Select
show all
- Defined in:
- lib/io/event/selector/select.rb
Defined Under Namespace
Classes: Optional, Waiter
Constant Summary
collapse
- EAGAIN =
-Errno::EAGAIN::Errno
- EWOULDBLOCK =
-Errno::EWOULDBLOCK::Errno
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#again?(errno) ⇒ Boolean
-
#blocking(&block) ⇒ Object
-
#close ⇒ Object
-
#initialize(loop) ⇒ Select
constructor
A new instance of Select.
-
#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object
-
#io_select(readable, writable, priority, timeout) ⇒ Object
-
#io_wait(fiber, io, events) ⇒ Object
-
#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object
-
#process_wait(fiber, pid, flags) ⇒ Object
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception.
-
#ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
-
#select(duration = nil) ⇒ Object
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#wakeup ⇒ Object
If the event loop is currently sleeping, wake it up.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Returns a new instance of Select.
12
13
14
15
16
17
18
19
20
21
|
# File 'lib/io/event/selector/select.rb', line 12
# Prepare the selector's bookkeeping and attach its interrupt.
#
# @param loop [Object] the object that #transfer, #yield and #io_wait hand
#   control to by calling `transfer` on it.
def initialize(loop)
  @loop = loop

  # Only true while #select is inside ::IO.select (see #wakeup).
  @blocked = false

  # Waiters are looked up by the identity of the IO object itself.
  @waiting = {}.compare_by_identity

  # Entries appended by #push, #resume, #yield and #raise.
  @ready = Queue.new

  # Kept last: Interrupt receives this selector, so the state above must
  # already be in place when it does.
  @interrupt = Interrupt.attach(self)
end
|
Instance Attribute Details
#loop ⇒ Object
Returns the value of attribute loop.
23
24
25
|
# File 'lib/io/event/selector/select.rb', line 23
# The loop object given to #initialize (nil once #close has run).
#
# Note: this reader shadows Kernel#loop for instances of this class.
#
# @return [Object, nil]
def loop = @loop
|
Instance Method Details
#again?(errno) ⇒ Boolean
147
148
149
|
# File 'lib/io/event/selector/select.rb', line 147
# Whether a result code means "try again later".
#
# @param errno [Integer] a result code, compared against this class's
#   EAGAIN and EWOULDBLOCK constants.
# @return [Boolean] true when `errno` equals either constant.
def again?(errno)
  return true if errno == EAGAIN

  errno == EWOULDBLOCK
end
|
#blocking(&block) ⇒ Object
273
274
275
276
|
# File 'lib/io/event/selector/select.rb', line 273
# Run the given block inside a newly created blocking fiber.
#
# The fiber is resumed with itself as the argument, so the block receives
# the fiber it is running in.
#
# @yieldparam fiber [Fiber] the blocking fiber executing the block.
# @return [Object] whatever the fiber's first resume returns.
def blocking(&block)
  blocking_fiber = Fiber.new(blocking: true, &block)

  blocking_fiber.resume(blocking_fiber)
end
|
#close ⇒ Object
36
37
38
39
40
41
|
# File 'lib/io/event/selector/select.rb', line 36
# Close the interrupt and drop the references to the loop and the waiters.
#
# @return [nil]
def close
  @interrupt.close

  @loop = @waiting = nil
end
|
#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/io/event/selector/select.rb', line 151
# Read from `io` into `buffer`, suspending `fiber` via #io_wait whenever the
# read would block.
#
# @param fiber [Fiber] handed to #io_wait while `io` has nothing to read.
# @param io [IO] the source; the whole operation runs inside
#   Selector.nonblock(io) (helper defined elsewhere).
# @param buffer [#size, #read] destination; `read(io, size, offset)` must
#   return the number of bytes read or a negative error code
#   (presumably an IO::Buffer — confirm against callers).
# @param length [Integer] minimum number of bytes wanted. With 0, a read
#   that would block returns the error code immediately instead of waiting.
# @param offset [Integer] position in `buffer` where the data is stored.
# @return [Integer] total bytes read, or the negative error code reported
#   by `buffer.read`.
def io_read(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    # `while true` rather than `loop do`: this class defines its own #loop.
    while true
      maximum_size = buffer.size - offset
      result = Fiber.blocking{buffer.read(io, maximum_size, offset)}

      if again?(result)
        # Nothing available right now: wait only if the caller asked for data.
        return result unless length > 0

        self.io_wait(fiber, io, IO::READABLE)
      elsif result < 0
        return result
      elsif result == 0
        # No progress (end of stream or no room left in the buffer).
        # Retrying can never reach `length`, so stop instead of spinning.
        break
      else
        total += result
        offset += result
        break if total >= length
      end
    end
  end

  total
end
|
#io_select(readable, writable, priority, timeout) ⇒ Object
137
138
139
140
141
|
# File 'lib/io/event/selector/select.rb', line 137
# Perform IO.select on a separate thread and return its result.
#
# The arguments are forwarded to IO.select unchanged.
#
# @param readable [Array<IO>, nil]
# @param writable [Array<IO>, nil]
# @param priority [Array<IO>, nil]
# @param timeout [Numeric, nil]
# @return [Array, nil] the value produced by IO.select.
def io_select(readable, writable, priority, timeout)
  selecting = Thread.new { IO.select(readable, writable, priority, timeout) }

  # Thread#value joins the thread and hands back the block's result.
  selecting.value
end
|
#io_wait(fiber, io, events) ⇒ Object
129
130
131
132
133
134
135
|
# File 'lib/io/event/selector/select.rb', line 129
# Register interest in `events` on `io` and hand control to the loop.
#
# The new waiter is stored as the head for `io`, with any previously
# registered waiter for the same `io` as its third argument.
#
# @param fiber [Fiber] the fiber recorded in the waiter.
# @param io [IO] the object to wait on.
# @param events [Integer] bit mask of IO::READABLE / IO::WRITABLE / IO::PRIORITY.
# @return [Object] whatever `@loop.transfer` returns when control comes back.
def io_wait(fiber, io, events)
  previous = @waiting[io]
  waiter = Waiter.new(fiber, events, previous)
  @waiting[io] = waiter

  @loop.transfer
ensure
  # Runs however control comes back (including by exception).
  waiter&.invalidate
end
|
#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
# File 'lib/io/event/selector/select.rb', line 178
# Write from `buffer` to `io`, suspending `fiber` via #io_wait whenever the
# write would block.
#
# @param fiber [Fiber] handed to #io_wait while `io` cannot accept data.
# @param io [IO] the destination; the whole operation runs inside
#   Selector.nonblock(io) (helper defined elsewhere).
# @param buffer [#size, #write] source; `write(io, size, offset)` must
#   return the number of bytes written or a negative error code
#   (presumably an IO::Buffer — confirm against callers).
# @param length [Integer] minimum number of bytes to write. With 0, a write
#   that would block returns the error code immediately instead of waiting.
# @param offset [Integer] position in `buffer` to start writing from.
# @return [Integer] total bytes written, or the negative error code reported
#   by `buffer.write`.
def io_write(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    # `while true` rather than `loop do`: this class defines its own #loop.
    while true
      maximum_size = buffer.size - offset
      result = Fiber.blocking{buffer.write(io, maximum_size, offset)}

      if again?(result)
        return result unless length > 0

        # A write that would block must wait until `io` is writable;
        # waiting for readability could stall forever.
        self.io_wait(fiber, io, IO::WRITABLE)
      elsif result < 0
        return result
      else
        total += result
        offset += result
        break if total >= length
      end
    end
  end

  total
end
|
#process_wait(fiber, pid, flags) ⇒ Object
279
280
281
282
283
|
# File 'lib/io/event/selector/select.rb', line 279
# Wait for a child process on a separate thread and return its status.
#
# @param fiber [Fiber] not used by this implementation.
# @param pid [Integer] forwarded to Process::Status.wait.
# @param flags [Integer] forwarded to Process::Status.wait.
# @return [Process::Status, nil] the value produced by Process::Status.wait.
def process_wait(fiber, pid, flags)
  reaper = Thread.new { Process::Status.wait(pid, flags) }

  # Thread#value joins the thread and hands back the block's result.
  reaper.value
end
|
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
83
84
85
|
# File 'lib/io/event/selector/select.rb', line 83
# Append the given fiber to the ready list.
#
# @param fiber [Object] the entry to enqueue.
# @return [Object] the ready list itself.
def push(fiber)
  @ready << fiber
end
|
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
88
89
90
91
92
93
94
95
|
# File 'lib/io/event/selector/select.rb', line 88
# Raise an exception in the given fiber, queueing the current fiber on the
# ready list first.
#
# Note: this method shadows Kernel#raise for instances of this class.
#
# @param fiber [Fiber] the fiber that receives the exception.
# @param arguments [Array] forwarded unchanged to Fiber#raise.
# @return [Object] whatever Fiber#raise returns once control comes back.
def raise(fiber, *arguments)
  placeholder = Optional.new(Fiber.current)
  @ready << placeholder

  fiber.raise(*arguments)
ensure
  # Control is back here, so the queued entry is cleared.
  placeholder.nullify
end
|
#ready? ⇒ Boolean
97
98
99
|
# File 'lib/io/event/selector/select.rb', line 97
# Whether the ready list currently holds any entries.
#
# @return [Boolean]
def ready?
  return false if @ready.empty?

  true
end
|
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
63
64
65
66
67
68
69
70
|
# File 'lib/io/event/selector/select.rb', line 63
# Transfer from the current fiber to the given fiber, queueing the current
# fiber on the ready list first.
#
# @param fiber [Fiber] the fiber to transfer to.
# @param arguments [Array] forwarded unchanged to Fiber#transfer.
# @return [Object] whatever Fiber#transfer returns once control comes back.
def resume(fiber, *arguments)
  placeholder = Optional.new(Fiber.current)
  @ready << placeholder

  fiber.transfer(*arguments)
ensure
  # Control is back here, so the queued entry is cleared.
  placeholder.nullify
end
|
#select(duration = nil) ⇒ Object
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
|
# File 'lib/io/event/selector/select.rb', line 298
# Run one iteration of the selector: drain the ready list, wait for IO
# events with ::IO.select, and transfer to the waiters of every ready IO.
#
# Note: this method shadows Kernel#select for instances of this class.
#
# @param duration [Numeric, nil] timeout passed to ::IO.select; forced to 0
#   when ready entries were run or are still pending.
# @return [Integer] the number of distinct IO objects that reported events.
def select(duration = nil)
  # If ready entries were run, don't sleep on this pass.
  duration = 0 if pop_ready

  readable = []
  writable = []
  priority = []

  # Sort every registered IO into the sets ::IO.select expects.
  @waiting.each do |io, waiter|
    waiter.each do |_fiber, events|
      readable << io if (events & IO::READABLE) > 0
      writable << io if (events & IO::WRITABLE) > 0
      priority << io if (events & IO::PRIORITY) > 0
    end
  end

  # @blocked is what #wakeup checks before signalling the interrupt.
  @blocked = true
  duration = 0 unless @ready.empty?
  readable, writable, priority = ::IO.select(readable, writable, priority, duration)
  @blocked = false

  # Merge the three result sets into one event mask per IO. Each set is nil
  # when ::IO.select timed out.
  ready = Hash.new(0)
  [
    [IO::READABLE, readable],
    [IO::WRITABLE, writable],
    [IO::PRIORITY, priority],
  ].each do |flag, ios|
    ios&.each { |io| ready[io] |= flag }
  end

  # Unregister each ready IO and hand its events to the waiter stored for it.
  ready.each do |io, events|
    @waiting.delete(io).transfer(events)
  end

  ready.size
end
|
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
58
59
60
|
# File 'lib/io/event/selector/select.rb', line 58
# Transfer from the current fiber to the event loop.
#
# @return [Object] whatever `@loop.transfer` returns when control comes back.
def transfer = @loop.transfer
|
#wakeup ⇒ Object
If the event loop is currently sleeping, wake it up.
26
27
28
29
30
31
32
33
34
|
# File 'lib/io/event/selector/select.rb', line 26
# If the event loop is currently sleeping, wake it up.
#
# @return [Boolean] true when the interrupt was signalled, false when the
#   selector was not blocked and nothing was done.
def wakeup
  return false unless @blocked

  @interrupt.signal
  true
end
|
#yield ⇒ Object
Yield from the current fiber back to the event loop. Put the current fiber into the ready list.
73
74
75
76
77
78
79
80
|
# File 'lib/io/event/selector/select.rb', line 73
# Yield from the current fiber back to the event loop, queueing the current
# fiber on the ready list first.
#
# @return [Object] whatever `@loop.transfer` returns when control comes back.
def yield
  placeholder = Optional.new(Fiber.current)
  @ready << placeholder

  @loop.transfer
ensure
  # Control is back here, so the queued entry is cleared.
  placeholder.nullify
end
|