Class: Baykit::BayServer::Docker::Cgi::CgiReqContentHandler
- Inherits:
-
Object
- Object
- Baykit::BayServer::Docker::Cgi::CgiReqContentHandler
- Includes:
- Agent, Agent::Multiplexer, Common::Postpone, Rudders, Tours, Tours::ReqContentHandler, Train, Util
- Defined in:
- lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb
Constant Summary collapse
- READ_CHUNK_SIZE =
8192
Instance Attribute Summary collapse
-
#available ⇒ Object
readonly
Returns the value of attribute available.
-
#buffers ⇒ Object
readonly
Returns the value of attribute buffers.
-
#cgi_docker ⇒ Object
readonly
Returns the value of attribute cgi_docker.
-
#env ⇒ Object
readonly
Returns the value of attribute env.
-
#last_access ⇒ Object
readonly
Returns the value of attribute last_access.
-
#multiplexer ⇒ Object
Returns the value of attribute multiplexer.
-
#pid ⇒ Object
readonly
Returns the value of attribute pid.
-
#std_err_closed ⇒ Object
readonly
Returns the value of attribute std_err_closed.
-
#std_err_rd ⇒ Object
readonly
Returns the value of attribute std_err_rd.
-
#std_in_rd ⇒ Object
readonly
Returns the value of attribute std_in_rd.
-
#std_out_closed ⇒ Object
readonly
Returns the value of attribute std_out_closed.
-
#std_out_rd ⇒ Object
readonly
Returns the value of attribute std_out_rd.
-
#tour ⇒ Object
readonly
Returns the value of attribute tour.
-
#tour_id ⇒ Object
readonly
Returns the value of attribute tour_id.
Instance Method Summary collapse
- #access ⇒ Object
-
#initialize(cgi_docker, tur, env) ⇒ CgiReqContentHandler
constructor
A new instance of CgiReqContentHandler.
- #on_abort_req(tur) ⇒ Object
- #on_end_req_content(tur) ⇒ Object
-
#on_read_req_content(tur, buf, start, len, &callback) ⇒ Object
Implements ReqContentHandler.
- #process_finished ⇒ Object
-
#req_start_tour ⇒ Object
Other methods.
-
#run ⇒ Object
Implements Postpone.
- #start_tour ⇒ Object
- #timed_out ⇒ Object
- #write_to_std_in(tur, buf, start, len, &callback) ⇒ Object
Constructor Details
#initialize(cgi_docker, tur, env) ⇒ CgiReqContentHandler
Returns a new instance of CgiReqContentHandler.
43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 43 def initialize(cgi_docker, tur, env) @cgi_docker = cgi_docker @tour = tur @tour_id = tur.tour_id @env = env @pid = 0 @std_in_rd = nil @std_out_rd = nil @std_err_rd = nil @std_out_closed = true @std_err_closed = true @buffers = [] end |
Instance Attribute Details
#available ⇒ Object (readonly)
Returns the value of attribute available.
31 32 33 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 31 def available @available end |
#buffers ⇒ Object (readonly)
Returns the value of attribute buffers.
41 42 43 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 41 def buffers @buffers end |
#cgi_docker ⇒ Object (readonly)
Returns the value of attribute cgi_docker.
28 29 30 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 28 def cgi_docker @cgi_docker end |
#env ⇒ Object (readonly)
Returns the value of attribute env.
40 41 42 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 40 def env @env end |
#last_access ⇒ Object (readonly)
Returns the value of attribute last_access.
38 39 40 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 38 def last_access @last_access end |
#multiplexer ⇒ Object
Returns the value of attribute multiplexer.
39 40 41 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 39 def multiplexer @multiplexer end |
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
32 33 34 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 32 def pid @pid end |
#std_err_closed ⇒ Object (readonly)
Returns the value of attribute std_err_closed.
37 38 39 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 37 def std_err_closed @std_err_closed end |
#std_err_rd ⇒ Object (readonly)
Returns the value of attribute std_err_rd.
35 36 37 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 35 def std_err_rd @std_err_rd end |
#std_in_rd ⇒ Object (readonly)
Returns the value of attribute std_in_rd.
33 34 35 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 33 def std_in_rd @std_in_rd end |
#std_out_closed ⇒ Object (readonly)
Returns the value of attribute std_out_closed.
36 37 38 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 36 def std_out_closed @std_out_closed end |
#std_out_rd ⇒ Object (readonly)
Returns the value of attribute std_out_rd.
34 35 36 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 34 def std_out_rd @std_out_rd end |
#tour ⇒ Object (readonly)
Returns the value of attribute tour.
29 30 31 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 29 def tour @tour end |
#tour_id ⇒ Object (readonly)
Returns the value of attribute tour_id.
30 31 32 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 30 def tour_id @tour_id end |
Instance Method Details
#access ⇒ Object
236 237 238 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 236 def access() @last_access = Time.now.tv_sec end |
#on_abort_req(tur) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 87 def on_abort_req(tur) BayLog.debug("%s CGI:abortReq", tur) if !@std_out_closed @multiplexer.req_close(@std_out_rd) end if !@std_err_closed @multiplexer.req_close(@std_err_rd) end if @pid == nil BayLog.warn("%s Cannot kill process (pid is null)", tur) else BayLog.debug("%s KILL PROCESS!: %d", tur, @pid) Process.kill('KILL', @pid) end return false # not aborted immediately end |
#on_end_req_content(tur) ⇒ Object
82 83 84 85 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 82 def on_end_req_content(tur) BayLog.trace("%s CGI:endReqContent", tur) access() end |
#on_read_req_content(tur, buf, start, len, &callback) ⇒ Object
Implements ReqContentHandler
70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 70 def on_read_req_content(tur, buf, start, len, &callback) BayLog.debug("%s CGI:onReadReqContent: len=%d", tur, len) if @pid != 0 write_to_std_in(tur, buf, start, len, &callback) else # postponed @buffers << [buf[start, len].dup, callback] end access() end |
#process_finished ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 256 def process_finished() BayLog.debug("%s CGI Process finished: pid=%s", @tour, @pid) pid, stat = Process.wait2(@pid) BayLog.debug("%s CGI Process finished: pid=%s code=%s", @tour, pid, stat.exitstatus) agt_id = @tour.ship.agent_id begin if stat.exitstatus != 0 # Exec failed BayLog.error("%s CGI Invalid exit status pid=%d code=%s", @tour, @pid, stat.exitstatus) @tour.res.send_error(@tour_id, HttpStatus::INTERNAL_SERVER_ERROR, "Invalid exit Status") else @tour.res.end_res_content(@tour_id) end rescue IOError => e BayLog.error_e(e) end @cgi_docker.sub_process_count if @cgi_docker.get_wait_count > 0 BayLog.warn("agt#%d Catch up postponed process: process wait count=%d", agt_id, @cgi_docker.get_wait_count) agt = GrandAgent.get(agt_id) agt.req_catch_up end end |
#req_start_tour ⇒ Object
Other methods
111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 111 def req_start_tour if @cgi_docker.add_process_count BayLog.info("%s start tour: wait count=%d", @tour, @cgi_docker.get_wait_count) start_tour else BayLog.warn("%s Cannot start tour: wait count=%d", @tour, @cgi_docker.get_wait_count) agt = GrandAgent.get(@tour.ship.agent_id) agt.add_postpone(self) end access() end |
#run ⇒ Object
Implements Postpone
60 61 62 63 64 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 60 def run @cgi_docker.sub_wait_count BayLog.info("%s challenge postponed tour", @tour, @cgi_docker.get_wait_count) req_start_tour end |
#start_tour ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 123 def start_tour @available = false std_in = IO.pipe() std_out = IO.pipe() std_err = IO.pipe() std_in[1].set_encoding("ASCII-8BIT") std_out[0].set_encoding("ASCII-8BIT") std_err[0].set_encoding("ASCII-8BIT") command = @cgi_docker.create_command(@env) BayLog.debug("%s Spawn: %s", @tour, command) @pid = Process.spawn(env, command, :in => std_in[0], :out => std_out[1], :err => std_err[1]) BayLog.debug("%s created process; %s", @tour, @pid) std_in[0].close() std_out[1].close() std_err[1].close() @std_in_rd = IORudder.new(std_in[1]) @std_out_rd = IORudder.new(std_out[0]) @std_err_rd = IORudder.new(std_err[0]) BayLog.debug("#{@tour} PID: #{pid}") @buffers.each do |pair| BayLog.debug("%s write postponed data: len=%d", @tour, pair[0].length) write_to_std_in(@tour, pair[0], 0, pair[0].length, &pair[1]) end @std_out_closed = false @std_err_closed = false bufsize = 8192 agt = GrandAgent.get(@tour.ship.agent_id) case(BayServer.harbor.cgi_multiplexer) when Harbor::MULTIPLEXER_TYPE_SPIDER mpx = agt.spider_multiplexer @std_out_rd.set_non_blocking @std_err_rd.set_non_blocking when Harbor::MULTIPLEXER_TYPE_SPIN def eof_checker() begin pid = Process.wait(handler.pid, Process::WNOHANG) return pid != nil rescue Errno::ECHILD => e BayLog.error_e(e) return true end end mpx = agt.spin_multiplexer @std_out_rd.set_non_blocking @std_err_rd.set_non_blocking when Harbor::MULTIPLEXER_TYPE_TAXI mpx = agt.taxi_multiplexer when Harbor::MULTIPLEXER_TYPE_JOB mpx = agt.job_multiplexer else raise Sink.new(); end if mpx != nil @multiplexer = mpx out_ship = CgiStdOutShip.new out_tp = PlainTransporter.new(agt.net_multiplexer, out_ship, false, bufsize, false) out_ship.init_std_out(@std_out_rd, @tour.ship.agent_id, @tour, out_tp, self) mpx.add_rudder_state( @std_out_rd, RudderState.new(@std_out_rd, out_tp) ) ship_id = out_ship.ship_id @tour.res.set_consume_listener do |len, resume| if resume out_ship.resume_read(ship_id) end end err_ship = CgiStdErrShip.new err_tp = PlainTransporter.new(agt.net_multiplexer, err_ship, false, bufsize, false) err_ship.init_std_err(@std_err_rd, @tour.ship.agent_id, self) mpx.add_rudder_state( @std_err_rd, RudderState.new(@std_err_rd, err_tp) ) mpx.req_read(@std_out_rd) mpx.req_read(@std_err_rd) end end |
#timed_out ⇒ Object
240 241 242 243 244 245 246 247 248 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 240 def timed_out() if @cgi_docker.timeout_sec <= 0 return false end duration_sec = Time.now.tv_sec - @last_access BayLog.debug("%s Check CGI timeout: dur=%d, timeout=%d", @tour, duration_sec, @cgi_docker.timeout_sec) return duration_sec > @cgi_docker.timeout_sec end |
#write_to_std_in(tur, buf, start, len, &callback) ⇒ Object
250 251 252 253 254 |
# File 'lib/baykit/bayserver/docker/cgi/cgi_req_content_handler.rb', line 250 def write_to_std_in(tur, buf, start, len, &callback) wrote_len = @std_in_rd.write(buf[start, len]) BayLog.debug("%s CGI:onReadReqContent: wrote=%d", tur, wrote_len) tur.req.consumed(Tour::TOUR_ID_NOCHECK, len, &callback) end |