Class: Gilmour::Responder
- Inherits:
-
Object
- Object
- Gilmour::Responder
- Defined in:
- lib/gilmour/responder.rb
Overview
Every request handler is executed in the context of a Responder object. This class contains methods to respond to requests as well as proxy methods for carrying out gilmour actions inside the handlers.
Constant Summary collapse
- LOG_SEPERATOR =
'%%'- LOG_PREFIX =
"#{LOG_SEPERATOR}gilmour#{LOG_SEPERATOR}"
Instance Attribute Summary collapse
-
#backend ⇒ Object
readonly
Returns the value of attribute backend.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#request ⇒ Object
readonly
Returns the value of attribute request.
Instance Method Summary collapse
-
#_execute(handler) ⇒ Object
Called by child.
-
#add_listener(topic, opts = {}, &handler) ⇒ Object
proxy to base add_listener.
-
#call_parent_backend_method(method, *args) ⇒ Object
:nodoc:.
-
#child_logger(writer) ⇒ Object
:nodoc:.
-
#command_relay(reader, waiter) ⇒ Object
:nodoc:.
-
#delay_response ⇒ Object
This prohibits sending a response from a reply handler even after the request execution has finished.
-
#delayed_response? ⇒ Boolean
:nodoc:.
-
#emit_error(message, code = 500, extra = {}) ⇒ Object
Publish all errors on gilmour.error This may or may not have a listener based on the configuration supplied at setup.
-
#execute(handler) ⇒ Object
Called by parent.
-
#initialize(sender, topic, data, backend, opts = {}) ⇒ Responder
constructor
:nodoc:.
-
#logger_relay(read_logger_pipe, waiter, parent_logger) ⇒ Object
All logs in forked mode are relayed chr.
-
#make_logger ⇒ Object
:nodoc:.
-
#publish(message, destination, opts = {}, code = nil, &blk) ⇒ Object
Proxy to publish method.
-
#receive_data(data) ⇒ Object
:nodoc:.
-
#reply_to(topic, opts = {}, &handler) ⇒ Object
Proxy to register reply listener (see Backend#reply_to for details).
-
#request!(message, destination, opts = {}, &blk) ⇒ Object
Proxy to request! method.
-
#respond(body, code = 200, opts = {}) ⇒ Object
Sends a response with body and code If
opts[:now]is true, the response is sent immediately, else it is defered until the handler finishes executing. -
#send_response ⇒ Object
Called by child.
-
#signal!(message, destination, opts = {}) ⇒ Object
Proxy to signal! method.
-
#slot(topic, opts = {}, &handler) ⇒ Object
Proxy to register slot (see Backend#slot for details).
-
#write_response(sender, data, code) ⇒ Object
Called by parent.
Constructor Details
#initialize(sender, topic, data, backend, opts = {}) ⇒ Responder
:nodoc:
63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/gilmour/responder.rb', line 63 def initialize(sender, topic, data, backend, opts={}) #:nodoc: @sender = sender @request = Request.new(topic, data) @response = { data: nil, code: nil } @backend = backend @timeout = opts[:timeout] || 600 @multi_process = opts[:fork] || false @respond = opts[:respond] @response_pipe = IO.pipe("UTF-8") @logger_pipe = IO.pipe("UTF-8") @command_pipe = IO.pipe("UTF-8") @logger = make_logger() @delayed_response = false end |
Instance Attribute Details
#backend ⇒ Object (readonly)
Returns the value of attribute backend.
32 33 34 |
# File 'lib/gilmour/responder.rb', line 32 def backend @backend end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
30 31 32 |
# File 'lib/gilmour/responder.rb', line 30 def logger @logger end |
#request ⇒ Object (readonly)
Returns the value of attribute request.
31 32 33 |
# File 'lib/gilmour/responder.rb', line 31 def request @request end |
Instance Method Details
#_execute(handler) ⇒ Object
Called by child
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/gilmour/responder.rb', line 291 def _execute(handler) #:nodoc: ret = nil begin Timeout.timeout(@timeout) do ret = instance_eval(&handler) end rescue Timeout::Error => e logger.error e. logger.error e.backtrace @response[:code] = 504 @response[:data] = e. rescue Exception => e logger.error e. logger.error e.backtrace @response[:code] = 500 @response[:data] = e. end @response[:code] ||= 200 if @respond && !delayed_response? send_response if @response[:code] end |
#add_listener(topic, opts = {}, &handler) ⇒ Object
proxy to base add_listener
100 101 102 103 104 105 106 107 |
# File 'lib/gilmour/responder.rb', line 100 def add_listener(topic, opts={}, &handler) if @multi_process GLogger.error "Dynamic listeners using add_listener not supported \ in forked responder. Ignoring!" end @backend.add_listener(topic, &handler) end |
#call_parent_backend_method(method, *args) ⇒ Object
:nodoc:
312 313 314 315 316 |
# File 'lib/gilmour/responder.rb', line 312 def call_parent_backend_method(method, *args) #:nodoc: msg = JSON.generate([method, args]) @write_command_pipe.write(msg+"\n") @write_command_pipe.flush end |
#child_logger(writer) ⇒ Object
:nodoc:
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/gilmour/responder.rb', line 34 def child_logger(writer) #:nodoc: logger = Logger.new(STDERR) loglevel = ENV["LOG_LEVEL"] ? ENV["LOG_LEVEL"].to_sym : :warn logger.level = Gilmour::LoggerLevels[loglevel] || Logger::WARN logger.formatter = proc do |severity, datetime, progname, msg| begin data = JSON.generate(severity: severity, msg: msg) # data = "#{LOG_PREFIX}#{severity}#{LOG_SEPERATOR}#{msg}" writer.write(data+"\n") writer.flush rescue GLogger.error "Logger error: #{severity}: #{msg}" end nil end logger end |
#command_relay(reader, waiter) ⇒ Object
:nodoc:
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/gilmour/responder.rb', line 155 def command_relay(reader, waiter) #:nodoc: waiter.add pub_mutex = Mutex.new Thread.new do loop do begin data = reader.readline pub_mutex.synchronize do method, args = JSON.parse(data) @backend.send(method.to_sym, *args) end rescue EOFError waiter.done break rescue Exception => e GLogger.debug e. GLogger.debug e.backtrace end end end end |
#delay_response ⇒ Object
This prohibits sending a response from a reply handler even after the request execution has finished. This is useful for sending a response from inside a closure if the handler has to make further gilmour requests. To send a response later, call respond with the option “now” as true.
147 148 149 |
# File 'lib/gilmour/responder.rb', line 147 def delay_response @delayed_response = true end |
#delayed_response? ⇒ Boolean
:nodoc:
151 152 153 |
# File 'lib/gilmour/responder.rb', line 151 def delayed_response? #:nodoc: @delayed_response end |
#emit_error(message, code = 500, extra = {}) ⇒ Object
Publish all errors on gilmour.error This may or may not have a listener based on the configuration supplied at setup.
275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/gilmour/responder.rb', line 275 def emit_error(, code = 500, extra = {}) #:nodoc: opts = { topic: @request.topic, request_data: @request.body, userdata: JSON.generate(extra || {}), sender: @sender, multi_process: @multi_process, timestamp: Time.now.getutc } payload = { backtrace: , code: code } payload.merge!(opts) @backend.emit_error payload end |
#execute(handler) ⇒ Object
Called by parent
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/gilmour/responder.rb', line 203 def execute(handler) #:nodoc: if !@multi_process _execute(handler) return end GLogger.debug "Executing #{@sender} in forked moode" # Create pipes for child communication @read_pipe, @write_pipe = @response_pipe @read_command_pipe, @write_command_pipe = @command_pipe @read_logger_pipe, @write_logger_pipe = @logger_pipe = IO.pipe("UTF-8") # setup relay threads wg = Gilmour::Waiter.new relay_threads = [] relay_threads << logger_relay(@read_logger_pipe, wg, @logger) relay_threads << command_relay(@read_command_pipe, wg) pid = Process.fork do @backend.stop EventMachine.stop_event_loop #Close the parent channels in forked process @read_pipe.close @read_command_pipe.close @read_logger_pipe.close @response_sent = false # override the logger for the child @logger = child_logger(@write_logger_pipe) _execute(handler) @write_logger_pipe.close end # Cleanup the writers in Parent process. @write_logger_pipe.close @write_pipe.close @write_command_pipe.close begin receive_data(@read_pipe.readline) rescue EOFError => e logger.debug e. end pid, status = Process.waitpid2(pid) if !status || status.exitstatus > 0 msg = if !status "Child Process #{pid} crashed without status." else "Child Process #{pid} exited with status #{status.exitstatus}" end logger.error msg # Set the multi-process mode as false, the child has died anyway. @multi_process = false write_response(@sender, msg, 500) end @read_pipe.close # relay cleanup. wg.wait do relay_threads.each { |th| th.kill } end @read_command_pipe.close @read_logger_pipe.close end |
#logger_relay(read_logger_pipe, waiter, parent_logger) ⇒ Object
All logs in forked mode are relayed chr
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/gilmour/responder.rb', line 179 def logger_relay(read_logger_pipe, waiter, parent_logger) #:nodoc: waiter.add 1 Thread.new do loop do begin data = read_logger_pipe.readline.chomp logdata = JSON.parse(data) meth = logdata['severity'].downcase.to_sym parent_logger.send(meth, logdata['msg']) rescue JSON::ParserError parent_logger.info data next rescue EOFError waiter.done break rescue Exception => e GLogger.error e. GLogger.error e.backtrace end end #loop end end |
#make_logger ⇒ Object
:nodoc:
52 53 54 55 56 57 58 59 60 61 |
# File 'lib/gilmour/responder.rb', line 52 def make_logger #:nodoc: logger = Logger.new(STDERR) loglevel = ENV["LOG_LEVEL"] ? ENV["LOG_LEVEL"].to_sym : :warn logger.level = Gilmour::LoggerLevels[loglevel] || Logger::WARN logger.formatter = proc do |severity, datetime, progname, msg| date_format = datetime.strftime("%Y-%m-%d %H:%M:%S") "#{severity[0]} #{date_format} #{@sender} -> #{msg}\n" end logger end |
#publish(message, destination, opts = {}, code = nil, &blk) ⇒ Object
Proxy to publish method. See Backend#publish
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/gilmour/responder.rb', line 319 def publish(, destination, opts = {}, code=nil, &blk) if @multi_process if block_given? GLogger.error "Publish callback not supported in forked responder. Ignoring!" end call_parent_backend_method('publish', , destination, opts, code) # method = opts[:method] || 'publish' # msg = JSON.generate([method, [message, destination, opts, code]]) # @write_command_pipe.write(msg+"\n") # @write_command_pipe.flush elsif block_given? @backend.publish(, destination, opts, &blk) else @backend.publish(, destination, opts) end end |
#receive_data(data) ⇒ Object
:nodoc:
78 79 80 81 82 83 84 85 |
# File 'lib/gilmour/responder.rb', line 78 def receive_data(data) #:nodoc: sender, res_data, res_code, opts = JSON.parse(data) res_code ||= 200 if @respond write_response(sender, res_data, res_code) if sender && res_code rescue => e GLogger.error e. GLogger.error e.backtrace end |
#reply_to(topic, opts = {}, &handler) ⇒ Object
Proxy to register reply listener (see Backend#reply_to for details)
120 121 122 123 124 125 126 127 |
# File 'lib/gilmour/responder.rb', line 120 def reply_to(topic, opts={}, &handler) if @multi_process GLogger.error "Dynamic listeners using add_listener not supported \ in forked responder. Ignoring!" end @backend.reply_to(topic, opts, &handler) end |
#request!(message, destination, opts = {}, &blk) ⇒ Object
Proxy to request! method. See Backend#request!
337 338 339 340 341 342 343 344 345 346 |
# File 'lib/gilmour/responder.rb', line 337 def request!(, destination, opts={}, &blk) if @multi_process if block_given? GLogger.error "Publish callback not supported in forked responder. Ignoring!" end call_parent_backend_method('request!', , destination, opts) else @backend.request!(, destination, opts, &blk) end end |
#respond(body, code = 200, opts = {}) ⇒ Object
Sends a response with body and code If opts[:now] is true, the response is sent immediately, else it is defered until the handler finishes executing
133 134 135 136 137 138 139 140 |
# File 'lib/gilmour/responder.rb', line 133 def respond(body, code = 200, opts = {}) @response[:data] = body @response[:code] = code if opts[:now] send_response @response = {} end end |
#send_response ⇒ Object
Called by child
359 360 361 362 363 364 365 366 367 368 369 370 |
# File 'lib/gilmour/responder.rb', line 359 def send_response #:nodoc: return if @response_sent @response_sent = true if @multi_process msg = JSON.generate([@sender, @response[:data], @response[:code]]) @write_pipe.write(msg+"\n") @write_pipe.flush # This flush is very important else write_response(@sender, @response[:data], @response[:code]) end end |
#signal!(message, destination, opts = {}) ⇒ Object
Proxy to signal! method. See Backend#signal!
349 350 351 352 353 354 355 |
# File 'lib/gilmour/responder.rb', line 349 def signal!(, destination, opts={}) if @multi_process call_parent_backend_method('signal!', , destination, opts) else @backend.signal!(, destination, opts) end end |
#slot(topic, opts = {}, &handler) ⇒ Object
Proxy to register slot (see Backend#slot for details)
110 111 112 113 114 115 116 117 |
# File 'lib/gilmour/responder.rb', line 110 def slot(topic, opts={}, &handler) if @multi_process GLogger.error "Dynamic listeners using add_listener not supported \ in forked responder. Ignoring!" end @backend.slot(topic, opts, &handler) end |
#write_response(sender, data, code) ⇒ Object
Called by parent
88 89 90 91 92 93 94 95 96 97 |
# File 'lib/gilmour/responder.rb', line 88 def write_response(sender, data, code) #:nodoc: return unless @respond if code >= 300 && @backend.report_errors? emit_error data, code end @backend.send_response(sender, data, code) rescue => e GLogger.error e. GLogger.error e.backtrace end |