Class: Yahns::ReqRes
- Inherits:
-
Kgio::Socket
- Object
- Kgio::Socket
- Yahns::ReqRes
- Defined in:
- lib/yahns/req_res.rb
Overview
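Internal class (marked :nodoc:): a Kgio::Socket subclass that sends a request to an upstream server and reads the upstream response back, driven by the yahns event loop through #yahns_step.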
Instance Attribute Summary collapse
-
#alive ⇒ Object
Returns the value of attribute alive.
-
#proxy_pass ⇒ Object
readonly
Returns the value of attribute proxy_pass.
-
#proxy_trailers ⇒ Object
Returns the value of attribute proxy_trailers.
-
#resbuf ⇒ Object
Returns the value of attribute resbuf.
Instance Method Summary collapse
- #prepare_wait_readable ⇒ Object
- #req_start(c, req, input, chunked, proxy_pass) ⇒ Object
-
#send_req_body(req) ⇒ Object
returns :wait_readable if complete, :wait_writable if not.
- #send_req_body_chunk(buf) ⇒ Object
-
#send_req_buf(buf) ⇒ Object
n.b. buf must be a detached string not shared with Thread.current of any thread.
-
#yahns_step ⇒ Object
yahns event loop entry point.
Instance Attribute Details
#alive ⇒ Object
Returns the value of attribute alive.
12 13 14 |
# File 'lib/yahns/req_res.rb', line 12 def alive @alive end |
#proxy_pass ⇒ Object (readonly)
Returns the value of attribute proxy_pass.
13 14 15 |
# File 'lib/yahns/req_res.rb', line 13 def proxy_pass @proxy_pass end |
#proxy_trailers ⇒ Object
Returns the value of attribute proxy_trailers.
11 12 13 |
# File 'lib/yahns/req_res.rb', line 11 def proxy_trailers @proxy_trailers end |
#resbuf ⇒ Object
Returns the value of attribute resbuf.
10 11 12 |
# File 'lib/yahns/req_res.rb', line 10 def resbuf @resbuf end |
Instance Method Details
#prepare_wait_readable ⇒ Object
146 147 148 149 |
# File 'lib/yahns/req_res.rb', line 146
# Called once the request has been completely sent: installs a fresh
# Kcar::Parser as @rrstate, which makes #yahns_step take its
# response-reading branch on the next wakeup.
#
# Returns :wait_readable.
def prepare_wait_readable
  response_parser = Kcar::Parser.new
  @rrstate = response_parser
  # all done sending the request, wait for response
  :wait_readable
end
#req_start(c, req, input, chunked, proxy_pass) ⇒ Object
15 16 17 18 19 20 21 |
# File 'lib/yahns/req_res.rb', line 15
# Arms this socket for sending a request and registers it with the
# current thread's yahns queue for writability.
#
# c          - the client object; stored in @yahns_client
# req        - the request buffer to send (handled by #send_req_buf when it
#              is the whole state, see #yahns_step)
# input      - request body source, or nil/false when there is no body
# chunked    - whether the body is to be sent chunked (see #send_req_body)
# proxy_pass - stored and exposed through the #proxy_pass reader
#
# Returns whatever queue_add returns.
def req_start(c, req, input, chunked, proxy_pass)
  # forget any response state left from a previous use
  @hdr = nil
  @resbuf = nil

  @yahns_client = c

  # with a body, the state is [ (str|vec), rack.input, chunked? ];
  # without one it is just the request buffer
  @rrstate =
    if input
      [ req, input, chunked ]
    else
      req
    end

  @proxy_pass = proxy_pass

  queue = Thread.current[:yahns_queue]
  queue.queue_add(self, Yahns::Queue::QEV_WR)
end
#send_req_body(req) ⇒ Object
returns :wait_readable if complete, :wait_writable if not
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/yahns/req_res.rb', line 95
# Sends the request body described by +req+ to the upstream socket.
#
# req - the Array kept in @rrstate: [ (str|vec), rack.input, chunked? ]
#
# returns :wait_readable if complete, :wait_writable if not
def send_req_body(req)
  buf, input, chunked = req

  # flush the chunk (String) or vector (Array) that is already buffered
  blocked = send_req_body_chunk(buf)
  return blocked if blocked # :wait_writable

  # the buffered part is out; pick the String that input.read will fill
  rbuf = buf
  if chunked && String === buf
    # initial body: from now on req[0] holds the chunk vector
    req[0] = buf = []
  elsif chunked
    # reuse the biggest buffer we just wrote, unless it is frozen (unlikely)
    biggest = buf.max_by(&:size)
    rbuf = biggest.frozen? ? ''.dup : biggest
  end

  # Note: input (env['rack.input']) is fully-buffered by default so
  # we should not be waiting on a slow network resource when reading
  # input.  However, some weird configs may disable this on LANs
  # and we may wait indefinitely on input.read here...
  while input.read(0x2000, rbuf)
    if chunked
      # frame the data as one chunk: hex size line, payload, CRLF
      chunk_size_line = "#{rbuf.size.to_s(16)}\r\n".freeze
      buf[0] = chunk_size_line
      buf[1] = rbuf
      buf[2] = "\r\n".freeze
    end
    blocked = send_req_body_chunk(buf)
    return blocked if blocked # :wait_writable
  end

  # all done, clear the big buffer
  rbuf.clear

  # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
  # tries to prevent that (and hijack means all Rack specs go out the door)
  input.close if Yahns::TeeInput === input || IO === input

  # note: we do not send any trailer, they are folded into the header
  # because this relies on full request buffering
  return prepare_wait_readable unless chunked

  # terminating chunk; prepare_wait_readable is called by send_req_buf
  send_req_buf("0\r\n\r\n".freeze)
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
  # no more reading off the client socket, just prepare to forward
  # the rejection response from the upstream (if any)
  @yahns_client.to_io.shutdown(Socket::SHUT_RD)
  prepare_wait_readable
end
#send_req_body_chunk(buf) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/yahns/req_res.rb', line 82
# Writes +buf+ to the socket, retrying for as long as partial writes
# make progress.
#
# buf - a String (written with kgio_trywrite) or anything else, treated as
#       a vector (written with kgio_trywritev); it is modified in place to
#       hold only the unwritten remainder after a partial write
#
# Returns nil once everything was written, or :wait_writable when the
# socket cannot accept more data right now.  Aborts the process on any
# other result from the write call.
def send_req_body_chunk(buf)
  while true
    pending = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)

    case pending
    when nil, :wait_writable
      # :wait_writable = upstream is reading slowly and making us wait
      return pending
    when String, Array
      # partial write: keep only what is still unsent and go around again
      buf.replace(pending)
    else
      abort "BUG: #{pending.inspect} from kgio_trywrite*"
    end
  end
end
#send_req_buf(buf) ⇒ Object
n.b. buf must be a detached string not shared with Thread.current of any thread
153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/yahns/req_res.rb', line 153
# Writes the String +buf+ to the socket with kgio_trywrite, retrying for
# as long as partial writes make progress.
#
# n.b. buf must be a detached string not shared with
# Thread.current of any thread
# (on :wait_writable the unwritten remainder is kept in @rrstate, and
# #yahns_step passes a String @rrstate back to this method later).
#
# Returns :wait_writable when the socket cannot accept more data right
# now, otherwise the result of #prepare_wait_readable (:wait_readable).
# Aborts the process if kgio_trywrite returns anything unexpected, the
# same way #send_req_body_chunk does, instead of spinning forever.
def send_req_buf(buf)
  loop do
    case rv = kgio_trywrite(buf)
    when String
      buf = rv # partial write: retry with the unwritten remainder
    when :wait_writable
      @rrstate = buf # resume from here on the next writable event
      return :wait_writable
    when nil
      return prepare_wait_readable # everything was written
    else
      abort "BUG: #{rv.inspect} from kgio_trywrite"
    end
  end
end
#yahns_step ⇒ Object
yahns event loop entry point
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/yahns/req_res.rb', line 23 def yahns_step # yahns event loop entry point c = @yahns_client case req = @rrstate when Kcar::Parser # reading response... buf = Thread.current[:yahns_rbuf] case resbuf = @resbuf # where are we at the response? when nil # common case, catch the response header in a single read case rv = kgio_tryread(0x2000, buf) when String if res = req.headers(@hdr = [], rv) return c.proxy_response_start(res, rv, req, self) else # ugh, big headers or tricked response # we must reinitialize the thread-local rbuf if it may # live beyond the current thread buf = Thread.current[:yahns_rbuf] = ''.dup @resbuf = rv end # continue looping in middle "case @resbuf" loop when :wait_readable return rv # spurious wakeup when nil return c.proxy_err_response(502, self, 'upstream EOF (headers)') end # NOT looping here when String # continue reading trickled response headers from upstream case rv = kgio_tryread(0x2000, buf) when String then res = req.headers(@hdr, resbuf << rv) and break when :wait_readable then return rv when nil return c.proxy_err_response(502, self, 'upstream EOF (big headers)') end while true @resbuf = false return c.proxy_response_start(res, resbuf, req, self) when Yahns::WbufCommon # streaming/buffering the response body return c.proxy_response_finish(req, self) end while true # case @resbuf when Array # [ (str|vec), rack.input, chunked? ] send_req_body(req) # returns nil or :wait_writable when String # buffered request header send_req_buf(req) end rescue => e # avoid polluting logs with a giant backtrace when the problem isn't # fixable in code. case e when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE e.set_backtrace([]) end c.proxy_err_response(Yahns::WbufCommon === @resbuf ? nil : 502, self, e) end |