Class: Yahns::Queue
- Inherits:
- SleepyPenguin::Kqueue::IO
  - Object
  - SleepyPenguin::Kqueue::IO
  - Yahns::Queue
- Includes:
- SleepyPenguin
- Defined in:
- lib/yahns/queue_epoll.rb,
lib/yahns/queue_kqueue.rb
Overview
Copyright © 2013-2016 all contributors <[email protected]>
License: GPL-3.0+ (www.gnu.org/licenses/gpl-3.0.txt)
This is the dangerous, low-level kqueue interface for sleepy_penguin. It is safe as long as you are aware of all potential concurrency issues arising from multithreading, GC, and kqueue itself.
Constant Summary
- QEV_QUIT = nil
  public
- QEV_RD = EvFilt::READ
  Level Trigger for QueueQuitter
- QEV_WR = EvFilt::WRITE
- ADD_ONESHOT = Ev::ADD | Ev::ONESHOT
  private
Instance Attribute Summary
- #fdmap ⇒ Object
  Yahns::Fdmap.
Class Method Summary
Instance Method Summary
- #queue_add(io, flags) ⇒ Object
  For HTTP and HTTPS servers, we rely on the io writing to us first. flags: QEV_RD/QEV_WR (usually QEV_RD).
- #queue_mod(io, flags) ⇒ Object
- #thr_init ⇒ Object
- #worker_thread(logger, max_events) ⇒ Object
  Returns a Thread that runs the event loop indefinitely.
Instance Attribute Details
#fdmap ⇒ Object
Yahns::Fdmap
# File 'lib/yahns/queue_epoll.rb', line 11
def fdmap
  @fdmap
end
Class Method Details
.new ⇒ Object
# File 'lib/yahns/queue_epoll.rb', line 18
def self.new
  super(Epoll::CLOEXEC)
end
Instance Method Details
#queue_add(io, flags) ⇒ Object
For HTTP and HTTPS servers, we rely on the io writing to us first. flags: QEV_RD/QEV_WR (usually QEV_RD).
# File 'lib/yahns/queue_epoll.rb', line 24
def queue_add(io, flags)
  # order is very important here, this thread cannot do anything with
  # io once we've issued epoll_ctl() because another thread may use it
  @fdmap.add(io)
  epoll_ctl(Epoll::CTL_ADD, io, flags)
end
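The ordering constraint in queue_add (record the io in the fdmap first, arm it in the queue last) can be illustrated with a minimal sketch. SketchFdmap, SketchQueue, and arm are hypothetical stand-ins invented for this illustration; they are not the real Yahns::Fdmap or the sleepy_penguin API, and arm only records state instead of calling epoll_ctl.

```ruby
# Hypothetical stand-in for Yahns::Fdmap: a mutex-protected table of IOs.
class SketchFdmap
  def initialize
    @lock = Mutex.new
    @ios = {}
  end

  def add(io)
    @lock.synchronize { @ios[io.fileno] = io }
  end

  def include?(io)
    @lock.synchronize { @ios.key?(io.fileno) }
  end
end

# Hypothetical stand-in for the queue; it performs no real epoll calls.
class SketchQueue
  attr_reader :log

  def initialize(fdmap)
    @fdmap = fdmap
    @log = []
  end

  # Stand-in for epoll_ctl: records whether io was already tracked
  # at the moment it became visible to other threads.
  def arm(io)
    @log << [:armed, @fdmap.include?(io)]
  end

  # Same order as the listing: bookkeeping first, arming last, and
  # nothing touches io in this thread after arm returns.
  def queue_add(io)
    @fdmap.add(io)
    arm(io)
  end
end

r, w = IO.pipe
queue = SketchQueue.new(SketchFdmap.new)
queue.queue_add(r)
p queue.log
r.close
w.close
```

If the two calls in queue_add were swapped, the recorded flag would be false: the io would already be visible to other threads before the bookkeeping was done.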
#queue_mod(io, flags) ⇒ Object
# File 'lib/yahns/queue_epoll.rb', line 31
def queue_mod(io, flags)
  epoll_ctl(Epoll::CTL_MOD, io, flags)
end
#thr_init ⇒ Object
# File 'lib/yahns/queue_epoll.rb', line 35
def thr_init
  Thread.current[:yahns_rbuf] = ''.dup
  Thread.current[:yahns_fdmap] = @fdmap
  Thread.current[:yahns_queue] = self
end
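As a rough illustration of what thr_init sets up, the sketch below gives each thread its own mutable buffer while every thread points at the same shared object. The method names and the :sketch_* keys are hypothetical and chosen for this example only; yahns uses its own :yahns_* keys as shown in the listing.

```ruby
# Hypothetical equivalent of thr_init, using made-up :sketch_* keys.
def sketch_thr_init(shared)
  Thread.current[:sketch_rbuf] = ''.dup    # unfrozen buffer, one per thread
  Thread.current[:sketch_shared] = shared  # the same object in every thread
end

# Starts `count` threads, initializes each, and returns the
# [buffer, shared] pair seen by every thread.
def sketch_buffers(shared, count)
  threads = Array.new(count) do
    Thread.new do
      sketch_thr_init(shared)
      Thread.current[:sketch_rbuf] << 'thread data'
      [Thread.current[:sketch_rbuf], Thread.current[:sketch_shared]]
    end
  end
  threads.map(&:value)
end

pairs = sketch_buffers(Object.new, 3)
puts pairs.map { |buf, _| buf.object_id }.uniq.size
```

The `''.dup` matters when the file uses the frozen_string_literal magic comment: a bare `''` would be frozen and could not serve as a reusable read buffer.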
#worker_thread(logger, max_events) ⇒ Object
Returns a Thread that runs the event loop indefinitely.
# File 'lib/yahns/queue_epoll.rb', line 42
def worker_thread(logger, max_events)
  Thread.new do
    thr_init
    begin
      epoll_wait(max_events) do |_, io| # don't care for flags for now
        next if io.closed?

        # Note: we absolutely must not do anything with io after
        # we've called epoll_ctl on it, io is exclusive to this
        # thread only until epoll_ctl is called on it.
        case rv = io.yahns_step
        when :wait_readable
          epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
        when :wait_writable
          epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
        when :ignore # only used by rack.hijack
          # we cannot call Epoll::CTL_DEL after hijacking, the hijacker
          # may have already closed it Likewise, io.fileno is not
          # expected to work, so we had to erase it from fdmap before hijack
        when nil, :close
          # this must be the ONLY place where we call IO#close on
          # things that got inside the queue AND fdmap
          @fdmap.sync_close(io)
        else
          raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
        end
      end
    rescue StandardError, LoadError, SyntaxError => e
      break if closed? # can still happen due to shutdown_timeout
      Yahns::Log.exception(logger, 'queue loop', e)
    end while true
  end
end
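The dispatch on the return value of yahns_step can be tried in isolation with a small sketch. SketchClient and sketch_dispatch are hypothetical names invented here; only the yahns_step method name and its return values (:wait_readable, :wait_writable, :ignore, nil, :close) come from the listing. The sketch returns a symbol naming the action instead of calling epoll_ctl or closing anything.

```ruby
# Hypothetical client that replays a scripted sequence of step results.
class SketchClient
  def initialize(*steps)
    @steps = steps
  end

  def yahns_step
    @steps.shift
  end
end

# Maps one yahns_step result to the action the worker loop would take.
def sketch_dispatch(io)
  case rv = io.yahns_step
  when :wait_readable then :rearm_read   # loop re-arms with QEV_RD
  when :wait_writable then :rearm_write  # loop re-arms with QEV_WR
  when :ignore        then :leave_alone  # hijacked io: do not touch it
  when nil, :close    then :close        # loop calls @fdmap.sync_close(io)
  else
    raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
  end
end

client = SketchClient.new(:wait_readable, :wait_writable, :close)
actions = Array.new(3) { sketch_dispatch(client) }
p actions
```

Any value outside the known set raises, mirroring the "BUG:" branch in the listing, so an unexpected return from a client surfaces immediately rather than leaving the io unarmed.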