Class: Yahns::HttpClient

Inherits:
Kgio::Socket
  • Object
show all
Includes:
HttpResponse
Defined in:
lib/yahns/http_client.rb

Overview

Copyright © 2013, Eric Wong <[email protected]> and all contributors License: GPLv3 or later (www.gnu.org/licenses/gpl-3.0.txt)

Constant Summary collapse

NULL_IO =

:nodoc:

StringIO.new("")
QEV_FLAGS =

used by acceptor

Yahns::Queue::QEV_RD
REMOTE_ADDR =

A frozen format for this is about 15% faster (note from Mongrel)

'REMOTE_ADDR'.freeze
RACK_INPUT =
'rack.input'.freeze
RACK_HIJACK =
'rack.hijack'.freeze
RACK_HIJACK_IO =
"rack.hijack_io".freeze

Constants included from HttpResponse

Yahns::HttpResponse::CCC_RESPONSE_START, Yahns::HttpResponse::CONN_CLOSE, Yahns::HttpResponse::CONN_KA, Yahns::HttpResponse::HEAD, Yahns::HttpResponse::MSG_DONTWAIT, Yahns::HttpResponse::MSG_MORE, Yahns::HttpResponse::MTX, Yahns::HttpResponse::REQUEST_METHOD, Yahns::HttpResponse::RESPONSE_START, Yahns::HttpResponse::Z

Instance Method Summary collapse

Methods included from HttpResponse

#chunk_out, #do_ccc, #err_response, #have_more?, #http_100_response, #http_response_done, #http_response_write, #httpdate, #kgio_syssend, #kv_str, #proxy_busy_mod_blocked, #proxy_busy_mod_done, #proxy_err_response, #proxy_response_finish, #proxy_response_start, #proxy_wait_next, #proxy_write, #response_header_blocked, #response_start, #response_wait_write, #trailer_out, #wait_on_upstream, #wbuf_maybe

Instance Method Details

#app_call(input) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/yahns/http_client.rb', line 203

def app_call(input)
  env = @hs.env
  k = self.class

  # input is nil if we needed to wait for writability with
  # check_client_connection
  if input
    env[REMOTE_ADDR] = @kgio_addr
    env[RACK_HIJACK] = self
    env[RACK_INPUT] = input

    if k.check_client_connection && @hs.headers?
      rv = do_ccc and return rv
    end
  end

  # run the rack app
  status, headers, body = k.app.call(env.merge!(k.app_defaults))
  return :ignore if env.include?(RACK_HIJACK_IO)
  if status.to_i == 100
    rv = http_100_response(env) and return rv
    status, headers, body = k.app.call(env)
  end

  # this returns :wait_readable, :wait_writable, :ignore, or nil:
  http_response_write(status, headers, body)
end

#callObject

this is the env callback exposed to the Rack app



264
265
266
267
# File 'lib/yahns/http_client.rb', line 264

def call
  hijack_cleanup
  @hs.env[RACK_HIJACK_IO] = self
end

#fill_body(rsize, rbuf) ⇒ Object

returns true if we want to keep looping on this returns :wait_readable/wait_writable/nil to yield back to epoll



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/yahns/http_client.rb', line 87

def fill_body(rsize, rbuf)
  case rv = kgio_tryread(rsize, rbuf)
  when String
    @hs.filter_body(rbuf, @hs.buf << rbuf)
    @input.write(rbuf)
    true # keep looping on kgio_tryread (but check body_eof? first)
  when :wait_readable, :wait_writable
    rv # have epoll/kqueue wait for more
  when nil # unexpected EOF
    @input.close # nil
  end
end

#handle_error(e) ⇒ Object

if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken. We’ll always return nil to ensure the socket is closed at the end of this function



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/yahns/http_client.rb', line 279

def handle_error(e)
  code = case e
  when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN,
       Errno::ETIMEDOUT,Errno::EHOSTUNREACH
    return # don't send response, drop the connection
  when Yahns::ClientTimeout
    408
  when Unicorn::RequestURITooLongError
    414
  when Unicorn::RequestEntityTooLargeError
    413
  when Unicorn::HttpParserError # try to tell the client they're bad
    400
  else
    Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
    500
  end
  kgio_trywrite(err_response(code))
rescue
ensure
  shutdown rescue nil
  return # always drop the connection on uncaught errors
end

#hijack_cleanupObject

allow releasing some memory if rack.hijack is used n.b. we no longer issue EPOLL_CTL_DEL because it becomes more expensive (and complicated) as our hijack support will allow “un-hijacking” the socket.



257
258
259
260
261
# File 'lib/yahns/http_client.rb', line 257

def hijack_cleanup
  # prevent socket from holding process up
  Thread.current[:yahns_fdmap].forget(self)
  @input = nil # keep env["rack.input"] accessible, though
end

#input_readyObject



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/yahns/http_client.rb', line 67

def input_ready
  empty_body = 0 == @hs.content_length
  k = self.class
  case k.input_buffering
  when true
    rv = http_100_response(@hs.env) and return rv

    # common case is an empty body
    return NULL_IO if empty_body

    # content_length is nil (chunked) or len > 0
    mkinput_preread # keep looping
    false
  else # :lazy, false
    empty_body ? NULL_IO : (@input = k.mkinput(self, @hs))
  end
end

#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object

called automatically by kgio_read



237
238
239
# File 'lib/yahns/http_client.rb', line 237

def kgio_wait_readable(timeout = self.class.client_timeout)
  super timeout
end

#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object

called automatically by kgio_write



232
233
234
# File 'lib/yahns/http_client.rb', line 232

def kgio_wait_writable(timeout = self.class.client_timeout)
  super timeout
end

#mkinput_prereadObject

used only with “input_buffering true”



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/yahns/http_client.rb', line 51

def mkinput_preread
  k = self.class
  len = @hs.content_length
  mbs = k.client_max_body_size
  if mbs && len && len > mbs
    raise Unicorn::RequestEntityTooLargeError,
          "Content-Length:#{len} too large (>#{mbs})", []
  end
  @state = :body
  @input = k.tmpio_for(len, @hs.env)

  rbuf = Thread.current[:yahns_rbuf]
  @hs.filter_body(rbuf, @hs.buf)
  @input.write(rbuf)
end

#r100_doneObject

only called when buffering slow clients returns :wait_readable, :wait_writable, :ignore, or nil for epoll returns true to keep looping inside yahns_step



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/yahns/http_client.rb', line 185

def r100_done
  k = self.class
  case k.input_buffering
  when true
    empty_body = 0 == @hs.content_length
    # common case is an empty body
    return app_call(NULL_IO) if empty_body

    # content_length is nil (chunked) or len > 0
    mkinput_preread # keep looping (@state == :body)
    true
  else # :lazy, false
    r = k.app.call(env = @hs.env)
    return :ignore if env.include?(RACK_HIJACK_IO)
    http_response_write(*r)
  end
end

#read_trailers(rsize, rbuf) ⇒ Object

returns true if we are ready to dispatch the app returns :wait_readable/wait_writable/nil to yield back to epoll



102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/yahns/http_client.rb', line 102

def read_trailers(rsize, rbuf)
  case rv = kgio_tryread(rsize, rbuf)
  when String
    if @hs.add_parse(rbuf)
      @input.rewind
      return true
    end
    # keep looping on kgio_tryread...
  when :wait_readable, :wait_writable
    return rv # wait for more
  when nil # unexpected EOF
    return @input.close # nil
  end while true
end

#response_hijacked(fn) ⇒ Object



269
270
271
272
273
# File 'lib/yahns/http_client.rb', line 269

def response_hijacked(fn)
  hijack_cleanup
  fn.call(self)
  :ignore
end

#step_writeObject

use if writes are deferred by buffering, this return value goes to the main epoll/kqueue worker loop returns :wait_readable, :wait_writable, or nil



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/yahns/http_client.rb', line 31

def step_write
  case rv = @state.wbuf_flush(self)
  when :wait_writable, :wait_readable
    return rv # tell epoll/kqueue to wait on this more
  when :ignore # :ignore on hijack
    @state = :ignore
    return :ignore
  when Yahns::StreamFile
    @state = rv # continue looping
  when true, false # done
    return http_response_done(rv)
  when :ccc_done, :r100_done
    @state = rv
    return :wait_writable
  else
    raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
  end while true
end

#yahns_initObject

called from acceptor thread



21
22
23
24
25
26
# File 'lib/yahns/http_client.rb', line 21

def yahns_init
  @hs = Unicorn::HttpRequest.new
  @response_start_sent = false
  @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile
  @input = nil
end

#yahns_read(bytes, buf) ⇒ Object

used by StreamInput (and thus TeeInput) for input_buffering false|:lazy



242
243
244
245
246
247
248
249
250
251
# File 'lib/yahns/http_client.rb', line 242

def yahns_read(bytes, buf)
  case rv = kgio_tryread(bytes, buf)
  when String, nil
    return rv
  when :wait_readable
    kgio_wait_readable or raise Yahns::ClientTimeout, "waiting for read", []
  when :wait_writable
    kgio_wait_writable or raise Yahns::ClientTimeout, "waiting for write", []
  end while true
end

#yahns_stepObject

the main entry point of the epoll/kqueue worker loop



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/yahns/http_client.rb', line 118

def yahns_step
  # always write unwritten data first if we have any
  return step_write if Yahns::WbufCommon === @state

  # only read if we had nothing to write in this event loop iteration
  k = self.class
  rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads

  case @state
  when :pipelined
    if @hs.parse
      case input = input_ready
      when :wait_readable, :wait_writable, :close then return input
      when false # keep looping on @state
      else
        return app_call(input)
      end
      # @state == :body if we get here point (input_ready -> mkinput_preread)
    else
      @state = :headers
    end
    # continue to outer loop
  when :headers
    case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
    when String
      if @hs.add_parse(rv)
        case input = input_ready
        when :wait_readable, :wait_writable, :close then return input
        when false then break # to outer loop to reevaluate @state == :body
        else
          return app_call(input)
        end
      end
      # keep looping on kgio_tryread
    when :wait_readable, :wait_writable, nil
      return rv
    end while true
  when :body
    if @hs.body_eof?
      if @hs.content_length || @hs.parse # hp.parse == trailers done!
        @input.rewind
        return app_call(@input)
      else # possible Transfer-Encoding:chunked, keep looping
        @state = :trailers
      end
    else
      rv = fill_body(k.client_body_buffer_size, rbuf)
      return rv unless true == rv
    end
  when :trailers
    rv = read_trailers(k.client_header_buffer_size, rbuf)
    return true == rv ? app_call(@input) : rv
  when :ccc_done # unlikely
    return app_call(nil)
  when :r100_done # unlikely
    rv = r100_done
    return rv unless rv == true
    raise "BUG: body=#@state " if @state != :body
    # @state == :body, keep looping
  end while true # outer loop
rescue => e
  handle_error(e)
end