Module: Yahns::HttpResponse

Includes:
Unicorn::HttpResponse
Included in:
HttpClient
Defined in:
lib/yahns/http_response.rb,
lib/yahns/proxy_http_response.rb

Overview

loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for constants.

Constant Summary collapse

MTX =
Mutex.new
CCC_RESPONSE_START =

avoid GC overhead for frequently used-strings/objects:

[ 'HTTP', '/1.1 ' ]
MSG_MORE =
0
MSG_DONTWAIT =
0

Instance Method Summary collapse

Instance Method Details

#chunk_out(buf) ⇒ Object

n.b.: we can use String#size for optimized dispatch under YARV instead of String#bytesize because all the IO read methods return a binary string when given a maximum read length



323
324
325
# File 'lib/yahns/proxy_http_response.rb', line 323

def chunk_out(buf)
  [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
end

#do_cccObject

returns nil on success :wait_readable/:wait_writable/:close for epoll



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/yahns/http_response.rb', line 226

def do_ccc
  @hs.response_start_sent = true
  wbuf = nil
  rv = nil
  CCC_RESPONSE_START.each do |buf|
    if wbuf
      wbuf << buf
    else
      case rv = kgio_trywrite(buf)
      when nil
        break
      when String
        buf = rv
      when :wait_writable, :wait_readable
        if self.class.output_buffering
          wbuf = buf.dup
          @state = Yahns::WbufStr.new(wbuf, :ccc_done)
          break
        else
          response_wait_write(rv) or return :close
        end
      end while true
    end
  end
  rv
end

#err_response(code) ⇒ Object



56
57
58
# File 'lib/yahns/http_response.rb', line 56

def err_response(code)
  "#{response_start}#{code} #{Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n"
end

#have_more?(value) ⇒ Boolean

Returns:

  • (Boolean)


121
122
123
# File 'lib/yahns/http_response.rb', line 121

def have_more?(value)
  value.to_i > 0 && @hs.env['REQUEST_METHOD'] != 'HEAD'.freeze
end

#http_100_response(env) ⇒ Object

only used if input_buffering is true (not :lazy or false) input_buffering==:lazy/false gives control to the app returns nil on success returns :close, :wait_writable, or :wait_readable



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/yahns/http_response.rb', line 257

def http_100_response(env)
  env.delete('HTTP_EXPECT'.freeze) =~ /\A100-continue\z/i or return
  buf = @hs.response_start_sent ? "100 Continue\r\n\r\nHTTP/1.1 ".freeze
                                : "HTTP/1.1 100 Continue\r\n\r\n".freeze

  case rv = kgio_trywrite(buf)
  when String
    buf = rv
  when :wait_writable, :wait_readable
    if self.class.output_buffering
      @state = Yahns::WbufStr.new(buf, :r100_done)
      return rv
    else
      response_wait_write(rv) or return :close
    end
  else
    return rv
  end while true
end

#http_response_done(alive) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/yahns/http_response.rb', line 91

def http_response_done(alive)
  @input = @input.close if @input
  if alive
    # @hs.buf will have data if the client pipelined
    if @hs.buf.empty?
      @state = :headers
      :wait_readable
    else
      @state = :pipelined
      # we shouldn't start processing the application again until we know
      # the socket is writable for the response
      :wait_writable
    end
  else
    # shutdown is needed in case the app forked, we rescue here since
    # StreamInput may issue shutdown as well
    shutdown rescue nil
    :close
  end
end

#http_response_write(status, headers, body) ⇒ Object

writes the rack_response to socket as an HTTP response returns :wait_readable, :wait_writable, :forget, or nil



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/yahns/http_response.rb', line 127

def http_response_write(status, headers, body)
  offset = 0
  count = hijack = nil
  k = self.class
  alive = @hs.next? && k.persistent_connections
  flags = MSG_DONTWAIT

  if @hs.headers?
    code = status.to_i
    msg = Rack::Utils::HTTP_STATUS_CODES[code]
    buf = "#{response_start}#{msg ? %Q(#{code} #{msg}) : status}\r\n" \
          "Date: #{httpdate}\r\n".dup
    headers.each do |key, value|
      case key
      when %r{\ADate\z}i
        next
      when %r{\AContent-Range\z}i
        if %r{\Abytes (\d+)-(\d+)/\d+\z} =~ value
          offset = $1.to_i
          count = $2.to_i - offset + 1
        end
        kv_str(buf, key, value)
      when %r{\AConnection\z}i
        # allow Rack apps to tell us they want to drop the client
        alive = false if value =~ /\bclose\b/i
      when %r{\AContent-Length\z}i
        flags |= MSG_MORE if have_more?(value)
        kv_str(buf, key, value)
      when "rack.hijack"
        hijack = value
      else
        kv_str(buf, key, value)
      end
    end
    buf << (alive ? "Connection: keep-alive\r\n\r\n".freeze
                  : "Connection: close\r\n\r\n".freeze)
    case rv = kgio_syssend(buf, flags)
    when nil # all done, likely
      buf.clear
      buf = nil # recycle any memory we used ASAP
      break
    when String
      flags = MSG_DONTWAIT
      buf = rv # unlikely, hope the skb grows
    when :wait_writable, :wait_readable # unlikely
      if k.output_buffering
        alive = hijack ? hijack : alive
        rv = response_header_blocked(rv, buf, body, alive, offset, count)
        body = nil # ensure we do not close body in ensure
        return rv
      else
        response_wait_write(rv) or return :close
      end
    end while true
  end

  return response_hijacked(hijack) if hijack

  if body.respond_to?(:to_path)
    @state = body = Yahns::StreamFile.new(body, alive, offset, count)
    return step_write
  end

  wbuf = rv = nil
  body.each do |chunk|
    if wbuf
      rv = wbuf.wbuf_write(self, chunk)
    else
      case rv = kgio_trywrite(chunk)
      when nil # all done, likely and good!
        break
      when String
        chunk = rv # hope the skb grows when we loop into the trywrite
      when :wait_writable, :wait_readable
        if k.output_buffering
          wbuf = Yahns::Wbuf.new(body, alive, k.output_buffer_tmpdir, rv)
          rv = wbuf.wbuf_write(self, chunk)
          break
        else
          response_wait_write(rv) or return :close
        end
      end while true
    end
  end

  # if we buffered the write body, we must return :wait_writable
  # (or :wait_readable for SSL) and hit Yahns::HttpClient#step_write
  if wbuf
    body = nil # ensure we do not close the body in ensure
    wbuf_maybe(wbuf, rv)
  else
    http_response_done(alive)
  end
ensure
  body.respond_to?(:close) and body.close
end

#httpdateObject



21
22
23
# File 'lib/yahns/http_response.rb', line 21

def httpdate
  MTX.synchronize { super }
end

#kgio_syssend(buf, flags) ⇒ Object



38
39
40
# File 'lib/yahns/http_response.rb', line 38

def kgio_syssend(buf, flags)
  kgio_trywrite(buf)
end

#kv_str(buf, key, value) ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/yahns/http_response.rb', line 112

def kv_str(buf, key, value)
  if value.include?("\n".freeze)
    # avoiding blank, key-only cookies with /\n+/
    value.split(/\n+/).each { |v| buf << "#{key}: #{v}\r\n" }
  else
    buf << "#{key}: #{value}\r\n"
  end
end

#proxy_busy_mod_blocked(wbuf, busy) ⇒ Object



304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/yahns/proxy_http_response.rb', line 304

def proxy_busy_mod_blocked(wbuf, busy)
  # we are completely done reading and buffering the upstream response,
  # but have not completely written the response to the client,
  # yield control to the client socket:
  @state = wbuf
  proxy_wait_next(case busy
    when :wait_readable then Yahns::Queue::QEV_RD
    when :wait_writable then Yahns::Queue::QEV_WR
    else
      abort "BUG: invalid wbuf.busy: #{busy.inspect}"
    end)
  # no touching self after proxy_wait_next, we may be running
  # HttpClient#yahns_step in a different thread at this point
  nil # signal close for ReqRes#yahns_step
end

#proxy_busy_mod_done(alive) ⇒ Object



294
295
296
297
298
299
300
301
302
# File 'lib/yahns/proxy_http_response.rb', line 294

def proxy_busy_mod_done(alive)
  case http_response_done(alive)
  when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD)
  when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR)
  when :close then close
  end

  nil # signal close for ReqRes#yahns_step
end

#proxy_err_response(code, req_res, exc, wbuf) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/yahns/proxy_http_response.rb', line 30

def proxy_err_response(code, req_res, exc, wbuf)
  logger = @hs.env['rack.logger']
  case exc
  when nil
    logger.error('premature upstream EOF')
  when Kcar::ParserError
    logger.error("upstream response error: #{exc.message}")
  else
    Yahns::Log.exception(logger, 'upstream error', exc)
  end
  # try to write something, but don't care if we fail
  Integer === code and
    kgio_trywrite("HTTP/1.1 #{code} #{
                   Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n") rescue nil

  shutdown rescue nil
  @input = @input.close if @input

  # this is safe ONLY because we are in an :ignore state after
  # Fdmap#forget when we got hijacked:
  close

  nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb
ensure
  wbuf.wbuf_abort if wbuf
end

#proxy_response_finish(kcar, wbuf, req_res) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/yahns/proxy_http_response.rb', line 197

def proxy_response_finish(kcar, wbuf, req_res)
  rbuf = Thread.current[:yahns_rbuf]
  if len = kcar.body_bytes_left # known Content-Length

    case tmp = req_res.kgio_tryread(0x2000, rbuf)
    when String
      len = kcar.body_bytes_left -= tmp.size
      wbuf.wbuf_write(self, tmp)
    when nil # premature EOF
      return proxy_err_response(nil, req_res, nil, wbuf)
    when :wait_readable
      return :wait_readable # self remains in :ignore, wait on upstream
    end while len != 0

  elsif kcar.chunked? # nasty chunked response body
    buf = ''.dup

    unless req_res.proxy_trailers
      # are we done dechunking the main body, yet?
      case tmp = req_res.kgio_tryread(0x2000, rbuf)
      when String
        kcar.filter_body(buf, tmp)
        buf.empty? or wbuf.wbuf_write(self, chunk_out(buf))
      when nil # premature EOF
        return proxy_err_response(nil, req_res, nil, wbuf)
      when :wait_readable
        return :wait_readable # self remains in :ignore, wait on upstream
      end until kcar.body_eof?
      req_res.proxy_trailers = [ tmp, [] ] # onto trailers!
      rbuf = Thread.current[:yahns_rbuf] = ''.dup
    end

    buf, tlr = *req_res.proxy_trailers
    until kcar.trailers(tlr, buf)
      case rv = req_res.kgio_tryread(0x2000, rbuf)
      when String
        buf << rv
      when :wait_readable
        return :wait_readable
      when nil # premature EOF
        return proxy_err_response(nil, req_res, nil, wbuf)
      end # no loop here
    end
    wbuf.wbuf_write(self, trailer_out(tlr))

  else # no Content-Length or Transfer-Encoding: chunked, wait on EOF!

    alive = wbuf.wbuf_persist
    case tmp = req_res.kgio_tryread(0x2000, rbuf)
    when String
      tmp = chunk_out(tmp) if alive
      wbuf.wbuf_write(self, tmp)
    when nil
      wbuf.wbuf_write(self, "0\r\n\r\n".freeze) if alive
      req_res.shutdown
      break
    when :wait_readable
      return :wait_readable # self remains in :ignore, wait on upstream
    end while true

  end

  busy = wbuf.busy and return proxy_busy_mod_blocked(wbuf, busy)
  proxy_busy_mod_done(wbuf.wbuf_persist) # returns nil to close req_res
end

#proxy_response_start(res, tip, kcar, req_res) ⇒ Object

start streaming the response once upstream is done sending headers to us. returns :wait_readable if we need to read more from req_res returns :ignore if we yield control to the client(self) returns nil if completely done



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/yahns/proxy_http_response.rb', line 68

def proxy_response_start(res, tip, kcar, req_res)
  status, headers = res
  code = status.to_i
  msg = Rack::Utils::HTTP_STATUS_CODES[code]
  env = @hs.env
  have_body = !Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code) &&
              env['REQUEST_METHOD'] != 'HEAD'.freeze
  flags = MSG_DONTWAIT
  alive = @hs.next? && self.class.persistent_connections
  term = false
  response_headers = env['yahns.proxy_pass.response_headers']

  res = "HTTP/1.1 #{msg ? %Q(#{code} #{msg}) : status}\r\n".dup
  headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays
    case key
    when /\A(?:Connection|Keep-Alive)\z/i
      next # do not let some upstream headers leak through
    when %r{\AContent-Length\z}i
      term = true
      flags |= MSG_MORE if have_body && value.to_i > 0
    when %r{\ATransfer-Encoding\z}i
      term = true if value =~ /\bchunked\b/i
    end

    # response header mapping
    case val = response_headers[key]
    when :ignore
      next
    when String
      value = val
    end

    res << "#{key}: #{value}\r\n"
  end

  # For now, do not add a Date: header, assume upstream already did it
  # but do not care if they did not

  # chunk the response ourselves if the client supports it,
  # but the backend does not terminate properly
  if alive && ! term
    if env['HTTP_VERSION'] == 'HTTP/1.1'.freeze
      res << "Transfer-Encoding: chunked\r\n".freeze
    else # we can't persist HTTP/1.0 and HTTP/0.9 w/o Content-Length
      alive = false
    end
  end
  res << (alive ? "Connection: keep-alive\r\n\r\n".freeze
                : "Connection: close\r\n\r\n".freeze)

  # send the headers
  case rv = kgio_syssend(res, flags)
  when nil then break # all done, likely
  when String # partial write, highly unlikely
    flags = MSG_DONTWAIT
    res = rv # hope the skb grows
  when :wait_writable, :wait_readable # highly unlikely in real apps
    wbuf = proxy_write(nil, res, alive)
    break # keep buffering as much as possible
  end while true

  rbuf = Thread.current[:yahns_rbuf]
  tip = tip.empty? ? [] : [ tip ]

  if have_body
    if len = kcar.body_bytes_left

      case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
      when String
        len = kcar.body_bytes_left -= tmp.size
        wbuf = proxy_write(wbuf, tmp, alive)
      when nil # premature EOF
        return proxy_err_response(nil, req_res, nil, wbuf)
      when :wait_readable
        return wait_on_upstream(req_res, alive, wbuf)
      end until len == 0

    elsif kcar.chunked? # nasty chunked body
      req_res.proxy_trailers = nil # define to avoid warnings for now
      buf = ''.dup
      case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
      when String
        kcar.filter_body(buf, tmp)
        wbuf = proxy_write(wbuf, chunk_out(buf), alive) unless buf.empty?
      when nil # premature EOF
        return proxy_err_response(nil, req_res, nil, wbuf)
      when :wait_readable
        return wait_on_upstream(req_res, alive, wbuf)
      end until kcar.body_eof?

      buf = tmp
      req_res.proxy_trailers = [ buf, tlr = [] ]
      rbuf = Thread.current[:yahns_rbuf] = ''.dup
      until kcar.trailers(tlr, buf)
        case rv = req_res.kgio_tryread(0x2000, rbuf)
        when String
          buf << rv
        when :wait_readable
          return wait_on_upstream(req_res, alive, wbuf)
        when nil # premature EOF
          return proxy_err_response(nil, req_res, nil, wbuf)
        end # no loop here
      end
      wbuf = proxy_write(wbuf, trailer_out(tlr), alive)

    else # no Content-Length or Transfer-Encoding: chunked, wait on EOF!

      case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
      when String
        tmp = chunk_out(tmp) if alive
        wbuf = proxy_write(wbuf, tmp, alive)
      when nil
        wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
        req_res.shutdown
        break
      when :wait_readable
        return wait_on_upstream(req_res, alive, wbuf)
      end while true

    end
  end

  # all done reading response from upstream, req_res will be discarded
  # when we return nil:
  wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
rescue => e
  proxy_err_response(502, req_res, e, wbuf)
end

#proxy_wait_next(qflags) ⇒ Object



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/yahns/proxy_http_response.rb', line 263

def proxy_wait_next(qflags)
  Thread.current[:yahns_fdmap].remember(self)
  # We must allocate a new, empty request object here to avoid a TOCTTOU
  # in the following timeline
  #
  # original thread:                                 | another thread
  # HttpClient#yahns_step                            |
  # r = k.app.call(env = @hs.env)  # socket hijacked into epoll queue
  # <thread is scheduled away>                       | epoll_wait readiness
  #                                                  | ReqRes#yahns_step
  #                                                  | proxy dispatch ...
  #                                                  | proxy_busy_mod_done
  # ************************** DANGER BELOW ********************************
  #                                                  | HttpClient#yahns_step
  #                                                  | # clears env
  # sees empty env:                                  |
  # return :ignore if env.include?('rack.hijack_io') |
  #
  # In other words, we cannot touch the original env seen by the
  # original thread since it must see the 'rack.hijack_io' value
  # because both are operating in the same Yahns::HttpClient object.
  # This will happen regardless of GVL existence
  hs = Unicorn::HttpRequest.new
  hs.buf.replace(@hs.buf)
  @hs = hs

  # n.b. we may not touch anything in this object once we call queue_mod,
  # another thread is likely to take it!
  Thread.current[:yahns_queue].queue_mod(self, qflags)
end

#proxy_write(wbuf, buf, alive) ⇒ Object

write everything in buf to our client socket (or wbuf, if it exists) it may return a newly-created wbuf or nil



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/yahns/proxy_http_response.rb', line 12

def proxy_write(wbuf, buf, alive)
  unless wbuf
    # no write buffer, try to write directly to the client socket
    case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
    when nil then return # done writing buf, likely
    when String, Array # partial write, hope the skb grows
      buf = rv
    when :wait_writable, :wait_readable
      wbuf = Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, rv)
      buf = buf.join if Array === buf
      break
    end while true
  end

  wbuf.wbuf_write(self, buf)
  wbuf.busy ? wbuf : nil
end

#response_header_blocked(ret, header, body, alive, offset, count) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/yahns/http_response.rb', line 60

def response_header_blocked(ret, header, body, alive, offset, count)
  if body.respond_to?(:to_path)
    alive = Yahns::StreamFile.new(body, alive, offset, count)
    body = nil
  end
  wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, ret)
  rv = wbuf.wbuf_write(self, header)
  if body && ! alive.respond_to?(:call) # skip body.each if hijacked
    body.each { |chunk| rv = wbuf.wbuf_write(self, chunk) }
  end
  wbuf_maybe(wbuf, rv)
end

#response_startObject



43
44
45
# File 'lib/yahns/http_response.rb', line 43

def response_start
  @hs.response_start_sent ? ''.freeze : 'HTTP/1.1 '.freeze
end

#response_wait_write(rv) ⇒ Object



47
48
49
50
51
52
53
54
# File 'lib/yahns/http_response.rb', line 47

def response_wait_write(rv)
  # call the kgio_wait_readable or kgio_wait_writable method
  ok = __send__("kgio_#{rv}") and return ok
  k = self.class
  k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\
                "#{k.client_timeout}s")
  false
end

#trailer_out(tlr) ⇒ Object



327
328
329
# File 'lib/yahns/proxy_http_response.rb', line 327

def trailer_out(tlr)
  "0\r\n#{tlr.map! do |k,v| "#{k}: #{v}\r\n" end.join}\r\n"
end

#wait_on_upstream(req_res, alive, wbuf) ⇒ Object



57
58
59
60
61
62
# File 'lib/yahns/proxy_http_response.rb', line 57

def wait_on_upstream(req_res, alive, wbuf)
  req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, alive,
                                           self.class.output_buffer_tmpdir,
                                           false)
  :wait_readable # self remains in :ignore, wait on upstream
end

#wbuf_maybe(wbuf, rv) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/yahns/http_response.rb', line 73

def wbuf_maybe(wbuf, rv)
  case rv # wbuf_write return value
  when nil # all done
    case rv = wbuf.wbuf_close(self)
    when :ignore # hijacked
      @state = rv
    when Yahns::StreamFile
      @state = rv
      :wait_writable
    when true, false
      http_response_done(rv)
    end
  else
    @state = wbuf
    rv
  end
end