Class: Evt::Bundled
- Inherits:
-
Object
- Object
- Evt::Bundled
- Defined in:
- lib/evt/backends/bundled.rb,
ext/evt/evt.c
Defined Under Namespace
Classes: Payload
Constant Summary collapse
- MAXIMUM_TIMEOUT =
5000
- COLLECT_COUNTER_MAX =
16384
Instance Attribute Summary collapse
-
#readable ⇒ Object
readonly
Returns the value of attribute readable.
-
#waiting ⇒ Object
readonly
Returns the value of attribute waiting.
-
#writable ⇒ Object
readonly
Returns the value of attribute writable.
Class Method Summary collapse
Instance Method Summary collapse
-
#block(blocker, timeout = nil) ⇒ Object
Block the calling fiber.
-
#close ⇒ Object
Invoked when the thread exits.
-
#collect(force = false) ⇒ Object
Collect closed streams in readables and writables.
- #current_time ⇒ Object
- #epoll_deregister ⇒ Object
- #epoll_init_selector ⇒ Object
- #epoll_register ⇒ Object
- #epoll_wait ⇒ Object
-
#fiber(&block) ⇒ Object
Intercept the creation of a non-blocking fiber.
-
#initialize ⇒ Bundled
constructor
A new instance of Bundled.
-
#io_wait(io, events, duration) ⇒ Object
Wait for the given file descriptor to match the specified events within the specified timeout.
-
#kernel_sleep(duration = nil) ⇒ Object
Sleep the current task for the specified duration, or forever if not specified.
- #kqueue_init_selector ⇒ Object
- #kqueue_register ⇒ Object
- #kqueue_wait ⇒ Object
- #next_timeout ⇒ Object
- #run ⇒ Object
- #select_wait ⇒ Object
-
#unblock(blocker, fiber) ⇒ Object
Unblock the specified fiber.
- #uring_init_selector ⇒ Object
- #uring_io_read ⇒ Object
- #uring_io_write ⇒ Object
- #uring_register ⇒ Object
- #uring_wait ⇒ Object
Constructor Details
#initialize ⇒ Bundled
Returns a new instance of Bundled.
7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/evt/backends/bundled.rb', line 7 def initialize @readable = {} @writable = {} @waiting = {} @iovs = {} @lock = Mutex.new @blocking = 0 @ready = [] @collect_counter = 0 init_selector end |
Instance Attribute Details
#readable ⇒ Object (readonly)
Returns the value of attribute readable.
21 22 23 |
# File 'lib/evt/backends/bundled.rb', line 21 def readable @readable end |
#waiting ⇒ Object (readonly)
Returns the value of attribute waiting.
23 24 25 |
# File 'lib/evt/backends/bundled.rb', line 23 def waiting @waiting end |
#writable ⇒ Object (readonly)
Returns the value of attribute writable.
22 23 24 |
# File 'lib/evt/backends/bundled.rb', line 22 def writable @writable end |
Class Method Details
.epoll_backend ⇒ Object
.kqueue_backend ⇒ Object
.select_backend ⇒ Object
.uring_backend ⇒ Object
Instance Method Details
#block(blocker, timeout = nil) ⇒ Object
Block the calling fiber.
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/evt/backends/bundled.rb', line 120 def block(blocker, timeout = nil) if timeout @waiting[Fiber.current] = current_time + timeout begin Fiber.yield ensure @waiting.delete(Fiber.current) end else @blocking += 1 begin Fiber.yield ensure @blocking -= 1 end end end |
#close ⇒ Object
Invoked when the thread exits.
149 150 151 |
# File 'lib/evt/backends/bundled.rb', line 149 def close self.run end |
#collect(force = false) ⇒ Object
Collect closed streams in readables and writables
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/evt/backends/bundled.rb', line 154 def collect(force=false) if @collect_counter < COLLECT_COUNTER_MAX and !force @collect_counter += 1 return end @collect_counter = 0 @readable.keys.each do |io| @readable.delete(io) if io.closed? end @writable.keys.each do |io| @writable.delete(io) if io.closed? end @iovs.keys.each do |io| @iovs.delete(io) if io.closed? end end |
#current_time ⇒ Object
88 89 90 |
# File 'lib/evt/backends/bundled.rb', line 88 def current_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end |
#epoll_deregister ⇒ Object
#epoll_init_selector ⇒ Object
#epoll_register ⇒ Object
#epoll_wait ⇒ Object
#fiber(&block) ⇒ Object
Intercept the creation of a non-blocking fiber.
177 178 179 180 181 |
# File 'lib/evt/backends/bundled.rb', line 177 def fiber(&block) fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end |
#io_wait(io, events, duration) ⇒ Object
Wait for the given file descriptor to match the specified events within the specified timeout.
98 99 100 101 102 103 104 105 106 |
# File 'lib/evt/backends/bundled.rb', line 98 def io_wait(io, events, duration) # TODO: IO::PRIORITY @readable[io] = Fiber.current unless (events & IO::READABLE).zero? @writable[io] = Fiber.current unless (events & IO::WRITABLE).zero? self.register(io, events) Fiber.yield self.deregister(io) true end |
#kernel_sleep(duration = nil) ⇒ Object
Sleep the current task for the specified duration, or forever if not specified.
111 112 113 114 |
# File 'lib/evt/backends/bundled.rb', line 111 def kernel_sleep(duration = nil) self.block(:sleep, duration) true end |
#kqueue_init_selector ⇒ Object
#kqueue_register ⇒ Object
#kqueue_wait ⇒ Object
#next_timeout ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/evt/backends/bundled.rb', line 25 def next_timeout _fiber, timeout = @waiting.min_by{ |key, value| value } if timeout offset = (timeout - current_time) * 1000 # Use mililisecond return 0 if offset < 0 return offset if offset < MAXIMUM_TIMEOUT end MAXIMUM_TIMEOUT end |
#run ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/evt/backends/bundled.rb', line 37 def run while @readable.any? or @writable.any? or @waiting.any? or @iovs.any? or @blocking.positive? readable, writable, iovs = self.wait readable&.each do |io| fiber = @readable.delete(io) fiber&.resume end writable&.each do |io| fiber = @writable.delete(io) fiber&.resume end unless iovs.nil? iovs&.each do |v| io, ret = v fiber = @iovs.delete(io) fiber&.resume(ret) end end collect if @waiting.any? time = current_time waiting, @waiting = @waiting, {} waiting.each do |fiber, timeout| if timeout <= time fiber.resume if fiber.is_a? Fiber and fiber.alive? else @waiting[fiber] = timeout end end end if @ready.any? ready = nil @lock.synchronize do ready, @ready = @ready, [] end ready.each do |fiber| fiber.resume if fiber.is_a? Fiber and fiber.alive? end end end end |
#select_wait ⇒ Object
#unblock(blocker, fiber) ⇒ Object
Unblock the specified fiber.
142 143 144 145 146 |
# File 'lib/evt/backends/bundled.rb', line 142 def unblock(blocker, fiber) @lock.synchronize do @ready << fiber end end |