Class: Yahns::ReqRes

Inherits:
Kgio::Socket
  • Object
show all
Defined in:
lib/yahns/req_res.rb

Overview

:nodoc:

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#aliveObject

Returns the value of attribute alive.



12
13
14
# File 'lib/yahns/req_res.rb', line 12

def alive
  @alive
end

#proxy_passObject (readonly)

Returns the value of attribute proxy_pass.



13
14
15
# File 'lib/yahns/req_res.rb', line 13

def proxy_pass
  @proxy_pass
end

#proxy_trailersObject

Returns the value of attribute proxy_trailers.



11
12
13
# File 'lib/yahns/req_res.rb', line 11

def proxy_trailers
  @proxy_trailers
end

#resbufObject

Returns the value of attribute resbuf.



10
11
12
# File 'lib/yahns/req_res.rb', line 10

def resbuf
  @resbuf
end

Instance Method Details

#prepare_wait_readableObject



146
147
148
149
# File 'lib/yahns/req_res.rb', line 146

def prepare_wait_readable
  @rrstate = Kcar::Parser.new
  :wait_readable # all done sending the request, wait for response
end

#req_start(c, req, input, chunked, proxy_pass) ⇒ Object



15
16
17
18
19
20
21
# File 'lib/yahns/req_res.rb', line 15

def req_start(c, req, input, chunked, proxy_pass)
  @hdr = @resbuf = nil
  @yahns_client = c
  @rrstate = input ? [ req, input, chunked ] : req
  @proxy_pass = proxy_pass
  Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
end

#send_req_body(req) ⇒ Object

returns :wait_readable if complete, :wait_writable if not



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/yahns/req_res.rb', line 95

def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
  buf, input, chunked = req

  # send the first buffered chunk or vector
  rv = send_req_body_chunk(buf) and return rv # :wait_writable

  # yay, sent the first chunk, now read the body!
  rbuf = buf
  if chunked
    if String === buf # initial body
      req[0] = buf = []
    else
      # try to reuse the biggest non-frozen buffer we just wrote;
      rbuf = buf.max_by(&:size)
      rbuf = ''.dup if rbuf.frozen? # unlikely...
    end
  end

  # Note: input (env['rack.input']) is fully-buffered by default so
  # we should not be waiting on a slow network resource when reading
  # input.  However, some weird configs may disable this on LANs
  # and we may wait indefinitely on input.read here...
  while input.read(0x2000, rbuf)
    if chunked
      buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
      buf[1] = rbuf
      buf[2] = "\r\n".freeze
    end
    rv = send_req_body_chunk(buf) and return rv # :wait_writable
  end

  rbuf.clear # all done, clear the big buffer

  # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
  # tries to prevent that (and hijack means all Rack specs go out the door)
  case input
  when Yahns::TeeInput, IO
    input.close
  end

  # note: we do not send any trailer, they are folded into the header
  # because this relies on full request buffering
  # prepare_wait_readable is called by send_req_buf
  chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
  # no more reading off the client socket, just prepare to forward
  # the rejection response from the upstream (if any)
  @yahns_client.to_io.shutdown(Socket::SHUT_RD)
  prepare_wait_readable
end

#send_req_body_chunk(buf) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/yahns/req_res.rb', line 82

def send_req_body_chunk(buf)
  case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
  when String, Array
    buf.replace(rv) # retry loop on partial write
  when :wait_writable, nil
    # :wait_writable = upstream is reading slowly and making us wait
    return rv
  else
    abort "BUG: #{rv.inspect} from kgio_trywrite*"
  end while true
end

#send_req_buf(buf) ⇒ Object

n.b. buf must be a detached string not shared with Thread.current of any thread



153
154
155
156
157
158
159
160
161
162
163
# File 'lib/yahns/req_res.rb', line 153

def send_req_buf(buf)
  case rv = kgio_trywrite(buf)
  when String
    buf = rv # retry inner loop
  when :wait_writable
    @rrstate = buf
    return :wait_writable
  when nil
    return prepare_wait_readable
  end while true
end

#yahns_stepObject

yahns event loop entry point



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/yahns/req_res.rb', line 23

def yahns_step # yahns event loop entry point
  c = @yahns_client
  case req = @rrstate
  when Kcar::Parser # reading response...
    buf = Thread.current[:yahns_rbuf]

    case resbuf = @resbuf # where are we at the response?
    when nil # common case, catch the response header in a single read

      case rv = kgio_tryread(0x2000, buf)
      when String
        if res = req.headers(@hdr = [], rv)
          return c.proxy_response_start(res, rv, req, self)
        else # ugh, big headers or tricked response
          # we must reinitialize the thread-local rbuf if it may
          # live beyond the current thread
          buf = Thread.current[:yahns_rbuf] = ''.dup
          @resbuf = rv
        end
        # continue looping in middle "case @resbuf" loop
      when :wait_readable
        return rv # spurious wakeup
      when nil
        return c.proxy_err_response(502, self, 'upstream EOF (headers)')
      end # NOT looping here

    when String # continue reading trickled response headers from upstream

      case rv = kgio_tryread(0x2000, buf)
      when String then res = req.headers(@hdr, resbuf << rv) and break
      when :wait_readable then return rv
      when nil
        return c.proxy_err_response(502, self, 'upstream EOF (big headers)')
      end while true
      @resbuf = false

      return c.proxy_response_start(res, resbuf, req, self)

    when Yahns::WbufCommon # streaming/buffering the response body

      return c.proxy_response_finish(req, self)

    end while true # case @resbuf

  when Array # [ (str|vec), rack.input, chunked? ]
    send_req_body(req) # returns nil or :wait_writable
  when String # buffered request header
    send_req_buf(req)
  end
rescue => e
  # avoid polluting logs with a giant backtrace when the problem isn't
  # fixable in code.
  case e
  when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
    e.set_backtrace([])
  end
  c.proxy_err_response(Yahns::WbufCommon === @resbuf ? nil : 502, self, e)
end