Module: Yahns::HttpResponse
- Includes:
- Unicorn::HttpResponse
- Included in:
- HttpClient
- Defined in:
- lib/yahns/http_response.rb,
lib/yahns/proxy_http_response.rb
Overview
loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for constants.
Constant Summary collapse
- MTX =
Mutex.new
- CCC_RESPONSE_START =
avoid GC overhead for frequently used-strings/objects:
[ 'HTTP', '/1.1 ' ]
- MSG_MORE =
0
- MSG_DONTWAIT =
0
Instance Method Summary collapse
-
#chunk_out(buf) ⇒ Object
n.b.: we can use String#size for optimized dispatch under YARV instead of String#bytesize because all the IO read methods return a binary string when given a maximum read length.
-
#do_ccc ⇒ Object
returns nil on success :wait_readable/:wait_writable/:close for epoll.
- #err_response(code) ⇒ Object
-
#http_100_response(env) ⇒ Object
only used if input_buffering is true (not :lazy or false) input_buffering==:lazy/false gives control to the app returns nil on success returns :close, :wait_writable, or :wait_readable.
- #http_response_done(alive) ⇒ Object
-
#http_response_prep(env) ⇒ Object
must be called before app dispatch, since the app can do all sorts of nasty things to env.
-
#http_response_write(res, opt) ⇒ Object
writes the rack_response to socket as an HTTP response returns :wait_readable, :wait_writable, :forget, or nil.
- #httpdate ⇒ Object
- #kgio_syssend(buf, flags) ⇒ Object
- #kv_str(buf, key, value) ⇒ Object
- #proxy_busy_mod(wbuf, req_res) ⇒ Object
- #proxy_err_response(code, req_res, exc) ⇒ Object
- #proxy_read_body(tip, kcar, req_res) ⇒ Object
- #proxy_read_trailers(kcar, req_res) ⇒ Object
- #proxy_res_headers(res, req_res) ⇒ Object
- #proxy_response_finish(kcar, req_res) ⇒ Object
-
#proxy_response_start(res, tip, kcar, req_res) ⇒ Object
start streaming the response once upstream is done sending headers to us.
-
#proxy_unbuffer(wbuf, nxt = :ignore) ⇒ Object
switch and yield.
- #proxy_wait_next(qflags) ⇒ Object
-
#proxy_write(wbuf, buf, req_res) ⇒ Object
write everything in buf to our client socket (or wbuf, if it exists) it may return a newly-created wbuf or nil.
- #response_header_blocked(header, body, alive, offset, count) ⇒ Object
- #response_start ⇒ Object
- #response_wait_write(rv) ⇒ Object
- #trailer_out(tlr) ⇒ Object
- #wait_on_upstream(req_res) ⇒ Object
- #wbuf_alloc(req_res) ⇒ Object
- #wbuf_maybe(wbuf, rv) ⇒ Object
Instance Method Details
#chunk_out(buf) ⇒ Object
n.b.: we can use String#size for optimized dispatch under YARV instead of String#bytesize because all the IO read methods return a binary string when given a maximum read length
298 299 300 |
# File 'lib/yahns/proxy_http_response.rb', line 298 def chunk_out(buf) [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ] end |
#do_ccc ⇒ Object
returns nil on success :wait_readable/:wait_writable/:close for epoll
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/yahns/http_response.rb', line 239 def do_ccc @hs.response_start_sent = true wbuf = nil rv = nil CCC_RESPONSE_START.each do |buf| if wbuf wbuf << buf else case rv = kgio_trywrite(buf) when nil break when String buf = rv when :wait_writable, :wait_readable if self.class.output_buffering wbuf = buf.dup @state = Yahns::WbufStr.new(wbuf, :ccc_done) break else response_wait_write(rv) or return :close end end while true end end rv end |
#err_response(code) ⇒ Object
58 59 60 |
# File 'lib/yahns/http_response.rb', line 58 def err_response(code) "#{response_start}#{code} #{Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n" end |
#http_100_response(env) ⇒ Object
only used if input_buffering is true (not :lazy or false) input_buffering==:lazy/false gives control to the app returns nil on success returns :close, :wait_writable, or :wait_readable
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/yahns/http_response.rb', line 270 def http_100_response(env) env.delete('HTTP_EXPECT'.freeze) =~ /\A100-continue\z/i or return buf = @hs.response_start_sent ? "100 Continue\r\n\r\nHTTP/1.1 ".freeze : "HTTP/1.1 100 Continue\r\n\r\n".freeze case rv = kgio_trywrite(buf) when String buf = rv when :wait_writable, :wait_readable if self.class.output_buffering @state = Yahns::WbufStr.new(buf, :r100_done) return rv else response_wait_write(rv) or return :close end else return rv end while true end |
#http_response_done(alive) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/yahns/http_response.rb', line 93 def http_response_done(alive) @input = @input.close if @input if alive # @hs.buf will have data if the client pipelined if @hs.buf.empty? @state = :headers :wait_readable else @state = :pipelined # we shouldn't start processing the application again until we know # the socket is writable for the response :wait_writable end else # shutdown is needed in case the app forked, we rescue here since # StreamInput may issue shutdown as well shutdown rescue nil :close end end |
#http_response_prep(env) ⇒ Object
must be called before app dispatch, since the app can do all sorts of nasty things to env
292 293 294 295 |
# File 'lib/yahns/http_response.rb', line 292 def http_response_prep(env) [ env['REQUEST_METHOD'] == 'HEAD'.freeze, # hdr_only env['HTTP_VERSION'] == 'HTTP/1.1'.freeze ] # chunk_ok end |
#http_response_write(res, opt) ⇒ Object
writes the rack_response to socket as an HTTP response returns :wait_readable, :wait_writable, :forget, or nil
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/yahns/http_response.rb', line 125 def http_response_write(res, opt) status, headers, body = res offset = 0 count = hijack = clen = nil alive = @hs.next? && self.class.persistent_connections flags = MSG_DONTWAIT term = false hdr_only, chunk_ok = opt code = status.to_i hdr_only ||= Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code) msg = Rack::Utils::HTTP_STATUS_CODES[code] buf = "#{response_start}#{msg ? %Q(#{code} #{msg}) : status}\r\n" \ "Date: #{httpdate}\r\n".dup headers.each do |key, value| case key when %r{\ADate\z}i next when %r{\AContent-Range\z}i if %r{\Abytes (\d+)-(\d+)/\d+\z} =~ value offset = $1.to_i count = $2.to_i - offset + 1 end kv_str(buf, key, value) when %r{\AConnection\z}i # allow Rack apps to tell us they want to drop the client alive = false if value =~ /\bclose\b/i when %r{\AContent-Length\z}i term = true clen = value.to_i flags |= MSG_MORE if clen > 0 && !hdr_only kv_str(buf, key, value) when %r{\ATransfer-Encoding\z}i term = true if value =~ /\bchunked\b/i kv_str(buf, key, value) when "rack.hijack" hijack = value else kv_str(buf, key, value) end end count ||= clen if !term && chunk_ok && !hdr_only term = true body = Yahns::ChunkBody.new(body, opt) buf << "Transfer-Encoding: chunked\r\n".freeze end alive &&= (term || hdr_only) buf << (alive ? "Connection: keep-alive\r\n\r\n".freeze : "Connection: close\r\n\r\n".freeze) case rv = kgio_syssend(buf, flags) when nil # all done, likely buf.clear buf = nil # recycle any memory we used ASAP break when String flags = MSG_DONTWAIT buf = rv # unlikely, hope the skb grows when :wait_writable, :wait_readable # unlikely if self.class.output_buffering alive = hijack ? hijack : alive rv = response_header_blocked(buf, body, alive, offset, count) body = nil # ensure we do not close body in ensure return rv else response_wait_write(rv) or return :close end end while @hs.headers? return response_hijacked(hijack) if hijack return http_response_done(alive) if hdr_only if body.respond_to?(:to_path) && count @state = body = Yahns::StreamFile.new(body, alive, offset, count) return step_write end headers = wbuf = rv = nil body.each do |x| if wbuf rv = wbuf.wbuf_write(self, x) else case rv = String === x ? kgio_trywrite(x) : kgio_trywritev(x) when nil # all done, likely and good! break when String, Array x = rv # hope the skb grows when we loop into the trywrite when :wait_writable, :wait_readable if self.class.output_buffering wbuf = Yahns::Wbuf.new(body, alive) rv = wbuf.wbuf_write(self, x) break else response_wait_write(rv) or return :close end end while true end end # if we buffered the write body, we must return :wait_writable # (or :wait_readable for SSL) and hit Yahns::HttpClient#step_write if wbuf body = nil # ensure we do not close the body in ensure wbuf_maybe(wbuf, rv) else http_response_done(alive) end ensure body.respond_to?(:close) and body.close end |
#httpdate ⇒ Object
23 24 25 |
# File 'lib/yahns/http_response.rb', line 23 def httpdate MTX.synchronize { super } end |
#kgio_syssend(buf, flags) ⇒ Object
40 41 42 |
# File 'lib/yahns/http_response.rb', line 40 def kgio_syssend(buf, flags) kgio_trywrite(buf) end |
#kv_str(buf, key, value) ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/yahns/http_response.rb', line 114 def kv_str(buf, key, value) if value.include?("\n".freeze) # avoiding blank, key-only cookies with /\n+/ value.split(/\n+/).each { |v| buf << "#{key}: #{v}\r\n" } else buf << "#{key}: #{value}\r\n" end end |
#proxy_busy_mod(wbuf, req_res) ⇒ Object
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/yahns/proxy_http_response.rb', line 275 def proxy_busy_mod(wbuf, req_res) if wbuf # we are completely done reading and buffering the upstream response, # but have not completely written the response to the client, # yield control to the client socket: @state = wbuf proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR) # no touching self after proxy_wait_next, we may be running # HttpClient#yahns_step in a different thread at this point else case http_response_done(req_res.alive) when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD) when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR) when :close then close end end nil # signal close for ReqRes#yahns_step end |
#proxy_err_response(code, req_res, exc) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/yahns/proxy_http_response.rb', line 48 def proxy_err_response(code, req_res, exc) logger = self.class.logger # Yahns::HttpContext#logger case exc when nil logger.error('premature upstream EOF') when Kcar::ParserError logger.error("upstream response error: #{exc.}") when String logger.error(exc) else Yahns::Log.exception(logger, 'upstream error', exc) end # try to write something, but don't care if we fail Integer === code and kgio_trywrite("HTTP/1.1 #{code} #{ Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n") rescue nil shutdown rescue nil @input = @input.close if @input # this is safe ONLY because we are in an :ignore state after # Fdmap#forget when we got hijacked: close nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb ensure wbuf = req_res.resbuf wbuf.wbuf_abort if wbuf.respond_to?(:wbuf_abort) end |
#proxy_read_body(tip, kcar, req_res) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/yahns/proxy_http_response.rb', line 149 def proxy_read_body(tip, kcar, req_res) chunk = ''.dup if kcar.chunked? len = kcar.body_bytes_left rbuf = Thread.current[:yahns_rbuf] alive = req_res.alive wbuf = req_res.resbuf case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) when String if len kcar.body_bytes_left -= tmp.size # progress for body_eof? => true elsif chunk kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true next if chunk.empty? # call req_res.kgio_tryread for more tmp = chunk_out(chunk) elsif alive # HTTP/1.0 upstream, HTTP/1.1 client tmp = chunk_out(tmp) # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing end wbuf = proxy_write(wbuf, tmp, req_res) chunk.clear if chunk if Yahns::WbufLite === wbuf req_res.proxy_trailers = [ rbuf.dup, tip ] if chunk && kcar.body_eof? return proxy_unbuffer(wbuf) end when nil # EOF # HTTP/1.1 upstream, unexpected premature EOF: msg = "upstream EOF (#{len} bytes left)" if len msg = 'upstream EOF (chunk)' if chunk return proxy_err_response(nil, req_res, msg) if msg # HTTP/1.0 upstream: wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive req_res.shutdown return proxy_unbuffer(wbuf, nil) if Yahns::WbufLite === wbuf return proxy_busy_mod(wbuf, req_res) when :wait_readable return wait_on_upstream(req_res) end until kcar.body_eof? if chunk # tip is an empty array and becomes trailer storage req_res.proxy_trailers = [ rbuf.dup, tip ] return proxy_read_trailers(kcar, req_res) end proxy_busy_mod(wbuf, req_res) end |
#proxy_read_trailers(kcar, req_res) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/yahns/proxy_http_response.rb', line 197 def proxy_read_trailers(kcar, req_res) chunk, tlr = req_res.proxy_trailers rbuf = Thread.current[:yahns_rbuf] wbuf = req_res.resbuf until kcar.trailers(tlr, chunk) case rv = req_res.kgio_tryread(0x2000, rbuf) when String chunk << rv when :wait_readable return wait_on_upstream(req_res) when nil # premature EOF return proxy_err_response(nil, req_res, 'upstream EOF (trailers)') end # no loop here end wbuf = proxy_write(wbuf, trailer_out(tlr), req_res) return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf proxy_busy_mod(wbuf, req_res) end |
#proxy_res_headers(res, req_res) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/yahns/proxy_http_response.rb', line 83 def proxy_res_headers(res, req_res) status, headers = res code = status.to_i msg = Rack::Utils::HTTP_STATUS_CODES[code] env = @hs.env have_body = !Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code) && env['REQUEST_METHOD'] != 'HEAD'.freeze flags = MSG_DONTWAIT alive = @hs.next? && self.class.persistent_connections term = false response_headers = req_res.proxy_pass.response_headers res = "HTTP/1.1 #{msg ? %Q(#{code} #{msg}) : status}\r\n".dup headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays case key when /\A(?:Connection|Keep-Alive)\z/i next # do not let some upstream headers leak through when %r{\AContent-Length\z}i term = true flags |= MSG_MORE if have_body && value.to_i > 0 when %r{\ATransfer-Encoding\z}i term = true if value =~ /\bchunked\b/i end # response header mapping case val = response_headers[key] when :ignore next when String value = val end res << "#{key}: #{value}\r\n" end # For now, do not add a Date: header, assume upstream already did it # but do not care if they did not # chunk the response ourselves if the client supports it, # but the backend does not terminate properly if alive && ! term && have_body if env['HTTP_VERSION'] == 'HTTP/1.1'.freeze res << "Transfer-Encoding: chunked\r\n".freeze else # we can't persist HTTP/1.0 and HTTP/0.9 w/o Content-Length alive = false end end res << (alive ? "Connection: keep-alive\r\n\r\n".freeze : "Connection: close\r\n\r\n".freeze) # send the headers case rv = kgio_syssend(res, flags) when nil # all done, likely res.clear break when String # partial write, highly unlikely flags = MSG_DONTWAIT res = rv # hope the skb grows when :wait_writable, :wait_readable # highly unlikely in real apps proxy_write(nil, res, req_res) break # keep buffering body... end while true req_res.alive = alive have_body end |
#proxy_response_finish(kcar, req_res) ⇒ Object
239 240 241 242 |
# File 'lib/yahns/proxy_http_response.rb', line 239 def proxy_response_finish(kcar, req_res) req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res) : proxy_read_body([], kcar, req_res) end |
#proxy_response_start(res, tip, kcar, req_res) ⇒ Object
start streaming the response once upstream is done sending headers to us. returns :wait_readable if we need to read more from req_res returns :ignore if we yield control to the client(self) returns nil if completely done
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/yahns/proxy_http_response.rb', line 221 def proxy_response_start(res, tip, kcar, req_res) have_body = proxy_res_headers(res, req_res) tip = tip.empty? ? [] : [ tip ] if have_body req_res.proxy_trailers = nil # define to avoid uninitialized warnings return proxy_read_body(tip, kcar, req_res) end # unlikely wbuf = req_res.resbuf return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf # all done reading response from upstream, req_res will be discarded # when we return nil: proxy_busy_mod(wbuf, req_res) end |
#proxy_unbuffer(wbuf, nxt = :ignore) ⇒ Object
switch and yield
13 14 15 16 17 18 19 |
# File 'lib/yahns/proxy_http_response.rb', line 13 def proxy_unbuffer(wbuf, nxt = :ignore) @state = wbuf wbuf.req_res = nil if nxt.nil? && wbuf.respond_to?(:req_res=) proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR) nxt end |
#proxy_wait_next(qflags) ⇒ Object
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/yahns/proxy_http_response.rb', line 244 def proxy_wait_next(qflags) Thread.current[:yahns_fdmap].remember(self) # We must allocate a new, empty request object here to avoid a TOCTTOU # in the following timeline # # original thread: | another thread # HttpClient#yahns_step | # r = k.app.call(env = @hs.env) # socket hijacked into epoll queue # <thread is scheduled away> | epoll_wait readiness # | ReqRes#yahns_step # | proxy dispatch ... # | proxy_busy_mod # ************************** DANGER BELOW ******************************** # | HttpClient#yahns_step # | # clears env # sees empty env: | # return :ignore if env.include?('rack.hijack_io') | # # In other words, we cannot touch the original env seen by the # original thread since it must see the 'rack.hijack_io' value # because both are operating in the same Yahns::HttpClient object. # This will happen regardless of GVL existence hs = Unicorn::HttpRequest.new hs.buf.replace(@hs.buf) @hs = hs # n.b. we may not touch anything in this object once we call queue_mod, # another thread is likely to take it! Thread.current[:yahns_queue].queue_mod(self, qflags) end |
#proxy_write(wbuf, buf, req_res) ⇒ Object
write everything in buf to our client socket (or wbuf, if it exists) it may return a newly-created wbuf or nil
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/yahns/proxy_http_response.rb', line 31 def proxy_write(wbuf, buf, req_res) unless wbuf # no write buffer, try to write directly to the client socket case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf) when nil then return # done writing buf, likely when String, Array # partial write, hope the skb grows buf = rv when :wait_writable, :wait_readable wbuf = req_res.resbuf ||= wbuf_alloc(req_res) break end while true end wbuf.wbuf_write(self, buf) wbuf.busy ? wbuf : nil end |
#response_header_blocked(header, body, alive, offset, count) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/yahns/http_response.rb', line 62 def response_header_blocked(header, body, alive, offset, count) if body.respond_to?(:to_path) && count alive = Yahns::StreamFile.new(body, alive, offset, count) body = nil end wbuf = Yahns::Wbuf.new(body, alive) rv = wbuf.wbuf_write(self, header) if body && ! alive.respond_to?(:call) # skip body.each if hijacked body.each { |chunk| rv = wbuf.wbuf_write(self, chunk) } end wbuf_maybe(wbuf, rv) end |
#response_start ⇒ Object
45 46 47 |
# File 'lib/yahns/http_response.rb', line 45 def response_start @hs.response_start_sent ? ''.freeze : 'HTTP/1.1 '.freeze end |
#response_wait_write(rv) ⇒ Object
49 50 51 52 53 54 55 56 |
# File 'lib/yahns/http_response.rb', line 49 def response_wait_write(rv) # call the kgio_wait_readable or kgio_wait_writable method ok = __send__("kgio_#{rv}") and return ok k = self.class k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\ "#{k.client_timeout}s") false end |
#trailer_out(tlr) ⇒ Object
302 303 304 |
# File 'lib/yahns/proxy_http_response.rb', line 302 def trailer_out(tlr) "0\r\n#{tlr.map! do |k,v| "#{k}: #{v}\r\n" end.join}\r\n" end |
#wait_on_upstream(req_res) ⇒ Object
78 79 80 81 |
# File 'lib/yahns/proxy_http_response.rb', line 78 def wait_on_upstream(req_res) req_res.resbuf ||= wbuf_alloc(req_res) :wait_readable # self remains in :ignore, wait on upstream end |
#wbuf_alloc(req_res) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/yahns/proxy_http_response.rb', line 21 def wbuf_alloc(req_res) if req_res.proxy_pass.proxy_buffering Yahns::Wbuf.new(nil, req_res.alive) else Yahns::WbufLite.new(req_res) end end |
#wbuf_maybe(wbuf, rv) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/yahns/http_response.rb', line 75 def wbuf_maybe(wbuf, rv) case rv # wbuf_write return value when nil # all done case rv = wbuf.wbuf_close(self) when :ignore # hijacked @state = rv when Yahns::StreamFile @state = rv :wait_writable when true, false http_response_done(rv) end else @state = wbuf rv end end |