Class: Yahns::HttpClient

Inherits:
Kgio::Socket
  • Object
show all
Includes:
HttpResponse
Defined in:
lib/yahns/http_client.rb

Overview

Copyright © 2013, Eric Wong <[email protected]> and all contributors License: GPLv3 or later (www.gnu.org/licenses/gpl-3.0.txt)

Constant Summary collapse

NULL_IO =

:nodoc:

StringIO.new("")
QEV_FLAGS =

used by acceptor

Yahns::Queue::QEV_RD
REMOTE_ADDR =

A frozen format for this is about 15% faster (note from Mongrel)

'REMOTE_ADDR'.freeze
RACK_INPUT =
'rack.input'.freeze
RACK_HIJACK =
'rack.hijack'.freeze
RACK_HIJACK_IO =
"rack.hijack_io".freeze

Constants included from HttpResponse

Yahns::HttpResponse::CCC_RESPONSE_START, Yahns::HttpResponse::CONN_CLOSE, Yahns::HttpResponse::CONN_KA, Yahns::HttpResponse::HTTP_EXPECT, Yahns::HttpResponse::R100_CCC, Yahns::HttpResponse::R100_RAW, Yahns::HttpResponse::RESPONSE_START, Yahns::HttpResponse::Z

Instance Method Summary collapse

Methods included from HttpResponse

#do_ccc, #err_response, #http_100_response, #http_response_done, #http_response_write, #kv_str, #response_header_blocked, #response_start, #response_wait_write, #wbuf_maybe

Instance Method Details

#app_call(input) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/yahns/http_client.rb', line 199

def app_call(input)
  env = @hs.env
  k = self.class

  # input is nil if we needed to wait for writability with
  # check_client_connection
  if input
    env[REMOTE_ADDR] = @kgio_addr
    env[RACK_HIJACK] = hijack_proc(env)
    env[RACK_INPUT] = input

    if k.check_client_connection && @hs.headers?
      rv = do_ccc and return rv
    end
  end

  # run the rack app
  status, headers, body = k.app.call(env.merge!(k.app_defaults))
  return :ignore if env.include?(RACK_HIJACK_IO)
  if status.to_i == 100
    rv = http_100_response(env) and return rv
    status, headers, body = k.app.call(env)
  end

  # this returns :wait_readable, :wait_writable, :ignore, or nil:
  http_response_write(status, headers, body)
end

#fill_body(rsize, rbuf) ⇒ Object

returns true if we want to keep looping on this returns :wait_readable/wait_writable/nil to yield back to epoll



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/yahns/http_client.rb', line 86

def fill_body(rsize, rbuf)
  case rv = kgio_tryread(rsize, rbuf)
  when String
    @hs.filter_body(rbuf, @hs.buf << rbuf)
    @input.write(rbuf)
    true # keep looping on kgio_tryread (but check body_eof? first)
  when :wait_readable, :wait_writable
    rv # have epoll/kqueue wait for more
  when nil # unexpected EOF
    @input.close # nil
  end
end

#handle_error(e) ⇒ Object

if we get any error, try to write something back to the client assuming we haven’t closed the socket, but don’t get hung up if the socket is already closed or broken. We’ll always return nil to ensure the socket is closed at the end of this function



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/yahns/http_client.rb', line 270

def handle_error(e)
  code = case e
  when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN,
       Errno::ETIMEDOUT
    return # don't send response, drop the connection
  when Yahns::ClientTimeout
    408
  when Unicorn::RequestURITooLongError
    414
  when Unicorn::RequestEntityTooLargeError
    413
  when Unicorn::HttpParserError # try to tell the client they're bad
    400
  else
    Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
    500
  end
  kgio_trywrite(err_response(code))
rescue
ensure
  shutdown rescue nil
  return # always drop the connection on uncaught errors
end

#hijack_proc(env) ⇒ Object



227
228
229
230
231
232
# File 'lib/yahns/http_client.rb', line 227

def hijack_proc(env)
  proc do
    self.class.queue.queue_del(self) # EPOLL_CTL_DEL
    env[RACK_HIJACK_IO] = self
  end
end

#input_readyObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/yahns/http_client.rb', line 66

def input_ready
  empty_body = 0 == @hs.content_length
  k = self.class
  case k.input_buffering
  when true
    rv = http_100_response(@hs.env) and return rv

    # common case is an empty body
    return NULL_IO if empty_body

    # content_length is nil (chunked) or len > 0
    mkinput_preread # keep looping
    false
  else # :lazy, false
    empty_body ? NULL_IO : (@input = k.mkinput(self, @hs))
  end
end

#kgio_wait_readable(timeout = self.class.client_timeout) ⇒ Object

called automatically by kgio_read



240
241
242
# File 'lib/yahns/http_client.rb', line 240

def kgio_wait_readable(timeout = self.class.client_timeout)
  super timeout
end

#kgio_wait_writable(timeout = self.class.client_timeout) ⇒ Object

called automatically by kgio_write



235
236
237
# File 'lib/yahns/http_client.rb', line 235

def kgio_wait_writable(timeout = self.class.client_timeout)
  super timeout
end

#mkinput_prereadObject

used only with “input_buffering true”



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/yahns/http_client.rb', line 50

def mkinput_preread
  k = self.class
  len = @hs.content_length
  mbs = k.client_max_body_size
  if mbs && len && len > mbs
    raise Unicorn::RequestEntityTooLargeError,
          "Content-Length:#{len} too large (>#{mbs})", []
  end
  @state = :body
  @input = k.tmpio_for(len)

  rbuf = Thread.current[:yahns_rbuf]
  @hs.filter_body(rbuf, @hs.buf)
  @input.write(rbuf)
end

#r100_doneObject

returns :wait_readable, :wait_writable, :ignore, or nil for epoll returns false to keep looping inside yahns_step



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/yahns/http_client.rb', line 183

def r100_done
  k = self.class
  case k.input_buffering
  when true
    empty_body = 0 == @hs.content_length
    # common case is an empty body
    return app_call(NULL_IO) if empty_body

    # content_length is nil (chunked) or len > 0
    mkinput_preread # keep looping (@state == :body)
    true
  else # :lazy, false
    http_response_write(*k.app.call(@hs.env))
  end
end

#read_trailers(rsize, rbuf) ⇒ Object

returns true if we are ready to dispatch the app returns :wait_readable/wait_writable/nil to yield back to epoll



101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/yahns/http_client.rb', line 101

def read_trailers(rsize, rbuf)
  case rv = kgio_tryread(rsize, rbuf)
  when String
    if @hs.add_parse(rbuf)
      @input.rewind
      return true
    end
    # keep looping on kgio_tryread...
  when :wait_readable, :wait_writable
    return rv # wait for more
  when nil # unexpected EOF
    return @input.close # nil
  end while true
end

#response_hijacked(fn) ⇒ Object



256
257
258
259
260
261
262
263
264
# File 'lib/yahns/http_client.rb', line 256

def response_hijacked(fn)
  # we must issue EPOLL_CTL_DEL before hijacking (if we issue it at all),
  # because the hijacker may close use before we get back to the epoll worker
  # loop.  EPOLL_CTL_DEL saves about 200 bytes of unswappable kernel memory,
  # so it can matter if we have lots of hijacked sockets.
  self.class.queue.queue_del(self)
  fn.call(self)
  :ignore
end

#step_writeObject

use if writes are deferred by buffering, this return value goes to the main epoll/kqueue worker loop returns :wait_readable, :wait_writable, or nil



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/yahns/http_client.rb', line 30

def step_write
  case rv = @state.wbuf_flush(self)
  when :wait_writable, :wait_readable
    return rv # tell epoll/kqueue to wait on this more
  when :ignore # :ignore on hijack
    @state = :ignore
    return :ignore
  when Yahns::StreamFile
    @state = rv # continue looping
  when true, false # done
    return http_response_done(rv)
  when :ccc_done, :r100_done
    @state = rv
    return :wait_writable
  else
    raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
  end while true
end

#yahns_initObject

called from acceptor thread



20
21
22
23
24
25
# File 'lib/yahns/http_client.rb', line 20

def yahns_init
  @hs = Unicorn::HttpRequest.new
  @response_start_sent = false
  @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile
  @input = nil
end

#yahns_read(bytes, buf) ⇒ Object

used by StreamInput (and thus TeeInput) for input_buffering false|:lazy



245
246
247
248
249
250
251
252
253
254
# File 'lib/yahns/http_client.rb', line 245

def yahns_read(bytes, buf)
  case rv = kgio_tryread(bytes, buf)
  when String, nil
    return rv
  when :wait_readable
    kgio_wait_readable or raise Yahns::ClientTimeout, "waiting for read", []
  when :wait_writable
    kgio_wait_writable or raise Yahns::ClientTimeout, "waiting for write", []
  end while true
end

#yahns_stepObject

the main entry point of the epoll/kqueue worker loop



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/yahns/http_client.rb', line 117

def yahns_step
  # always write unwritten data first if we have any
  return step_write if Yahns::WbufCommon === @state

  # only read if we had nothing to write in this event loop iteration
  k = self.class
  rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads

  case @state
  when :pipelined
    if @hs.parse
      case input = input_ready
      when :wait_readable, :wait_writable, :close then return input
      when false # keep looping on @state
      else
        return app_call(input)
      end
      # @state == :body if we get here point (input_ready -> mkinput_preread)
    else
      @state = :headers
    end
    # continue to outer loop
  when :headers
    case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
    when String
      if @hs.add_parse(rv)
        case input = input_ready
        when :wait_readable, :wait_writable, :close then return input
        when false then break # to outer loop to reevaluate @state == :body
        else
          return app_call(input)
        end
      end
      # keep looping on kgio_tryread
    when :wait_readable, :wait_writable, nil
      return rv
    end while true
  when :body
    if @hs.body_eof?
      if @hs.content_length || @hs.parse # hp.parse == trailers done!
        @input.rewind
        return app_call(@input)
      else # possible Transfer-Encoding:chunked, keep looping
        @state = :trailers
      end
    else
      rv = fill_body(k.client_body_buffer_size, rbuf)
      return rv unless true == rv
    end
  when :trailers
    rv = read_trailers(k.client_header_buffer_size, rbuf)
    return true == rv ? app_call(@input) : rv
  when :ccc_done # unlikely
    return app_call(nil)
  when :r100_done # unlikely
    rv = r100_done
    return rv unless rv == true
    raise "BUG: body=#@state " if @state != :body
    # @state == :body, keep looping
  end while true # outer loop
rescue => e
  handle_error(e)
end