Class: Yahns::HttpClient
- Inherits:
-
Kgio::Socket
- Object
- Kgio::Socket
- Yahns::HttpClient
- Includes:
- HttpResponse
- Defined in:
- lib/yahns/http_client.rb
Overview
Copyright © 2013, Eric Wong <[email protected]> and all contributors License: GPLv3 or later (www.gnu.org/licenses/gpl-3.0.txt)
Constant Summary collapse
- NULL_IO =
:nodoc:
StringIO.new("")
- QEV_FLAGS =
used by acceptor
Yahns::Queue::QEV_RD
- REMOTE_ADDR =
A frozen format for this is about 15% faster (note from Mongrel)
'REMOTE_ADDR'.freeze
- RACK_INPUT =
'rack.input'.freeze
- RACK_HIJACK =
'rack.hijack'.freeze
- RACK_HIJACK_IO =
"rack.hijack_io".freeze
Constants included from HttpResponse
Yahns::HttpResponse::CCC_RESPONSE_START, Yahns::HttpResponse::CONN_CLOSE, Yahns::HttpResponse::CONN_KA, Yahns::HttpResponse::HEAD, Yahns::HttpResponse::HTTP_EXPECT, Yahns::HttpResponse::MSG_DONTWAIT, Yahns::HttpResponse::MSG_MORE, Yahns::HttpResponse::MTX, Yahns::HttpResponse::R100_CCC, Yahns::HttpResponse::R100_RAW, Yahns::HttpResponse::REQUEST_METHOD, Yahns::HttpResponse::RESPONSE_START, Yahns::HttpResponse::Z
Instance Method Summary collapse
- #app_call(input) ⇒ Object
-
#fill_body(rsize, rbuf) ⇒ Object
returns true if we want to keep looping on this returns :wait_readable/wait_writable/nil to yield back to epoll.
-
#handle_error(e) ⇒ Object
if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken.
- #hijack_proc(env) ⇒ Object
- #input_ready ⇒ Object
-
#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_read.
-
#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_write.
-
#mkinput_preread ⇒ Object
used only with “input_buffering true”.
-
#r100_done ⇒ Object
returns :wait_readable, :wait_writable, :ignore, or nil for epoll returns false to keep looping inside yahns_step.
-
#read_trailers(rsize, rbuf) ⇒ Object
returns true if we are ready to dispatch the app returns :wait_readable/wait_writable/nil to yield back to epoll.
- #response_hijacked(fn) ⇒ Object
-
#step_write ⇒ Object
use if writes are deferred by buffering, this return value goes to the main epoll/kqueue worker loop returns :wait_readable, :wait_writable, or nil.
-
#yahns_init ⇒ Object
called from acceptor thread.
-
#yahns_read(bytes, buf) ⇒ Object
used by StreamInput (and thus TeeInput) for input_buffering false|:lazy.
-
#yahns_step ⇒ Object
the main entry point of the epoll/kqueue worker loop.
Methods included from HttpResponse
#do_ccc, #err_response, #have_more?, #http_100_response, #http_response_done, #http_response_write, #httpdate, #kgio_syssend, #kv_str, #response_header_blocked, #response_start, #response_wait_write, #wbuf_maybe
Instance Method Details
#app_call(input) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/yahns/http_client.rb', line 199 def app_call(input) env = @hs.env k = self.class # input is nil if we needed to wait for writability with # check_client_connection if input env[REMOTE_ADDR] = @kgio_addr env[RACK_HIJACK] = hijack_proc(env) env[RACK_INPUT] = input if k.check_client_connection && @hs.headers? rv = do_ccc and return rv end end # run the rack app status, headers, body = k.app.call(env.merge!(k.app_defaults)) return :ignore if env.include?(RACK_HIJACK_IO) if status.to_i == 100 rv = http_100_response(env) and return rv status, headers, body = k.app.call(env) end # this returns :wait_readable, :wait_writable, :ignore, or nil: http_response_write(status, headers, body) end |
#fill_body(rsize, rbuf) ⇒ Object
returns true if we want to keep looping on this returns :wait_readable/wait_writable/nil to yield back to epoll
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/yahns/http_client.rb', line 86 def fill_body(rsize, rbuf) case rv = kgio_tryread(rsize, rbuf) when String @hs.filter_body(rbuf, @hs.buf << rbuf) @input.write(rbuf) true # keep looping on kgio_tryread (but check body_eof? first) when :wait_readable, :wait_writable rv # have epoll/kqueue wait for more when nil # unexpected EOF @input.close # nil end end |
#handle_error(e) ⇒ Object
if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken. We’ll always return nil to ensure the socket is closed at the end of this function
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/yahns/http_client.rb', line 270 def handle_error(e) code = case e when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN, Errno::ETIMEDOUT,Errno::EHOSTUNREACH return # don't send response, drop the connection when Yahns::ClientTimeout 408 when Unicorn::RequestURITooLongError 414 when Unicorn::RequestEntityTooLargeError 413 when Unicorn::HttpParserError # try to tell the client they're bad 400 else Yahns::Log.exception(@hs.env["rack.logger"], "app error", e) 500 end kgio_trywrite(err_response(code)) rescue ensure shutdown rescue nil return # always drop the connection on uncaught errors end |
#hijack_proc(env) ⇒ Object
227 228 229 230 231 232 |
# File 'lib/yahns/http_client.rb', line 227 def hijack_proc(env) proc do self.class.queue.queue_del(self) # EPOLL_CTL_DEL env[RACK_HIJACK_IO] = self end end |
#input_ready ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/yahns/http_client.rb', line 66 def input_ready empty_body = 0 == @hs.content_length k = self.class case k.input_buffering when true rv = http_100_response(@hs.env) and return rv # common case is an empty body return NULL_IO if empty_body # content_length is nil (chunked) or len > 0 mkinput_preread # keep looping false else # :lazy, false empty_body ? NULL_IO : (@input = k.mkinput(self, @hs)) end end |
#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_read
240 241 242 |
# File 'lib/yahns/http_client.rb', line 240 def kgio_wait_readable(timeout = self.class.client_timeout) super timeout end |
#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object
called automatically by kgio_write
235 236 237 |
# File 'lib/yahns/http_client.rb', line 235 def kgio_wait_writable(timeout = self.class.client_timeout) super timeout end |
#mkinput_preread ⇒ Object
used only with “input_buffering true”
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/yahns/http_client.rb', line 50 def mkinput_preread k = self.class len = @hs.content_length mbs = k.client_max_body_size if mbs && len && len > mbs raise Unicorn::RequestEntityTooLargeError, "Content-Length:#{len} too large (>#{mbs})", [] end @state = :body @input = k.tmpio_for(len) rbuf = Thread.current[:yahns_rbuf] @hs.filter_body(rbuf, @hs.buf) @input.write(rbuf) end |
#r100_done ⇒ Object
returns :wait_readable, :wait_writable, :ignore, or nil for epoll returns false to keep looping inside yahns_step
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/yahns/http_client.rb', line 183 def r100_done k = self.class case k.input_buffering when true empty_body = 0 == @hs.content_length # common case is an empty body return app_call(NULL_IO) if empty_body # content_length is nil (chunked) or len > 0 mkinput_preread # keep looping (@state == :body) true else # :lazy, false http_response_write(*k.app.call(@hs.env)) end end |
#read_trailers(rsize, rbuf) ⇒ Object
returns true if we are ready to dispatch the app returns :wait_readable/wait_writable/nil to yield back to epoll
101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/yahns/http_client.rb', line 101 def read_trailers(rsize, rbuf) case rv = kgio_tryread(rsize, rbuf) when String if @hs.add_parse(rbuf) @input.rewind return true end # keep looping on kgio_tryread... when :wait_readable, :wait_writable return rv # wait for more when nil # unexpected EOF return @input.close # nil end while true end |
#response_hijacked(fn) ⇒ Object
256 257 258 259 260 261 262 263 264 |
# File 'lib/yahns/http_client.rb', line 256 def response_hijacked(fn) # we must issue EPOLL_CTL_DEL before hijacking (if we issue it at all), # because the hijacker may close use before we get back to the epoll worker # loop. EPOLL_CTL_DEL saves about 200 bytes of unswappable kernel memory, # so it can matter if we have lots of hijacked sockets. self.class.queue.queue_del(self) fn.call(self) :ignore end |
#step_write ⇒ Object
use if writes are deferred by buffering, this return value goes to the main epoll/kqueue worker loop returns :wait_readable, :wait_writable, or nil
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/yahns/http_client.rb', line 30 def step_write case rv = @state.wbuf_flush(self) when :wait_writable, :wait_readable return rv # tell epoll/kqueue to wait on this more when :ignore # :ignore on hijack @state = :ignore return :ignore when Yahns::StreamFile @state = rv # continue looping when true, false # done return http_response_done(rv) when :ccc_done, :r100_done @state = rv return :wait_writable else raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}" end while true end |
#yahns_init ⇒ Object
called from acceptor thread
20 21 22 23 24 25 |
# File 'lib/yahns/http_client.rb', line 20 def yahns_init @hs = Unicorn::HttpRequest.new @response_start_sent = false @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile @input = nil end |
#yahns_read(bytes, buf) ⇒ Object
used by StreamInput (and thus TeeInput) for input_buffering false|:lazy
245 246 247 248 249 250 251 252 253 254 |
# File 'lib/yahns/http_client.rb', line 245 def yahns_read(bytes, buf) case rv = kgio_tryread(bytes, buf) when String, nil return rv when :wait_readable kgio_wait_readable or raise Yahns::ClientTimeout, "waiting for read", [] when :wait_writable kgio_wait_writable or raise Yahns::ClientTimeout, "waiting for write", [] end while true end |
#yahns_step ⇒ Object
the main entry point of the epoll/kqueue worker loop
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/yahns/http_client.rb', line 117 def yahns_step # always write unwritten data first if we have any return step_write if Yahns::WbufCommon === @state # only read if we had nothing to write in this event loop iteration k = self.class rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads case @state when :pipelined if @hs.parse case input = input_ready when :wait_readable, :wait_writable, :close then return input when false # keep looping on @state else return app_call(input) end # @state == :body if we get here point (input_ready -> mkinput_preread) else @state = :headers end # continue to outer loop when :headers case rv = kgio_tryread(k.client_header_buffer_size, rbuf) when String if @hs.add_parse(rv) case input = input_ready when :wait_readable, :wait_writable, :close then return input when false then break # to outer loop to reevaluate @state == :body else return app_call(input) end end # keep looping on kgio_tryread when :wait_readable, :wait_writable, nil return rv end while true when :body if @hs.body_eof? if @hs.content_length || @hs.parse # hp.parse == trailers done! @input.rewind return app_call(@input) else # possible Transfer-Encoding:chunked, keep looping @state = :trailers end else rv = fill_body(k.client_body_buffer_size, rbuf) return rv unless true == rv end when :trailers rv = read_trailers(k.client_header_buffer_size, rbuf) return true == rv ? app_call(@input) : rv when :ccc_done # unlikely return app_call(nil) when :r100_done # unlikely rv = r100_done return rv unless rv == true raise "BUG: body=#@state " if @state != :body # @state == :body, keep looping end while true # outer loop rescue => e handle_error(e) end |