Class: Yahns::HttpClient

Inherits:
Kgio::Socket
  • Object
show all
Includes:
HttpResponse
Defined in:
lib/yahns/http_client.rb

Overview

:nodoc:

Constant Summary collapse

NULL_IO =

:nodoc:

StringIO.new(''.dup)
QEV_FLAGS =

used by acceptor

Yahns::Queue::QEV_RD

Constants included from HttpResponse

Yahns::HttpResponse::CCC_RESPONSE_START, Yahns::HttpResponse::MSG_DONTWAIT, Yahns::HttpResponse::MSG_MORE, Yahns::HttpResponse::MTX

Instance Method Summary collapse

Methods included from HttpResponse

#chunk_out, #do_ccc, #err_response, #http_100_response, #http_response_done, #http_response_prep, #http_response_write, #httpdate, #kgio_syssend, #kv_str, #proxy_busy_mod, #proxy_err_response, #proxy_read_body, #proxy_read_trailers, #proxy_res_headers, #proxy_response_finish, #proxy_response_start, #proxy_unbuffer, #proxy_wait_next, #proxy_write, #response_header_blocked, #response_start, #response_wait_write, #trailer_out, #wait_on_upstream, #wbuf_alloc, #wbuf_maybe

Instance Method Details

#app_call(input) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/yahns/http_client.rb', line 200

def app_call(input)
  env = @hs.env
  k = self.class

  # input is nil if we needed to wait for writability with
  # check_client_connection
  if input
    env['REMOTE_ADDR'] = @kgio_addr
    env['rack.hijack'] = self
    env['rack.input'] = input

    if k.check_client_connection && @hs.headers?
      rv = do_ccc and return rv
    end
  end

  env.merge!(k.app_defaults)

  # workaround stupid unicorn_http parser behavior when it parses HTTP_HOST
  if env['HTTPS'] == 'on'.freeze &&
      env['HTTP_HOST'] &&
      env['SERVER_PORT'] == '80'.freeze
    env['SERVER_PORT'] = '443'.freeze
  end

  opt = http_response_prep(env)
  # run the rack app
  res = k.app.call(env)
  return :ignore if app_hijacked?(env, res)
  if res[0].to_i == 100
    rv = http_100_response(env) and return rv
    res = k.app.call(env)
  end

  # this returns :wait_readable, :wait_writable, :ignore, or nil:
  http_response_write(res, opt)
end

#app_hijacked?(env, res) ⇒ Boolean

Returns:

  • (Boolean)


319
320
321
322
323
# File 'lib/yahns/http_client.rb', line 319

def app_hijacked?(env, res)
  return false unless env.include?('rack.hijack_io'.freeze)
  res[2].close if res && res[2].respond_to?(:close)
  true
end

#callObject

this is the env callback exposed to the Rack app



272
273
274
275
# File 'lib/yahns/http_client.rb', line 272

def call
  hijack_cleanup
  @hs.env['rack.hijack_io'] = self
end

#do_pread(io, count, offset) ⇒ Object



325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/yahns/http_client.rb', line 325

def do_pread(io, count, offset)
  count = 0x4000 if count > 0x4000
  buf = Thread.current[:yahns_sfbuf] ||= ''.dup
  if io.respond_to?(:pread)
    io.pread(count, offset, buf)
  else
    io.pos = offset
    io.read(count, buf)
  end
rescue EOFError
  warn "BUG: do_pread overreach:\n #{caller.join("\n ")}\n"
  nil
end

#fill_body(rsize, rbuf) ⇒ Object

returns true if we want to keep looping on this returns :wait_readable/wait_writable/nil to yield back to epoll



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/yahns/http_client.rb', line 82

def fill_body(rsize, rbuf)
  case rv = kgio_tryread(rsize, rbuf)
  when String
    @hs.filter_body(rbuf, @hs.buf << rbuf)
    @input.write(rbuf)
    true # keep looping on kgio_tryread (but check body_eof? first)
  when :wait_readable, :wait_writable
    rv # have epoll/kqueue wait for more
  when nil # unexpected EOF
    @input.close # nil
  end
end

#handle_error(e) ⇒ Object

if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken. We’ll always return nil to ensure the socket is closed at the end of this function



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/yahns/http_client.rb', line 287

def handle_error(e)
  code = case e
  when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN,
       Errno::ETIMEDOUT,Errno::EHOSTUNREACH
    return # don't send response, drop the connection
  when Yahns::ClientTimeout
    408
  when Unicorn::RequestURITooLongError
    414
  when Unicorn::RequestEntityTooLargeError
    413
  when Unicorn::HttpParserError # try to tell the client they're bad
    400
  else
    n = 500
    case e.class.to_s
    when 'OpenSSL::SSL::SSLError'
      if e.message.include?('wrong version number')
        n = nil
        e.set_backtrace([])
      end
    end
    Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
    n
  end
  kgio_trywrite(err_response(code)) if code
rescue
ensure
  shutdown rescue nil
  return # always drop the connection on uncaught errors
end

#hijack_cleanupObject

allow releasing some memory if rack.hijack is used n.b. we no longer issue EPOLL_CTL_DEL because it becomes more expensive (and complicated) as our hijack support will allow “un-hijacking” the socket.



264
265
266
267
268
269
# File 'lib/yahns/http_client.rb', line 264

def hijack_cleanup
  # prevent socket from holding process exit up
  Thread.current[:yahns_fdmap].forget(self)
  @state = :ignore
  @input = nil # keep env["rack.input"] accessible, though
end

#input_readyObject



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/yahns/http_client.rb', line 62

def input_ready
  empty_body = 0 == @hs.content_length
  k = self.class
  case k.input_buffering
  when true
    rv = http_100_response(@hs.env) and return rv

    # common case is an empty body
    return NULL_IO if empty_body

    # content_length is nil (chunked) or len > 0
    mkinput_preread # keep looping
    false
  else # :lazy, false
    empty_body ? NULL_IO : (@input = k.mkinput(self, @hs))
  end
end

#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object

called automatically by kgio_read



244
245
246
# File 'lib/yahns/http_client.rb', line 244

def kgio_wait_readable(timeout = self.class.client_timeout)
  super timeout
end

#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object

called automatically by kgio_write



239
240
241
# File 'lib/yahns/http_client.rb', line 239

def kgio_wait_writable(timeout = self.class.client_timeout)
  super timeout
end

#mkinput_prereadObject

used only with “input_buffering true”



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/yahns/http_client.rb', line 46

def mkinput_preread
  k = self.class
  len = @hs.content_length
  mbs = k.client_max_body_size
  if mbs && len && len > mbs
    raise Unicorn::RequestEntityTooLargeError,
          "Content-Length:#{len} too large (>#{mbs})", []
  end
  @state = :body
  @input = k.tmpio_for(len, @hs.env)

  rbuf = Thread.current[:yahns_rbuf]
  @hs.filter_body(rbuf, @hs.buf)
  @input.write(rbuf)
end

#r100_doneObject

only called when buffering slow clients returns :wait_readable, :wait_writable, :ignore, or nil for epoll returns true to keep looping inside yahns_step



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/yahns/http_client.rb', line 180

def r100_done
  k = self.class
  case k.input_buffering
  when true
    empty_body = 0 == @hs.content_length
    # common case is an empty body
    return app_call(NULL_IO) if empty_body

    # content_length is nil (chunked) or len > 0
    mkinput_preread # keep looping (@state == :body)
    true
  else # :lazy, false
    env = @hs.env
    opt = http_response_prep(env)
    res = k.app.call(env)
    return :ignore if app_hijacked?(env, res)
    http_response_write(res, opt)
  end
end

#read_trailers(rsize, rbuf) ⇒ Object

returns true if we are ready to dispatch the app returns :wait_readable/wait_writable/nil to yield back to epoll



97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/yahns/http_client.rb', line 97

def read_trailers(rsize, rbuf)
  case rv = kgio_tryread(rsize, rbuf)
  when String
    if @hs.add_parse(rbuf)
      @input.rewind
      return true
    end
    # keep looping on kgio_tryread...
  when :wait_readable, :wait_writable
    return rv # wait for more
  when nil # unexpected EOF
    return @input.close # nil
  end while true
end

#response_hijacked(fn) ⇒ Object



277
278
279
280
281
# File 'lib/yahns/http_client.rb', line 277

def response_hijacked(fn)
  hijack_cleanup
  fn.call(self)
  :ignore
end

#step_writeObject

use if writes are deferred by buffering, this return value goes to the main epoll/kqueue worker loop returns :wait_readable, :wait_writable, or nil



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/yahns/http_client.rb', line 27

def step_write
  case rv = @state.wbuf_flush(self)
  when :wait_writable, :wait_readable
    return rv # tell epoll/kqueue to wait on this more
  when :ignore # :ignore on hijack, @state already set in hijack_cleanup
    return :ignore
  when Yahns::StreamFile
    @state = rv # continue looping
  when true, false # done
    return http_response_done(rv)
  when :ccc_done, :r100_done
    @state = rv
    return :wait_writable
  else
    raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
  end while true
end

#trysendio(io, offset, count) ⇒ Object Also known as: trysendfile



339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/yahns/http_client.rb', line 339

def trysendio(io, offset, count)
  return 0 if count == 0
  str = do_pread(io, count, offset) or return # nil for EOF
  n = 0
  case rv = kgio_trywrite(str)
  when String # partial write, keep trying
    n += (str.size - rv.size)
    str = rv
  when :wait_writable, :wait_readable
    return n > 0 ? n : rv
  when nil
    return n + str.size # yay!
  end while true
end

#yahns_initObject

called from acceptor thread



18
19
20
21
22
# File 'lib/yahns/http_client.rb', line 18

def yahns_init
  @hs = Unicorn::HttpRequest.new
  @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile
  @input = nil
end

#yahns_read(bytes, buf) ⇒ Object

used by StreamInput (and thus TeeInput) for input_buffering false|:lazy



249
250
251
252
253
254
255
256
257
258
# File 'lib/yahns/http_client.rb', line 249

def yahns_read(bytes, buf)
  case rv = kgio_tryread(bytes, buf)
  when String, nil
    return rv
  when :wait_readable
    kgio_wait_readable or raise Yahns::ClientTimeout, "waiting for read", []
  when :wait_writable
    kgio_wait_writable or raise Yahns::ClientTimeout, "waiting for write", []
  end while true
end

#yahns_stepObject

the main entry point of the epoll/kqueue worker loop



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/yahns/http_client.rb', line 113

def yahns_step
  # always write unwritten data first if we have any
  return step_write if Yahns::WbufCommon === @state

  # only read if we had nothing to write in this event loop iteration
  k = self.class
  rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads

  case @state
  when :pipelined
    if @hs.parse
      case input = input_ready
      when :wait_readable, :wait_writable, :close then return input
      when false # keep looping on @state
      else
        return app_call(input)
      end
      # @state == :body if we get here point (input_ready -> mkinput_preread)
    else
      @state = :headers
    end
    # continue to outer loop
  when :headers
    case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
    when String
      if @hs.add_parse(rv)
        case input = input_ready
        when :wait_readable, :wait_writable, :close then return input
        when false then break # to outer loop to reevaluate @state == :body
        else
          return app_call(input)
        end
      end
      # keep looping on kgio_tryread
    when :wait_readable, :wait_writable, nil
      return rv
    end while true
  when :body
    if @hs.body_eof?
      if @hs.content_length || @hs.parse # hp.parse == trailers done!
        @input.rewind
        return app_call(@input)
      else # possible Transfer-Encoding:chunked, keep looping
        @state = :trailers
      end
    else
      rv = fill_body(k.client_body_buffer_size, rbuf)
      return rv unless true == rv
    end
  when :trailers
    rv = read_trailers(k.client_header_buffer_size, rbuf)
    return true == rv ? app_call(@input) : rv
  when :ccc_done # unlikely
    return app_call(nil)
  when :r100_done # unlikely
    rv = r100_done
    return rv unless rv == true
    raise "BUG: body=#@state " if @state != :body
    # @state == :body, keep looping
  end while true # outer loop
rescue => e
  handle_error(e)
end