Class: Yahns::HttpClient
- Inherits:
-
Kgio::Socket
- Object
- Kgio::Socket
- Yahns::HttpClient
- Includes:
- HttpResponse
- Defined in:
- lib/yahns/http_client.rb
Overview
Copyright © 2013, Eric Wong <[email protected]> and all contributors License: GPLv3 or later (www.gnu.org/licenses/gpl-3.0.txt)
Constant Summary collapse
- NULL_IO =
:nodoc:
StringIO.new("")
- QEV_FLAGS =
used by acceptor
Yahns::Queue::QEV_RD
- REMOTE_ADDR =
A frozen format for this is about 15% faster (note from Mongrel)
'REMOTE_ADDR'.freeze
- RACK_INPUT =
'rack.input'.freeze
- RACK_HIJACK =
'rack.hijack'.freeze
- RACK_HIJACK_IO =
"rack.hijack_io".freeze
Constants included from HttpResponse
Yahns::HttpResponse::CCC_RESPONSE_START, Yahns::HttpResponse::CONN_CLOSE, Yahns::HttpResponse::CONN_KA, Yahns::HttpResponse::HEAD, Yahns::HttpResponse::MSG_DONTWAIT, Yahns::HttpResponse::MSG_MORE, Yahns::HttpResponse::MTX, Yahns::HttpResponse::REQUEST_METHOD, Yahns::HttpResponse::RESPONSE_START, Yahns::HttpResponse::Z
Instance Method Summary collapse
- #app_call(input) ⇒ Object
-
#call ⇒ Object
this is the env callback exposed to the Rack app.
-
#fill_body(rsize, rbuf) ⇒ Object
returns true if we want to keep looping on this; returns :wait_readable/:wait_writable/nil to yield back to epoll.
-
#handle_error(e) ⇒ Object
if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken.
-
#hijack_cleanup ⇒ Object
allow releasing some memory if rack.hijack is used.
- #input_ready ⇒ Object
-
#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_read.
-
#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_write.
-
#mkinput_preread ⇒ Object
used only with “input_buffering true”.
-
#r100_done ⇒ Object
only called when buffering slow clients; returns :wait_readable, :wait_writable, :ignore, or nil for epoll; returns true to keep looping inside yahns_step.
-
#read_trailers(rsize, rbuf) ⇒ Object
returns true if we are ready to dispatch the app; returns :wait_readable/:wait_writable/nil to yield back to epoll.
- #response_hijacked(fn) ⇒ Object
-
#step_write ⇒ Object
use if writes are deferred by buffering; this return value goes to the main epoll/kqueue worker loop. Returns :wait_readable, :wait_writable, or nil.
-
#yahns_init ⇒ Object
called from acceptor thread.
-
#yahns_read(bytes, buf) ⇒ Object
used by StreamInput (and thus TeeInput) for input_buffering false|:lazy.
-
#yahns_step ⇒ Object
the main entry point of the epoll/kqueue worker loop.
Methods included from HttpResponse
#chunk_out, #do_ccc, #err_response, #have_more?, #http_100_response, #http_response_done, #http_response_write, #httpdate, #kgio_syssend, #kv_str, #proxy_busy_mod_blocked, #proxy_busy_mod_done, #proxy_err_response, #proxy_response_finish, #proxy_response_start, #proxy_wait_next, #proxy_write, #response_header_blocked, #response_start, #response_wait_write, #trailer_out, #wait_on_upstream, #wbuf_maybe
Instance Method Details
#app_call(input) ⇒ Object
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/yahns/http_client.rb', line 203
# Runs the Rack app for the current request and writes out its response.
#
# +input+ becomes env[RACK_INPUT].  It is nil if we needed to wait for
# writability with check_client_connection; in that case the env setup
# and the client-connection check are skipped.
#
# Returns :ignore if the app performed a full hijack (env contains
# RACK_HIJACK_IO).  A truthy result from do_ccc or http_100_response is
# returned as-is; otherwise the result of http_response_write is
# returned (:wait_readable, :wait_writable, :ignore, or nil).
def app_call(input)
  env = @hs.env
  k = self.class

  if input
    env[REMOTE_ADDR] = @kgio_addr
    env[RACK_HIJACK] = self
    env[RACK_INPUT] = input

    if k.check_client_connection && @hs.headers?
      rv = do_ccc
      return rv if rv
    end
  end

  # run the rack app
  status, headers, body = k.app.call(env.merge!(k.app_defaults))

  if env.include?(RACK_HIJACK_IO)
    # The response is discarded on a full hijack, but middleware may have
    # wrapped the body to release resources in #close (Rack::Lock does
    # this), so the body still has to be closed.
    body.close if body.respond_to?(:close)
    return :ignore
  end

  if status.to_i == 100
    rv = http_100_response(env)
    return rv if rv
    status, headers, body = k.app.call(env)
  end

  # this returns :wait_readable, :wait_writable, :ignore, or nil:
  http_response_write(status, headers, body)
end
#call ⇒ Object
this is the env callback exposed to the Rack app
264 265 266 267 |
# File 'lib/yahns/http_client.rb', line 264
# The env callback exposed to the Rack app (app_call stores this client
# under RACK_HIJACK).  Runs hijack_cleanup, then stores this client in the
# request env under RACK_HIJACK_IO and returns it.
def call
  hijack_cleanup
  env = @hs.env
  env[RACK_HIJACK_IO] = self
end
#fill_body(rsize, rbuf) ⇒ Object
returns true if we want to keep looping on this; returns :wait_readable/:wait_writable/nil to yield back to epoll
87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/yahns/http_client.rb', line 87
# Attempts one non-blocking read of up to +rsize+ bytes into +rbuf+ and
# feeds any data through the parser's body filter into @input.
#
# Returns true to keep looping on kgio_tryread (the caller checks
# body_eof? first), :wait_readable/:wait_writable to have epoll/kqueue
# wait for more, or the result of @input.close on unexpected EOF.
def fill_body(rsize, rbuf)
  outcome = kgio_tryread(rsize, rbuf)
  case outcome
  when nil
    # unexpected EOF
    return @input.close # nil
  when :wait_readable, :wait_writable
    return outcome
  when String
    @hs.filter_body(rbuf, @hs.buf << rbuf)
    @input.write(rbuf)
    return true
  end
  nil
end
#handle_error(e) ⇒ Object
if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken. We’ll always return nil to ensure the socket is closed at the end of this function
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/yahns/http_client.rb', line 279
# Best-effort error reporting: maps +e+ to an HTTP status and tries a
# single non-blocking write of the matching error response.
#
# Connection-level errors (EOF, reset, broken pipe, ...) send nothing.
# Any error not matched explicitly is logged through Yahns::Log.exception
# and reported as 500.  Failures while writing are ignored.
#
# The ensure clause always shuts the socket down and returns nil, so
# nothing escapes this method and the caller drops the connection.
def handle_error(e)
  status = case e
           when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::ENOTCONN,
                Errno::ETIMEDOUT, Errno::EHOSTUNREACH
             return # don't send response, drop the connection
           when Yahns::ClientTimeout then 408
           when Unicorn::RequestURITooLongError then 414
           when Unicorn::RequestEntityTooLargeError then 413
           when Unicorn::HttpParserError then 400 # try to tell the client they're bad
           else
             Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
             500
           end
  kgio_trywrite(err_response(status))
rescue StandardError
  # don't get hung up if the socket is already closed or broken
ensure
  begin
    shutdown
  rescue StandardError
    nil
  end
  return # always drop the connection on uncaught errors
end
#hijack_cleanup ⇒ Object
allow releasing some memory if rack.hijack is used. N.B.: we no longer issue EPOLL_CTL_DEL because it becomes more expensive (and complicated) as our hijack support will allow “un-hijacking” the socket.
257 258 259 260 261 |
# File 'lib/yahns/http_client.rb', line 257
# Releases per-client state once rack.hijack is used: asks the current
# thread's :yahns_fdmap to forget this socket (prevents the socket from
# holding the process up) and drops our @input reference.  Only the
# instance variable is cleared; env["rack.input"] stays accessible.
# Returns nil.
def hijack_cleanup
  fdmap = Thread.current[:yahns_fdmap]
  fdmap.forget(self)
  @input = nil
end
#input_ready ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/yahns/http_client.rb', line 67
# Decides what to use as the request input once headers are parsed.
#
# With "input_buffering true": returns a truthy http_100_response result
# as-is, NULL_IO for an empty body, or false after mkinput_preread has
# started buffering (the caller keeps looping).
# Otherwise (:lazy or false): returns NULL_IO for an empty body, or a new
# input object from mkinput, which is also stored in @input.
def input_ready
  no_body = @hs.content_length == 0
  klass = self.class

  unless true == klass.input_buffering
    # :lazy, false
    return NULL_IO if no_body
    return @input = klass.mkinput(self, @hs)
  end

  pending = http_100_response(@hs.env)
  return pending if pending

  # common case is an empty body
  return NULL_IO if no_body

  # content_length is nil (chunked) or len > 0
  mkinput_preread
  false # keep looping
end
#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_read
237 238 239 |
# File 'lib/yahns/http_client.rb', line 237
# Called automatically by kgio_read.  Delegates to the superclass
# implementation; +timeout+ defaults to the class-wide client_timeout.
def kgio_wait_readable(timeout = self.class.client_timeout)
  super(timeout)
end
#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_write
232 233 234 |
# File 'lib/yahns/http_client.rb', line 232
# Called automatically by kgio_write.  Delegates to the superclass
# implementation; +timeout+ defaults to the class-wide client_timeout.
def kgio_wait_writable(timeout = self.class.client_timeout)
  super(timeout)
end
#mkinput_preread ⇒ Object
used only with “input_buffering true”
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/yahns/http_client.rb', line 51
# Used only with "input_buffering true".  Switches the client to the :body
# state, creates the buffering IO via tmpio_for and writes whatever body
# bytes the parser already holds into it.
#
# Raises Unicorn::RequestEntityTooLargeError (with an empty backtrace)
# when both a Content-Length and client_max_body_size are set and the
# length exceeds the limit.
def mkinput_preread
  klass = self.class
  length = @hs.content_length
  limit = klass.client_max_body_size
  too_large = limit && length && length > limit
  if too_large
    raise Unicorn::RequestEntityTooLargeError,
          "Content-Length:#{length} too large (>#{limit})", []
  end

  @state = :body
  @input = klass.tmpio_for(length, @hs.env)

  chunk = Thread.current[:yahns_rbuf]
  @hs.filter_body(chunk, @hs.buf)
  @input.write(chunk)
end
#r100_done ⇒ Object
only called when buffering slow clients; returns :wait_readable, :wait_writable, :ignore, or nil for epoll; returns true to keep looping inside yahns_step
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/yahns/http_client.rb', line 185
# Continues request processing once the :r100_done state is reached
# (only called when buffering slow clients).
#
# With "input_buffering true": dispatches the app immediately for an
# empty body; otherwise starts buffering via mkinput_preread and returns
# true so yahns_step keeps looping (@state == :body).
# Otherwise (:lazy or false): calls the app and writes its response,
# returning :ignore instead if the app hijacked the socket.
#
# Returns :wait_readable, :wait_writable, :ignore, or nil for epoll, or
# true to keep looping inside yahns_step.
def r100_done
  k = self.class
  case k.input_buffering
  when true
    # common case is an empty body
    return app_call(NULL_IO) if 0 == @hs.content_length

    # content_length is nil (chunked) or len > 0
    mkinput_preread # keep looping (@state == :body)
    true
  else # :lazy, false
    env = @hs.env
    res = k.app.call(env)
    if env.include?(RACK_HIJACK_IO)
      # The response is discarded on a full hijack, but middleware may
      # release resources in body.close (Rack::Lock does this), so the
      # body still has to be closed.
      _status, _headers, body = res
      body.close if body.respond_to?(:close)
      return :ignore
    end
    http_response_write(*res)
  end
end
#read_trailers(rsize, rbuf) ⇒ Object
returns true if we are ready to dispatch the app; returns :wait_readable/:wait_writable/nil to yield back to epoll
102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/yahns/http_client.rb', line 102
# Keeps reading up to +rsize+ bytes at a time into +rbuf+ and feeding the
# data to the parser until it reports completion.
#
# Returns true (after rewinding @input) once we are ready to dispatch the
# app, :wait_readable/:wait_writable to yield back to epoll, or the
# result of @input.close on unexpected EOF.
def read_trailers(rsize, rbuf)
  while true
    outcome = kgio_tryread(rsize, rbuf)
    case outcome
    when nil
      # unexpected EOF
      return @input.close # nil
    when :wait_readable, :wait_writable
      return outcome # wait for more
    when String
      if @hs.add_parse(rbuf)
        @input.rewind
        return true
      end
      # parser wants more data, keep looping on kgio_tryread
    end
  end
end
#response_hijacked(fn) ⇒ Object
269 270 271 272 273 |
# File 'lib/yahns/http_client.rb', line 269
# Runs hijack_cleanup, passes this client to the callable +fn+, and
# always returns :ignore.
def response_hijacked(fn)
  hijack_cleanup
  fn.call(self)
  return :ignore
end
#step_write ⇒ Object
use if writes are deferred by buffering; this return value goes to the main epoll/kqueue worker loop. Returns :wait_readable, :wait_writable, or nil
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/yahns/http_client.rb', line 31
# Flushes deferred (buffered) output held in @state; the return value
# goes to the main epoll/kqueue worker loop.
#
# Loops for as long as wbuf_flush hands back a Yahns::StreamFile, which
# replaces @state.  Returns :wait_readable/:wait_writable to wait on the
# socket, :ignore on hijack, the result of http_response_done when the
# flush finished (true/false), or :wait_writable after moving @state to
# :ccc_done/:r100_done.  Any other flush result raises as a bug.
def step_write
  while true
    flushed = @state.wbuf_flush(self)
    case flushed
    when :wait_writable, :wait_readable
      # tell epoll/kqueue to wait on this more
      return flushed
    when :ignore
      # :ignore on hijack
      @state = :ignore
      return :ignore
    when Yahns::StreamFile
      # continue looping with the new state
      @state = flushed
    when true, false
      # done
      return http_response_done(flushed)
    when :ccc_done, :r100_done
      @state = flushed
      return :wait_writable
    else
      raise "BUG: #{@state.inspect}#wbuf_flush returned #{flushed.inspect}"
    end
  end
end
#yahns_init ⇒ Object
called from acceptor thread
21 22 23 24 25 26 |
# File 'lib/yahns/http_client.rb', line 21
# Called from the acceptor thread.  Sets up a fresh request parser and
# resets the per-client state.  Returns nil.
def yahns_init
  @hs = Unicorn::HttpRequest.new
  @input = nil
  # other values seen in @state: :body, :trailers, :pipelined, Wbuf, StreamFile
  @state = :headers
  @response_start_sent = false
  nil
end
#yahns_read(bytes, buf) ⇒ Object
used by StreamInput (and thus TeeInput) for input_buffering false|:lazy
242 243 244 245 246 247 248 249 250 251 |
# File 'lib/yahns/http_client.rb', line 242
# Used by StreamInput (and thus TeeInput) for input_buffering false|:lazy.
# Reads up to +bytes+ into +buf+, waiting on the socket whenever the
# non-blocking read cannot make progress.
#
# Returns the String read, or nil as handed back by kgio_tryread.
# Raises Yahns::ClientTimeout (with an empty backtrace) when a wait
# returns a falsy value.
def yahns_read(bytes, buf)
  while true
    outcome = kgio_tryread(bytes, buf)
    case outcome
    when String, nil
      return outcome
    when :wait_readable
      unless kgio_wait_readable
        raise Yahns::ClientTimeout, "waiting for read", []
      end
    when :wait_writable
      unless kgio_wait_writable
        raise Yahns::ClientTimeout, "waiting for write", []
      end
    end
  end
end
#yahns_step ⇒ Object
the main entry point of the epoll/kqueue worker loop
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/yahns/http_client.rb', line 118
# The main entry point of the epoll/kqueue worker loop: advances this
# client's state machine (@state) as far as it can go without blocking.
#
# Pending output (a Yahns::WbufCommon in @state) is always written first
# via step_write.  Otherwise the outer loop re-dispatches on @state after
# every transition until some branch returns a value for the worker loop.
# Any StandardError raised along the way is handed to handle_error.
def yahns_step
  # always write unwritten data first if we have any
  return step_write if Yahns::WbufCommon === @state

  # only read if we had nothing to write in this event loop iteration
  klass = self.class
  buf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads

  while true # outer loop
    case @state
    when :pipelined
      if @hs.parse
        input = input_ready
        case input
        when :wait_readable, :wait_writable, :close
          return input
        when false
          # keep looping on @state; it is :body at this point
          # (input_ready -> mkinput_preread)
        else
          return app_call(input)
        end
      else
        @state = :headers
      end
      # continue to outer loop
    when :headers
      while true
        got = kgio_tryread(klass.client_header_buffer_size, buf)
        case got
        when String
          if @hs.add_parse(got)
            input = input_ready
            case input
            when :wait_readable, :wait_writable, :close
              return input
            when false
              break # to outer loop to reevaluate @state == :body
            else
              return app_call(input)
            end
          end
          # headers incomplete, keep looping on kgio_tryread
        when :wait_readable, :wait_writable, nil
          return got
        end
      end
    when :body
      if @hs.body_eof?
        if @hs.content_length || @hs.parse # hp.parse == trailers done!
          @input.rewind
          return app_call(@input)
        else
          # possible Transfer-Encoding:chunked, keep looping
          @state = :trailers
        end
      else
        filled = fill_body(klass.client_body_buffer_size, buf)
        return filled unless true == filled
      end
    when :trailers
      done = read_trailers(klass.client_header_buffer_size, buf)
      return true == done ? app_call(@input) : done
    when :ccc_done # unlikely
      return app_call(nil)
    when :r100_done # unlikely
      resumed = r100_done
      return resumed unless resumed == true
      raise "BUG: body=#@state " if @state != :body
      # @state == :body, keep looping
    end
  end
rescue => e
  handle_error(e)
end