Class: Yahns::HttpClient
- Inherits:
-
Kgio::Socket
- Object
- Kgio::Socket
- Yahns::HttpClient
- Includes:
- HttpResponse
- Defined in:
- lib/yahns/http_client.rb
Overview
:nodoc:
Constant Summary collapse
Constants included from HttpResponse
Yahns::HttpResponse::CCC_RESPONSE_START, Yahns::HttpResponse::MSG_DONTWAIT, Yahns::HttpResponse::MSG_MORE, Yahns::HttpResponse::MTX
Instance Method Summary collapse
- #app_call(input) ⇒ Object
- #app_hijacked?(env, res) ⇒ Boolean
-
#call ⇒ Object
this is the env callback exposed to the Rack app.
- #do_pread(io, count, offset) ⇒ Object
-
#fill_body(rsize, rbuf) ⇒ Object
Returns true if we want to keep looping on this; returns :wait_readable, :wait_writable, or nil to yield back to epoll.
-
#handle_error(e) ⇒ Object
if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken.
-
#hijack_cleanup ⇒ Object
allow releasing some memory if rack.hijack is used n.b.
- #input_ready ⇒ Object
-
#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_read.
-
#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_write.
-
#mkinput_preread ⇒ Object
used only with “input_buffering true”.
-
#r100_done ⇒ Object
Only called when buffering slow clients. Returns :wait_readable, :wait_writable, :ignore, or nil for epoll; returns true to keep looping inside yahns_step.
-
#read_trailers(rsize, rbuf) ⇒ Object
Returns true if we are ready to dispatch the app; returns :wait_readable, :wait_writable, or nil to yield back to epoll.
- #response_hijacked(fn) ⇒ Object
-
#step_write ⇒ Object
Use if writes are deferred by buffering. This return value goes to the main epoll/kqueue worker loop; returns :wait_readable, :wait_writable, or nil.
- #trysendio(io, offset, count) ⇒ Object (also: #trysendfile)
-
#yahns_init ⇒ Object
called from acceptor thread.
-
#yahns_read(bytes, buf) ⇒ Object
used by StreamInput (and thus TeeInput) for input_buffering false|:lazy.
-
#yahns_step ⇒ Object
the main entry point of the epoll/kqueue worker loop.
Methods included from HttpResponse
#chunk_out, #do_ccc, #err_response, #http_100_response, #http_response_done, #http_response_prep, #http_response_write, #httpdate, #kgio_syssend, #kv_str, #proxy_busy_mod, #proxy_err_response, #proxy_read_body, #proxy_read_trailers, #proxy_res_headers, #proxy_response_finish, #proxy_response_start, #proxy_unbuffer, #proxy_wait_next, #proxy_write, #response_header_blocked, #response_start, #response_wait_write, #trailer_out, #wait_on_upstream, #wbuf_alloc, #wbuf_maybe
Instance Method Details
#app_call(input) ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/yahns/http_client.rb', line 200
#
# Finishes populating the Rack env, invokes the Rack app and writes out its
# response.
#
# +input+ becomes env['rack.input']. It is nil if we needed to wait for
# writability with check_client_connection; in that case the per-request
# env keys are left untouched and the client-connection check is skipped.
#
# Returns :ignore when the app hijacked the socket, the truthy result of
# do_ccc or http_100_response when either one produces a value, and
# otherwise whatever http_response_write returns (:wait_readable,
# :wait_writable, :ignore, or nil).
def app_call(input)
  klass = self.class
  rack_env = @hs.env

  if input
    rack_env['REMOTE_ADDR'] = @kgio_addr
    rack_env['rack.hijack'] = self
    rack_env['rack.input'] = input

    if klass.check_client_connection && @hs.headers?
      ccc = do_ccc
      return ccc if ccc
    end
  end

  rack_env.merge!(klass.app_defaults)

  # Workaround for unicorn_http parser behavior when it parses HTTP_HOST:
  # an HTTPS request carrying a Host header must not advertise port 80.
  https = rack_env['HTTPS'] == 'on'.freeze
  if https && rack_env['HTTP_HOST'] && rack_env['SERVER_PORT'] == '80'.freeze
    rack_env['SERVER_PORT'] = '443'.freeze
  end

  opt = http_response_prep(rack_env)

  # run the rack app
  response = klass.app.call(rack_env)
  return :ignore if app_hijacked?(rack_env, response)

  # A 100 status from the app: emit the interim response, then call the
  # app a second time for the real response.
  if response[0].to_i == 100
    interim = http_100_response(rack_env)
    return interim if interim

    response = klass.app.call(rack_env)
  end

  http_response_write(response, opt)
end
#app_hijacked?(env, res) ⇒ Boolean
319 320 321 322 323 |
# File 'lib/yahns/http_client.rb', line 319
#
# Tells whether the Rack app hijacked the connection, i.e. whether
# 'rack.hijack_io' was stored in +env+.
#
# When it was, the response body (res[2]) is closed if +res+ is present
# and the body responds to #close.
#
# Returns true if hijacked, false otherwise.
def app_hijacked?(env, res)
  if env.include?('rack.hijack_io'.freeze)
    body = res && res[2]
    body.close if body.respond_to?(:close)
    true
  else
    false
  end
end
#call ⇒ Object
this is the env callback exposed to the Rack app
272 273 274 275 |
# File 'lib/yahns/http_client.rb', line 272
#
# This is the env callback exposed to the Rack app (app_call stores this
# object itself under env['rack.hijack']).
#
# Runs hijack_cleanup, then publishes this socket as env['rack.hijack_io'].
# Returns self.
def call
  hijack_cleanup
  rack_env = @hs.env
  rack_env['rack.hijack_io'] = self
end
#do_pread(io, count, offset) ⇒ Object
325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/yahns/http_client.rb', line 325
#
# Reads up to +count+ bytes (capped at 0x4000) from +io+ starting at
# +offset+ into a per-thread scratch buffer kept in
# Thread.current[:yahns_sfbuf].
#
# Uses io.pread when available; otherwise repositions +io+ and uses
# io.read, which moves the IO's position.
#
# Returns whatever the underlying read returns. On EOFError a "BUG"
# warning with the caller's backtrace is emitted and nil is returned.
def do_pread(io, count, offset)
  limit = count > 0x4000 ? 0x4000 : count
  scratch = (Thread.current[:yahns_sfbuf] ||= ''.dup)
  return io.pread(limit, offset, scratch) if io.respond_to?(:pread)

  io.pos = offset
  io.read(limit, scratch)
rescue EOFError
  warn "BUG: do_pread overreach:\n #{caller.join("\n ")}\n"
  nil
end
#fill_body(rsize, rbuf) ⇒ Object
Returns true if we want to keep looping on this; returns :wait_readable, :wait_writable, or nil to yield back to epoll.
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/yahns/http_client.rb', line 82
#
# Performs one non-blocking read of up to +rsize+ bytes into +rbuf+ and
# feeds the result through the parser's body filter into @input.
#
# Returns true if we want to keep looping on this (the caller still has
# to check body_eof? first), :wait_readable / :wait_writable to have
# epoll/kqueue wait for more, or the result of @input.close (nil according
# to the original comment) on an unexpected EOF.
def fill_body(rsize, rbuf)
  status = kgio_tryread(rsize, rbuf)
  case status
  when nil # unexpected EOF
    @input.close
  when :wait_readable, :wait_writable
    status
  when String
    @hs.filter_body(rbuf, @hs.buf << rbuf)
    @input.write(rbuf)
    true
  end
end
#handle_error(e) ⇒ Object
if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken. We’ll always return nil to ensure the socket is closed at the end of this function
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/yahns/http_client.rb', line 287
#
# If we get any error, try to write something back to the client assuming
# we haven't closed the socket, but don't get hung up if the socket is
# already closed or broken.
#
# Status selection:
# * EOF / connection-level Errno errors: no response at all
# * Yahns::ClientTimeout:                408
# * Unicorn::RequestURITooLongError:     414
# * Unicorn::RequestEntityTooLargeError: 413
# * Unicorn::HttpParserError:            400
# * anything else: logged via Yahns::Log.exception and answered with 500,
#   except an OpenSSL::SSL::SSLError whose message mentions
#   "wrong version number", which is logged without a backtrace and gets
#   no response.
#
# Always returns nil, and always attempts to shut the socket down, to
# ensure the connection is dropped at the end of this method.
def handle_error(e)
  code = case e
         when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::ENOTCONN,
              Errno::ETIMEDOUT, Errno::EHOSTUNREACH
           return # don't send response, drop the connection
         when Yahns::ClientTimeout
           408
         when Unicorn::RequestURITooLongError
           414
         when Unicorn::RequestEntityTooLargeError
           413
         when Unicorn::HttpParserError # try to tell the client they're bad
           400
         else
           status = 500
           # Compare the class by name so OpenSSL need not be loaded.
           # NOTE(review): presumably this is a non-TLS client talking to a
           # TLS listener, so an HTTP error response would be useless -
           # confirm.
           if e.class.to_s == 'OpenSSL::SSL::SSLError' &&
              e.message.include?('wrong version number')
             status = nil
             e.set_backtrace([])
           end
           Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
           status
         end
  kgio_trywrite(err_response(code)) if code
rescue
  # best effort only: a failure while reporting the error is ignored
ensure
  shutdown rescue nil
  return # always drop the connection on uncaught errors
end
#hijack_cleanup ⇒ Object
Allow releasing some memory if rack.hijack is used. N.B.: we no longer issue EPOLL_CTL_DEL because it becomes more expensive (and complicated), as our hijack support will allow “un-hijacking” the socket.
264 265 266 267 268 269 |
# File 'lib/yahns/http_client.rb', line 264
#
# Allows releasing some memory if rack.hijack is used: removes this socket
# from the per-thread fd map, marks the client state as :ignore and drops
# our reference to the request input.
#
# Returns nil.
def hijack_cleanup
  # prevent socket from holding process exit up
  fdmap = Thread.current[:yahns_fdmap]
  fdmap.forget(self)

  @state = :ignore

  # only our own reference goes away; env["rack.input"] stays accessible
  @input = nil
end
#input_ready ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/yahns/http_client.rb', line 62
#
# Decides what to use as the request input once the headers are parsed.
#
# With "input_buffering true":
# * returns the truthy result of http_100_response if there is one
# * returns NULL_IO for an empty body (content_length == 0)
# * otherwise starts pre-reading the body via mkinput_preread and returns
#   false so the caller keeps looping
#
# With input_buffering :lazy or false: returns NULL_IO for an empty body,
# else builds the input with the class-level mkinput, stores it in @input
# and returns it.
def input_ready
  no_body = 0 == @hs.content_length
  klass = self.class

  unless true == klass.input_buffering # :lazy, false
    return no_body ? NULL_IO : (@input = klass.mkinput(self, @hs))
  end

  interim = http_100_response(@hs.env)
  return interim if interim

  # common case is an empty body
  return NULL_IO if no_body

  # content_length is nil (chunked) or len > 0
  mkinput_preread
  false # keep looping
end
#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_read
244 245 246 |
# File 'lib/yahns/http_client.rb', line 244
#
# Called automatically by kgio_read.
#
# Delegates to the superclass implementation; +timeout+ defaults to the
# class-level client_timeout.
def kgio_wait_readable(timeout = self.class.client_timeout)
  super(timeout)
end
#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_write
239 240 241 |
# File 'lib/yahns/http_client.rb', line 239
#
# Called automatically by kgio_write.
#
# Delegates to the superclass implementation; +timeout+ defaults to the
# class-level client_timeout.
def kgio_wait_writable(timeout = self.class.client_timeout)
  super(timeout)
end
#mkinput_preread ⇒ Object
used only with “input_buffering true”
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/yahns/http_client.rb', line 46
#
# Used only with "input_buffering true".
#
# Switches the client into the :body state, allocates the buffering IO
# via the class-level tmpio_for, and writes into it whatever body data
# the parser filters out of its own buffer.
#
# Raises Unicorn::RequestEntityTooLargeError (with an empty backtrace)
# when a Content-Length is known and exceeds client_max_body_size.
# Returns the result of @input.write.
def mkinput_preread
  klass = self.class
  body_len = @hs.content_length
  limit = klass.client_max_body_size

  too_large = limit && body_len && body_len > limit
  if too_large
    raise Unicorn::RequestEntityTooLargeError,
          "Content-Length:#{body_len} too large (>#{limit})", []
  end

  @state = :body
  @input = klass.tmpio_for(body_len, @hs.env)

  scratch = Thread.current[:yahns_rbuf]
  @hs.filter_body(scratch, @hs.buf)
  @input.write(scratch)
end
#r100_done ⇒ Object
Only called when buffering slow clients. Returns :wait_readable, :wait_writable, :ignore, or nil for epoll; returns true to keep looping inside yahns_step.
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/yahns/http_client.rb', line 180
#
# Only called when buffering slow clients.
#
# With "input_buffering true": dispatches the app immediately with NULL_IO
# for an empty body; otherwise starts pre-reading the body and returns
# true to keep looping inside yahns_step (@state is :body by then).
#
# With input_buffering :lazy or false: calls the Rack app and writes its
# response.
#
# Returns :wait_readable, :wait_writable, :ignore, or nil for epoll, or
# true as described above.
def r100_done
  klass = self.class

  if true == klass.input_buffering
    # common case is an empty body
    return app_call(NULL_IO) if 0 == @hs.content_length

    # content_length is nil (chunked) or len > 0
    mkinput_preread
    return true # keep looping (@state == :body)
  end

  # :lazy, false
  rack_env = @hs.env
  opt = http_response_prep(rack_env)
  response = klass.app.call(rack_env)
  app_hijacked?(rack_env, response) ? :ignore : http_response_write(response, opt)
end
#read_trailers(rsize, rbuf) ⇒ Object
Returns true if we are ready to dispatch the app; returns :wait_readable, :wait_writable, or nil to yield back to epoll.
97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/yahns/http_client.rb', line 97
#
# Keeps reading (non-blocking, up to +rsize+ bytes at a time into +rbuf+)
# and feeding the parser until the trailers are complete.
#
# Returns true if we are ready to dispatch the app (after rewinding
# @input), :wait_readable / :wait_writable to yield back to epoll, or the
# result of @input.close (nil according to the original comment) on an
# unexpected EOF.
def read_trailers(rsize, rbuf)
  while true
    status = kgio_tryread(rsize, rbuf)
    case status
    when String
      # an incomplete parse falls through and loops on kgio_tryread again
      next unless @hs.add_parse(rbuf)

      @input.rewind
      return true
    when :wait_readable, :wait_writable
      return status # wait for more
    when nil # unexpected EOF
      return @input.close
    end
  end
end
#response_hijacked(fn) ⇒ Object
277 278 279 280 281 |
# File 'lib/yahns/http_client.rb', line 277
#
# Hands this socket over to +fn+, a callable that is invoked with self
# after hijack_cleanup has run.
#
# Always returns :ignore.
def response_hijacked(fn)
  hijack_cleanup
  fn.(self)
  :ignore
end
#step_write ⇒ Object
Use if writes are deferred by buffering. This return value goes to the main epoll/kqueue worker loop; returns :wait_readable, :wait_writable, or nil.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/yahns/http_client.rb', line 27
#
# Use if writes are deferred by buffering: repeatedly asks the current
# write buffer (@state) to flush itself to this socket.
#
# The return value goes to the main epoll/kqueue worker loop:
# * :wait_writable / :wait_readable - wait on this socket some more
# * :ignore - on hijack; @state was already set in hijack_cleanup
# * the result of http_response_done once the flush reports true/false
# * :wait_writable after switching @state to :ccc_done or :r100_done
#
# A Yahns::StreamFile result replaces @state and flushing continues.
# Any other result raises a RuntimeError flagged as a bug.
def step_write
  while true
    flushed = @state.wbuf_flush(self)
    case flushed
    when :wait_writable, :wait_readable, :ignore
      return flushed
    when Yahns::StreamFile
      @state = flushed # continue looping
    when true, false # done
      return http_response_done(flushed)
    when :ccc_done, :r100_done
      @state = flushed
      return :wait_writable
    else
      raise "BUG: #{@state.inspect}#wbuf_flush returned #{flushed.inspect}"
    end
  end
end
#trysendio(io, offset, count) ⇒ Object Also known as: trysendfile
339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
# File 'lib/yahns/http_client.rb', line 339
#
# Reads one chunk from +io+ at +offset+ (via do_pread) and tries to write
# it to this socket without blocking.
#
# Returns:
# * 0 when +count+ is 0
# * nil when do_pread yields nothing (EOF)
# * the number of bytes written, if any were written
# * :wait_writable / :wait_readable when nothing could be written yet
def trysendio(io, offset, count)
  return 0 if count == 0

  pending = do_pread(io, count, offset)
  return unless pending # nil for EOF

  sent = 0
  while true
    result = kgio_trywrite(pending)
    case result
    when nil
      return sent + pending.size # everything went out
    when :wait_writable, :wait_readable
      return sent > 0 ? sent : result
    when String # partial write, keep trying with the unwritten rest
      sent += pending.size - result.size
      pending = result
    end
  end
end
#yahns_init ⇒ Object
called from acceptor thread
18 19 20 21 22 |
# File 'lib/yahns/http_client.rb', line 18
#
# Called from the acceptor thread: gives the client a fresh HTTP parser
# and resets it to the initial state. Returns nil.
def yahns_init
  parser = Unicorn::HttpRequest.new
  @hs = parser

  # later values: :body, :trailers, :pipelined, Wbuf, StreamFile
  @state = :headers
  @input = nil
end
#yahns_read(bytes, buf) ⇒ Object
used by StreamInput (and thus TeeInput) for input_buffering false|:lazy
249 250 251 252 253 254 255 256 257 258 |
# File 'lib/yahns/http_client.rb', line 249
#
# Used by StreamInput (and thus TeeInput) for input_buffering false|:lazy.
#
# Reads up to +bytes+ into +buf+, waiting for the socket to become
# readable or writable as requested by kgio_tryread, and retrying after
# each successful wait.
#
# Returns the String read, or nil on EOF.
# Raises Yahns::ClientTimeout (with an empty backtrace) when a wait
# returns a falsy value.
def yahns_read(bytes, buf)
  while true
    got = kgio_tryread(bytes, buf)
    case got
    when :wait_readable
      raise Yahns::ClientTimeout, "waiting for read", [] unless kgio_wait_readable
    when :wait_writable
      raise Yahns::ClientTimeout, "waiting for write", [] unless kgio_wait_writable
    when String, nil
      return got
    end
  end
end
#yahns_step ⇒ Object
the main entry point of the epoll/kqueue worker loop
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/yahns/http_client.rb', line 113
#
# The main entry point of the epoll/kqueue worker loop.
#
# Drives the per-client state machine kept in @state (:pipelined,
# :headers, :body, :trailers, :ccc_done, :r100_done, or a write buffer
# matching Yahns::WbufCommon) until a value has to be handed back to the
# caller: the result of step_write, app_call, input_ready, fill_body,
# read_trailers or r100_done, or the raw result of kgio_tryread
# (:wait_readable, :wait_writable, or nil).
#
# Any StandardError raised along the way is passed to handle_error, whose
# result is returned instead.
def yahns_step
  # always write unwritten data first if we have any
  return step_write if Yahns::WbufCommon === @state

  # only read if we had nothing to write in this event loop iteration
  k = self.class
  rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads

  case @state
  when :pipelined
    # try to parse a request out of data the parser already holds
    if @hs.parse
      case input = input_ready
      when :wait_readable, :wait_writable, :close then return input
      when false # keep looping on @state
      else
        return app_call(input)
      end
      # @state == :body if we get here point (input_ready -> mkinput_preread)
    else
      # no complete request buffered yet: go read more header data
      @state = :headers
    end
    # continue to outer loop
  when :headers
    case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
    when String
      if @hs.add_parse(rv)
        case input = input_ready
        when :wait_readable, :wait_writable, :close then return input
        when false then break # to outer loop to reevaluate @state == :body
        else
          return app_call(input)
        end
      end
      # keep looping on kgio_tryread
    when :wait_readable, :wait_writable, nil
      return rv
    end while true
  when :body
    if @hs.body_eof?
      if @hs.content_length || @hs.parse # hp.parse == trailers done!
        # the whole body is buffered: rewind it and dispatch the app
        @input.rewind
        return app_call(@input)
      else # possible Transfer-Encoding:chunked, keep looping
        @state = :trailers
      end
    else
      rv = fill_body(k.client_body_buffer_size, rbuf)
      # anything but true means we must yield (or the client went away)
      return rv unless true == rv
    end
  when :trailers
    rv = read_trailers(k.client_header_buffer_size, rbuf)
    return true == rv ? app_call(@input) : rv
  when :ccc_done # unlikely
    # nil input tells app_call that the per-request env keys are already set
    return app_call(nil)
  when :r100_done # unlikely
    rv = r100_done
    return rv unless rv == true
    # r100_done returning true is expected to have left us in :body
    raise "BUG: body=#@state " if @state != :body
    # @state == :body, keep looping
  end while true # outer loop
rescue => e
  handle_error(e)
end