Module: Yahns::HttpResponse

Includes:
Unicorn::HttpResponse
Included in:
HttpClient
Defined in:
lib/yahns/http_response.rb,
lib/yahns/proxy_http_response.rb

Overview

loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for constants.

Constant Summary collapse

MTX =
Mutex.new
CCC_RESPONSE_START =

avoid GC overhead for frequently used-strings/objects:

[ 'HTTP', '/1.1 ' ]
MSG_MORE =
0
MSG_DONTWAIT =
0

Instance Method Summary collapse

Instance Method Details

#chunk_out(buf) ⇒ Object

n.b.: we can use String#size for optimized dispatch under YARV instead of String#bytesize because all the IO read methods return a binary string when given a maximum read length



296
297
298
# File 'lib/yahns/proxy_http_response.rb', line 296

def chunk_out(buf)
  [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
end

#do_cccObject

returns nil on success :wait_readable/:wait_writable/:close for epoll



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/yahns/http_response.rb', line 241

def do_ccc
  @hs.response_start_sent = true
  wbuf = nil
  rv = nil
  CCC_RESPONSE_START.each do |buf|
    if wbuf
      wbuf << buf
    else
      case rv = kgio_trywrite(buf)
      when nil
        break
      when String
        buf = rv
      when :wait_writable, :wait_readable
        if self.class.output_buffering
          wbuf = buf.dup
          @state = Yahns::WbufStr.new(wbuf, :ccc_done)
          break
        else
          response_wait_write(rv) or return :close
        end
      end while true
    end
  end
  rv
end

#err_response(code) ⇒ Object



58
59
60
# File 'lib/yahns/http_response.rb', line 58

def err_response(code)
  "#{response_start}#{code} #{Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n"
end

#http_100_response(env) ⇒ Object

only used if input_buffering is true (not :lazy or false) input_buffering==:lazy/false gives control to the app returns nil on success returns :close, :wait_writable, or :wait_readable



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/yahns/http_response.rb', line 272

def http_100_response(env)
  env.delete('HTTP_EXPECT'.freeze) =~ /\A100-continue\z/i or return
  buf = @hs.response_start_sent ? "100 Continue\r\n\r\nHTTP/1.1 ".freeze
                                : "HTTP/1.1 100 Continue\r\n\r\n".freeze

  case rv = kgio_trywrite(buf)
  when String
    buf = rv
  when :wait_writable, :wait_readable
    if self.class.output_buffering
      @state = Yahns::WbufStr.new(buf, :r100_done)
      return rv
    else
      response_wait_write(rv) or return :close
    end
  else
    return rv
  end while true
end

#http_response_done(alive) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/yahns/http_response.rb', line 93

def http_response_done(alive)
  @input = @input.close if @input
  if alive
    # @hs.buf will have data if the client pipelined
    if @hs.buf.empty?
      @state = :headers
      :wait_readable
    else
      @state = :pipelined
      # we shouldn't start processing the application again until we know
      # the socket is writable for the response
      :wait_writable
    end
  else
    # shutdown is needed in case the app forked, we rescue here since
    # StreamInput may issue shutdown as well
    shutdown rescue nil
    :close
  end
end

#http_response_prep(env) ⇒ Object

must be called before app dispatch, since the app can do all sorts of nasty things to env



294
295
296
297
# File 'lib/yahns/http_response.rb', line 294

def http_response_prep(env)
  [ env['REQUEST_METHOD'] == 'HEAD'.freeze, # hdr_only
    env['HTTP_VERSION'] == 'HTTP/1.1'.freeze ] # chunk_ok
end

#http_response_write(res, opt) ⇒ Object

writes the rack_response to socket as an HTTP response returns :wait_readable, :wait_writable, :forget, or nil



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/yahns/http_response.rb', line 125

def http_response_write(res, opt)
  status, headers, body = res
  offset = 0
  count = hijack = clen = nil
  alive = @hs.next? && self.class.persistent_connections
  flags = MSG_DONTWAIT
  term = false
  hdr_only, chunk_ok = opt

  if @hs.headers?
    code = status.to_i
    hdr_only ||= Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code)
    msg = Rack::Utils::HTTP_STATUS_CODES[code]
    buf = "#{response_start}#{msg ? %Q(#{code} #{msg}) : status}\r\n" \
          "Date: #{httpdate}\r\n".dup
    headers.each do |key, value|
      case key
      when %r{\ADate\z}i
        next
      when %r{\AContent-Range\z}i
        if %r{\Abytes (\d+)-(\d+)/\d+\z} =~ value
          offset = $1.to_i
          count = $2.to_i - offset + 1
        end
        kv_str(buf, key, value)
      when %r{\AConnection\z}i
        # allow Rack apps to tell us they want to drop the client
        alive = false if value =~ /\bclose\b/i
      when %r{\AContent-Length\z}i
        term = true
        clen = value.to_i
        flags |= MSG_MORE if clen > 0 && !hdr_only
        kv_str(buf, key, value)
      when %r{\ATransfer-Encoding\z}i
        term = true if value =~ /\bchunked\b/i
        kv_str(buf, key, value)
      when "rack.hijack"
        hijack = value
      else
        kv_str(buf, key, value)
      end
    end
    count ||= clen

    if !term && chunk_ok
      term = true
      body = Yahns::ChunkBody.new(body, opt)
      buf << "Transfer-Encoding: chunked\r\n".freeze
    end
    alive &&= term
    buf << (alive ? "Connection: keep-alive\r\n\r\n".freeze
                  : "Connection: close\r\n\r\n".freeze)
    case rv = kgio_syssend(buf, flags)
    when nil # all done, likely
      buf.clear
      buf = nil # recycle any memory we used ASAP
      break
    when String
      flags = MSG_DONTWAIT
      buf = rv # unlikely, hope the skb grows
    when :wait_writable, :wait_readable # unlikely
      if self.class.output_buffering
        alive = hijack ? hijack : alive
        rv = response_header_blocked(buf, body, alive, offset, count)
        body = nil # ensure we do not close body in ensure
        return rv
      else
        response_wait_write(rv) or return :close
      end
    end while true
  end

  return response_hijacked(hijack) if hijack
  return http_response_done(alive) if hdr_only

  if body.respond_to?(:to_path) && count
    @state = body = Yahns::StreamFile.new(body, alive, offset, count)
    return step_write
  end

  wbuf = rv = nil
  body.each do |x|
    if wbuf
      rv = wbuf.wbuf_write(self, x)
    else
      case rv = String === x ? kgio_trywrite(x) : kgio_trywritev(x)
      when nil # all done, likely and good!
        break
      when String, Array
        x = rv # hope the skb grows when we loop into the trywrite
      when :wait_writable, :wait_readable
        if self.class.output_buffering
          wbuf = Yahns::Wbuf.new(body, alive)
          rv = wbuf.wbuf_write(self, x)
          break
        else
          response_wait_write(rv) or return :close
        end
      end while true
    end
  end

  # if we buffered the write body, we must return :wait_writable
  # (or :wait_readable for SSL) and hit Yahns::HttpClient#step_write
  if wbuf
    body = nil # ensure we do not close the body in ensure
    wbuf_maybe(wbuf, rv)
  else
    http_response_done(alive)
  end
ensure
  body.respond_to?(:close) and body.close
end

#httpdateObject



23
24
25
# File 'lib/yahns/http_response.rb', line 23

def httpdate
  MTX.synchronize { super }
end

#kgio_syssend(buf, flags) ⇒ Object



40
41
42
# File 'lib/yahns/http_response.rb', line 40

def kgio_syssend(buf, flags)
  kgio_trywrite(buf)
end

#kv_str(buf, key, value) ⇒ Object



114
115
116
117
118
119
120
121
# File 'lib/yahns/http_response.rb', line 114

def kv_str(buf, key, value)
  if value.include?("\n".freeze)
    # avoiding blank, key-only cookies with /\n+/
    value.split(/\n+/).each { |v| buf << "#{key}: #{v}\r\n" }
  else
    buf << "#{key}: #{value}\r\n"
  end
end

#proxy_busy_mod(wbuf, req_res) ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/yahns/proxy_http_response.rb', line 273

def proxy_busy_mod(wbuf, req_res)
  if wbuf
    # we are completely done reading and buffering the upstream response,
    # but have not completely written the response to the client,
    # yield control to the client socket:
    @state = wbuf
    proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD :
                    Yahns::Queue::QEV_WR)
    # no touching self after proxy_wait_next, we may be running
    # HttpClient#yahns_step in a different thread at this point
  else
    case http_response_done(req_res.alive)
    when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD)
    when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR)
    when :close then close
    end
  end
  nil # signal close for ReqRes#yahns_step
end

#proxy_err_response(code, req_res, exc) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/yahns/proxy_http_response.rb', line 48

def proxy_err_response(code, req_res, exc)
  logger = self.class.logger # Yahns::HttpContext#logger
  case exc
  when nil
    logger.error('premature upstream EOF')
  when Kcar::ParserError
    logger.error("upstream response error: #{exc.message}")
  when String
    logger.error(exc)
  else
    Yahns::Log.exception(logger, 'upstream error', exc)
  end
  # try to write something, but don't care if we fail
  Integer === code and
    kgio_trywrite("HTTP/1.1 #{code} #{
                   Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n") rescue nil

  shutdown rescue nil
  @input = @input.close if @input

  # this is safe ONLY because we are in an :ignore state after
  # Fdmap#forget when we got hijacked:
  close

  nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb
ensure
  wbuf = req_res.resbuf
  wbuf.wbuf_abort if wbuf.respond_to?(:wbuf_abort)
end

#proxy_read_body(tip, kcar, req_res) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/yahns/proxy_http_response.rb', line 147

def proxy_read_body(tip, kcar, req_res)
  chunk = ''.dup if kcar.chunked?
  len = kcar.body_bytes_left
  rbuf = Thread.current[:yahns_rbuf]
  alive = req_res.alive
  wbuf = req_res.resbuf

  case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
  when String
    if len
      kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
    elsif chunk
      kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
      next if chunk.empty? # call req_res.kgio_tryread for more
      tmp = chunk_out(chunk)
    elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
      tmp = chunk_out(tmp)
    # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
    end
    wbuf = proxy_write(wbuf, tmp, req_res)
    chunk.clear if chunk
    if Yahns::WbufLite === wbuf
      req_res.proxy_trailers = [ rbuf.dup, tip ] if chunk && kcar.body_eof?
      return proxy_unbuffer(wbuf)
    end
  when nil # EOF
    # HTTP/1.1 upstream, unexpected premature EOF:
    msg = "upstream EOF (#{len} bytes left)" if len
    msg = 'upstream EOF (chunk)' if chunk
    return proxy_err_response(nil, req_res, msg) if msg

    # HTTP/1.0 upstream:
    wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive
    req_res.shutdown
    return proxy_unbuffer(wbuf, nil) if Yahns::WbufLite === wbuf
    return proxy_busy_mod(wbuf, req_res)
  when :wait_readable
    return wait_on_upstream(req_res)
  end until kcar.body_eof?

  if chunk
    # tip is an empty array and becomes trailer storage
    req_res.proxy_trailers = [ rbuf.dup, tip ]
    return proxy_read_trailers(kcar, req_res)
  end
  proxy_busy_mod(wbuf, req_res)
end

#proxy_read_trailers(kcar, req_res) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/yahns/proxy_http_response.rb', line 195

def proxy_read_trailers(kcar, req_res)
  chunk, tlr = req_res.proxy_trailers
  rbuf = Thread.current[:yahns_rbuf]
  wbuf = req_res.resbuf

  until kcar.trailers(tlr, chunk)
    case rv = req_res.kgio_tryread(0x2000, rbuf)
    when String
      chunk << rv
    when :wait_readable
      return wait_on_upstream(req_res)
    when nil # premature EOF
      return proxy_err_response(nil, req_res, 'upstream EOF (trailers)')
    end # no loop here
  end
  wbuf = proxy_write(wbuf, trailer_out(tlr), req_res)
  return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
  proxy_busy_mod(wbuf, req_res)
end

#proxy_res_headers(res, req_res) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/yahns/proxy_http_response.rb', line 83

def proxy_res_headers(res, req_res)
  status, headers = res
  code = status.to_i
  msg = Rack::Utils::HTTP_STATUS_CODES[code]
  env = @hs.env
  have_body = !Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code) &&
              env['REQUEST_METHOD'] != 'HEAD'.freeze
  flags = MSG_DONTWAIT
  alive = @hs.next? && self.class.persistent_connections
  term = false
  response_headers = req_res.proxy_pass.response_headers

  res = "HTTP/1.1 #{msg ? %Q(#{code} #{msg}) : status}\r\n".dup
  headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays
    case key
    when /\A(?:Connection|Keep-Alive)\z/i
      next # do not let some upstream headers leak through
    when %r{\AContent-Length\z}i
      term = true
      flags |= MSG_MORE if have_body && value.to_i > 0
    when %r{\ATransfer-Encoding\z}i
      term = true if value =~ /\bchunked\b/i
    end

    # response header mapping
    case val = response_headers[key]
    when :ignore
      next
    when String
      value = val
    end

    res << "#{key}: #{value}\r\n"
  end

  # For now, do not add a Date: header, assume upstream already did it
  # but do not care if they did not

  # chunk the response ourselves if the client supports it,
  # but the backend does not terminate properly
  if alive && ! term
    if env['HTTP_VERSION'] == 'HTTP/1.1'.freeze
      res << "Transfer-Encoding: chunked\r\n".freeze
    else # we can't persist HTTP/1.0 and HTTP/0.9 w/o Content-Length
      alive = false
    end
  end
  res << (alive ? "Connection: keep-alive\r\n\r\n".freeze
                : "Connection: close\r\n\r\n".freeze)

  # send the headers
  case rv = kgio_syssend(res, flags)
  when nil then break # all done, likely
  when String # partial write, highly unlikely
    flags = MSG_DONTWAIT
    res = rv # hope the skb grows
  when :wait_writable, :wait_readable # highly unlikely in real apps
    proxy_write(nil, res, req_res)
    break # keep buffering body...
  end while true
  req_res.alive = alive
  have_body
end

#proxy_response_finish(kcar, req_res) ⇒ Object



237
238
239
240
# File 'lib/yahns/proxy_http_response.rb', line 237

def proxy_response_finish(kcar, req_res)
  req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res)
                         : proxy_read_body([], kcar, req_res)
end

#proxy_response_start(res, tip, kcar, req_res) ⇒ Object

start streaming the response once upstream is done sending headers to us. returns :wait_readable if we need to read more from req_res returns :ignore if we yield control to the client(self) returns nil if completely done



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/yahns/proxy_http_response.rb', line 219

def proxy_response_start(res, tip, kcar, req_res)
  have_body = proxy_res_headers(res, req_res)
  tip = tip.empty? ? [] : [ tip ]

  if have_body
    req_res.proxy_trailers = nil # define to avoid uninitialized warnings
    return proxy_read_body(tip, kcar, req_res)
  end

  # unlikely
  wbuf = req_res.resbuf
  return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf

  # all done reading response from upstream, req_res will be discarded
  # when we return nil:
  proxy_busy_mod(wbuf, req_res)
end

#proxy_unbuffer(wbuf, nxt = :ignore) ⇒ Object

switch and yield



13
14
15
16
17
18
19
# File 'lib/yahns/proxy_http_response.rb', line 13

def proxy_unbuffer(wbuf, nxt = :ignore)
  @state = wbuf
  wbuf.req_res = nil if nxt.nil? && wbuf.respond_to?(:req_res=)
  proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD :
                  Yahns::Queue::QEV_WR)
  nxt
end

#proxy_wait_next(qflags) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/yahns/proxy_http_response.rb', line 242

def proxy_wait_next(qflags)
  Thread.current[:yahns_fdmap].remember(self)
  # We must allocate a new, empty request object here to avoid a TOCTTOU
  # in the following timeline
  #
  # original thread:                                 | another thread
  # HttpClient#yahns_step                            |
  # r = k.app.call(env = @hs.env)  # socket hijacked into epoll queue
  # <thread is scheduled away>                       | epoll_wait readiness
  #                                                  | ReqRes#yahns_step
  #                                                  | proxy dispatch ...
  #                                                  | proxy_busy_mod
  # ************************** DANGER BELOW ********************************
  #                                                  | HttpClient#yahns_step
  #                                                  | # clears env
  # sees empty env:                                  |
  # return :ignore if env.include?('rack.hijack_io') |
  #
  # In other words, we cannot touch the original env seen by the
  # original thread since it must see the 'rack.hijack_io' value
  # because both are operating in the same Yahns::HttpClient object.
  # This will happen regardless of GVL existence
  hs = Unicorn::HttpRequest.new
  hs.buf.replace(@hs.buf)
  @hs = hs

  # n.b. we may not touch anything in this object once we call queue_mod,
  # another thread is likely to take it!
  Thread.current[:yahns_queue].queue_mod(self, qflags)
end

#proxy_write(wbuf, buf, req_res) ⇒ Object

write everything in buf to our client socket (or wbuf, if it exists) it may return a newly-created wbuf or nil



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/yahns/proxy_http_response.rb', line 31

def proxy_write(wbuf, buf, req_res)
  unless wbuf
    # no write buffer, try to write directly to the client socket
    case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
    when nil then return # done writing buf, likely
    when String, Array # partial write, hope the skb grows
      buf = rv
    when :wait_writable, :wait_readable
      wbuf = req_res.resbuf ||= wbuf_alloc(req_res)
      break
    end while true
  end

  wbuf.wbuf_write(self, buf)
  wbuf.busy ? wbuf : nil
end

#response_header_blocked(header, body, alive, offset, count) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/yahns/http_response.rb', line 62

def response_header_blocked(header, body, alive, offset, count)
  if body.respond_to?(:to_path) && count
    alive = Yahns::StreamFile.new(body, alive, offset, count)
    body = nil
  end
  wbuf = Yahns::Wbuf.new(body, alive)
  rv = wbuf.wbuf_write(self, header)
  if body && ! alive.respond_to?(:call) # skip body.each if hijacked
    body.each { |chunk| rv = wbuf.wbuf_write(self, chunk) }
  end
  wbuf_maybe(wbuf, rv)
end

#response_startObject



45
46
47
# File 'lib/yahns/http_response.rb', line 45

def response_start
  @hs.response_start_sent ? ''.freeze : 'HTTP/1.1 '.freeze
end

#response_wait_write(rv) ⇒ Object



49
50
51
52
53
54
55
56
# File 'lib/yahns/http_response.rb', line 49

def response_wait_write(rv)
  # call the kgio_wait_readable or kgio_wait_writable method
  ok = __send__("kgio_#{rv}") and return ok
  k = self.class
  k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\
                "#{k.client_timeout}s")
  false
end

#trailer_out(tlr) ⇒ Object



300
301
302
# File 'lib/yahns/proxy_http_response.rb', line 300

def trailer_out(tlr)
  "0\r\n#{tlr.map! do |k,v| "#{k}: #{v}\r\n" end.join}\r\n"
end

#wait_on_upstream(req_res) ⇒ Object



78
79
80
81
# File 'lib/yahns/proxy_http_response.rb', line 78

def wait_on_upstream(req_res)
  req_res.resbuf ||= wbuf_alloc(req_res)
  :wait_readable # self remains in :ignore, wait on upstream
end

#wbuf_alloc(req_res) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/yahns/proxy_http_response.rb', line 21

def wbuf_alloc(req_res)
  if req_res.proxy_pass.proxy_buffering
    Yahns::Wbuf.new(nil, req_res.alive)
  else
    Yahns::WbufLite.new(req_res)
  end
end

#wbuf_maybe(wbuf, rv) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/yahns/http_response.rb', line 75

def wbuf_maybe(wbuf, rv)
  case rv # wbuf_write return value
  when nil # all done
    case rv = wbuf.wbuf_close(self)
    when :ignore # hijacked
      @state = rv
    when Yahns::StreamFile
      @state = rv
      :wait_writable
    when true, false
      http_response_done(rv)
    end
  else
    @state = wbuf
    rv
  end
end