Class: Gilmour::Responder

Inherits:
Object
  • Object
show all
Defined in:
lib/gilmour/responder.rb

Overview

Every request handler is executed in the context of a Responder object. This class contains methods to respond to requests as well as proxy methods for carrying out gilmour actions inside the handlers.

Constant Summary collapse

LOG_SEPERATOR =
'%%'
LOG_PREFIX =
"#{LOG_SEPERATOR}gilmour#{LOG_SEPERATOR}"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sender, topic, data, backend, opts = {}) ⇒ Responder

:nodoc:



63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/gilmour/responder.rb', line 63

# Sets up the per-request state of a responder.
#
# sender  - stored in @sender; later passed to write_response and used in
#           log lines
# topic   - wrapped, together with +data+, in a Request object
# data    - see +topic+
# backend - object that all backend calls are delegated to
# opts    - :timeout (defaults to 600), :fork (defaults to false) and
#           :respond
def initialize(sender, topic, data, backend, opts={}) #:nodoc:
  @sender = sender
  @backend = backend
  @request = Request.new(topic, data)
  @response = { data: nil, code: nil }
  @delayed_response = false

  @timeout = opts[:timeout] || 600
  @multi_process = opts[:fork] || false
  @respond = opts[:respond]

  # Three pipes, created in this order: response, logger, command.
  @response_pipe, @logger_pipe, @command_pipe =
    Array.new(3) { IO.pipe("UTF-8") }

  @logger = make_logger()
end

Instance Attribute Details

#backendObject (readonly)

Returns the value of attribute backend.



32
33
34
# File 'lib/gilmour/responder.rb', line 32

# Read-only accessor: returns the object stored in @backend.
def backend
  @backend
end

#loggerObject (readonly)

Returns the value of attribute logger.



30
31
32
# File 'lib/gilmour/responder.rb', line 30

# Read-only accessor: returns the logger stored in @logger.
def logger
  @logger
end

#requestObject (readonly)

Returns the value of attribute request.



31
32
33
# File 'lib/gilmour/responder.rb', line 31

# Read-only accessor: returns the request object stored in @request.
def request
  @request
end

Instance Method Details

#_execute(handler) ⇒ Object

Called by child



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/gilmour/responder.rb', line 291

# Runs +handler+ in the context of this responder (via instance_eval),
# bounded by @timeout seconds.
#
# Any exception raised by the handler is logged and turned into a response:
# code 504 for Timeout::Error, code 500 for everything else, with the
# exception message as the response data.
#
# When responding is enabled and the response has not been delayed, a
# missing code defaults to 200. The response is sent only if a code is set.
def _execute(handler) #:nodoc:
  begin
    Timeout.timeout(@timeout) { instance_eval(&handler) }
  rescue Exception => err
    logger.error err.message
    logger.error err.backtrace
    @response[:code] = err.is_a?(Timeout::Error) ? 504 : 500
    @response[:data] = err.message
  end

  if @respond && !delayed_response?
    @response[:code] ||= 200
  end
  send_response if @response[:code]
end

#add_listener(topic, opts = {}, &handler) ⇒ Object

proxy to base add_listener



100
101
102
103
104
105
106
107
# File 'lib/gilmour/responder.rb', line 100

# Proxy to the backend's add_listener.
#
# topic   - passed through to the backend
# opts    - accepted but not forwarded to the backend
#           NOTE(review): confirm whether dropping opts is intentional
# handler - block forwarded to the backend
#
# In forked mode an error is logged first; the call is still forwarded.
# NOTE(review): the message says "Ignoring!" although nothing is skipped.
def add_listener(topic, opts={}, &handler)
  if @multi_process
    # The run of spaces is part of the message emitted today.
    notice = "Dynamic listeners using add_listener not supported " \
             "    in forked responder. Ignoring!"
    GLogger.error notice
  end

  @backend.add_listener(topic, &handler)
end

#call_parent_backend_method(method, *args) ⇒ Object

:nodoc:



312
313
314
315
316
# File 'lib/gilmour/responder.rb', line 312

# Serialises a backend call as one JSON line ([method, args]) and writes it
# to the command pipe, flushing immediately. command_relay reads lines in
# this format and invokes the named method on the backend.
def call_parent_backend_method(method, *args) #:nodoc:
  line = "#{JSON.generate([method, args])}\n"
  @write_command_pipe.write(line)
  @write_command_pipe.flush
end

#child_logger(writer) ⇒ Object

:nodoc:



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/gilmour/responder.rb', line 34

# Builds the logger used inside the forked child.
#
# The level comes from ENV["LOG_LEVEL"] looked up in Gilmour::LoggerLevels,
# falling back to WARN. Instead of formatting text, the formatter writes
# each record to +writer+ as one JSON line ({severity, msg}) and returns
# nil. If writing fails, the record is reported through GLogger instead.
def child_logger(writer) #:nodoc:
  level_name = ENV["LOG_LEVEL"] ? ENV["LOG_LEVEL"].to_sym : :warn

  Logger.new(STDERR).tap do |child|
    child.level = Gilmour::LoggerLevels[level_name] || Logger::WARN
    child.formatter = proc do |severity, _datetime, _progname, msg|
      begin
        record = JSON.generate(severity: severity, msg: msg)
        writer.write("#{record}\n")
        writer.flush
      rescue
        GLogger.error "Logger error: #{severity}: #{msg}"
      end
      nil
    end
  end
end

#command_relay(reader, waiter) ⇒ Object

:nodoc:



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/gilmour/responder.rb', line 155

# Starts a thread that reads JSON command lines from +reader+ and invokes
# the named method on @backend with the given arguments. Lines have the
# form written by call_parent_backend_method: [method, args].
#
# +waiter+ is incremented before the thread starts and marked done when the
# reader hits end-of-file, which also ends the loop. Any other exception is
# logged at debug level and the loop continues.
#
# Returns the relay Thread.
def command_relay(reader, waiter) #:nodoc:
  waiter.add
  dispatch_lock = Mutex.new

  dispatch = lambda do |line|
    dispatch_lock.synchronize do
      name, params = JSON.parse(line)
      @backend.send(name.to_sym, *params)
    end
  end

  Thread.new do
    loop do
      begin
        dispatch.call(reader.readline)
      rescue EOFError
        waiter.done
        break
      rescue Exception => err
        GLogger.debug err.message
        GLogger.debug err.backtrace
      end
    end
  end
end

#delay_responseObject

Prevents the response from being sent automatically when the handler finishes executing. This is useful for sending a response from inside a closure, for example when the handler has to make further gilmour requests. To send the response later, call respond with the option “now” set to true.



147
148
149
# File 'lib/gilmour/responder.rb', line 147

# Marks the response as delayed by setting @delayed_response to true.
# While the flag is set, _execute does not default the response code to 200
# when the handler finishes.
def delay_response
  @delayed_response = true
end

#delayed_response?Boolean

:nodoc:

Returns:

  • (Boolean)


151
152
153
# File 'lib/gilmour/responder.rb', line 151

# Returns the @delayed_response flag (set to true by delay_response).
def delayed_response? #:nodoc:
  @delayed_response
end

#emit_error(message, code = 500, extra = {}) ⇒ Object

Publish all errors on gilmour.error. This may or may not have a listener, depending on the configuration supplied at setup.



275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/gilmour/responder.rb', line 275

# Builds an error payload for the current request and hands it to the
# backend's emit_error.
#
# message - stored under :backtrace
# code    - stored under :code (defaults to 500)
# extra   - serialised to JSON under :userdata; nil is treated as {}
#
# The payload also carries the request topic and body, the sender, the
# multi-process flag and a UTC timestamp.
def emit_error(message, code = 500, extra = {}) #:nodoc:
  payload = {
    backtrace: message,
    code: code,
    topic: @request.topic,
    request_data: @request.body,
    userdata: JSON.generate(extra || {}),
    sender: @sender,
    multi_process: @multi_process,
    timestamp: Time.now.getutc
  }

  @backend.emit_error payload
end

#execute(handler) ⇒ Object

Called by parent



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/gilmour/responder.rb', line 203

# Runs +handler+ for the current request. Called by the parent process.
#
# Without forking, the handler is run in-process through _execute.
#
# In forked mode the handler runs in a child process. The child sends its
# response, its log records and its backend commands back over the three
# pipes created in #initialize; the parent relays logs and commands on
# background threads, forwards the response line to receive_data and then
# reaps the child. If the child did not exit successfully (non-zero exit
# status or killed by a signal), a 500 response is written on its behalf.
def execute(handler) #:nodoc:
  unless @multi_process
    _execute(handler)
    return
  end
  GLogger.debug "Executing #{@sender} in forked moode"

  # Unpack the pipes for child communication. The logger pipe from
  # #initialize is reused; opening a second pair here would leave the
  # original descriptors open.
  @read_pipe, @write_pipe = @response_pipe
  @read_command_pipe, @write_command_pipe = @command_pipe
  @read_logger_pipe, @write_logger_pipe = @logger_pipe

  # setup relay threads
  wg = Gilmour::Waiter.new
  relay_threads = []
  relay_threads << logger_relay(@read_logger_pipe, wg, @logger)
  relay_threads << command_relay(@read_command_pipe, wg)

  pid = Process.fork do
    @backend.stop
    EventMachine.stop_event_loop

    # Close the parent channels in forked process
    @read_pipe.close
    @read_command_pipe.close
    @read_logger_pipe.close

    @response_sent = false

    # override the logger for the child
    @logger = child_logger(@write_logger_pipe)
    _execute(handler)
    @write_logger_pipe.close
  end

  # Cleanup the writers in Parent process, so the readers see EOF once the
  # child has closed its copies.
  @write_logger_pipe.close
  @write_pipe.close
  @write_command_pipe.close

  begin
    receive_data(@read_pipe.readline)
  rescue EOFError => e
    # The child went away without writing a response line.
    logger.debug e.message
  end

  pid, status = Process.waitpid2(pid)
  # success? is nil for a signal-killed child (exitstatus is nil there, so
  # it must not be compared numerically) and false for a non-zero exit.
  unless status && status.success?
    msg = if !status
            "Child Process #{pid} crashed without status."
          elsif status.signaled?
            "Child Process #{pid} terminated by signal #{status.termsig}"
          else
            "Child Process #{pid} exited with status #{status.exitstatus}"
          end
    logger.error msg
    # Set the multi-process mode as false, the child has died anyway.
    @multi_process = false
    write_response(@sender, msg, 500)
  end

  @read_pipe.close

  # relay cleanup.
  wg.wait do
    relay_threads.each { |th| th.kill }
  end
  @read_command_pipe.close
  @read_logger_pipe.close
end

#logger_relay(read_logger_pipe, waiter, parent_logger) ⇒ Object

In forked mode, all log records written by the child are read from the logger pipe and replayed on the parent's logger.



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/gilmour/responder.rb', line 179

# Starts a thread that reads JSON log records (one per line, as written by
# child_logger) from +read_logger_pipe+ and replays them on +parent_logger+
# at the recorded severity.
#
# Only the standard severity methods are dispatched. Anything else is
# logged through +unknown+; this includes the "ANY" label that Ruby's
# Logger gives to unknown-level records, for which no logger method exists.
# Lines that are not valid JSON are logged verbatim at info level.
#
# +waiter+ is incremented before the thread starts and marked done when the
# pipe reaches end-of-file, which also ends the loop. Any other exception
# is reported through GLogger and the loop continues.
#
# Returns the relay Thread.
def logger_relay(read_logger_pipe, waiter, parent_logger) #:nodoc:
  waiter.add 1
  levels = [:debug, :info, :warn, :error, :fatal]

  Thread.new do
    loop do
      begin
        data = read_logger_pipe.readline.chomp
        logdata = JSON.parse(data)
        meth = logdata['severity'].downcase.to_sym
        # Never send an arbitrary method name coming off the pipe.
        meth = :unknown unless levels.include?(meth)
        parent_logger.send(meth, logdata['msg'])
      rescue JSON::ParserError
        parent_logger.info data
        next
      rescue EOFError
        waiter.done
        break
      rescue Exception => e
        GLogger.error e.message
        GLogger.error e.backtrace
      end
    end #loop
  end
end

#make_loggerObject

:nodoc:



52
53
54
55
56
57
58
59
60
61
# File 'lib/gilmour/responder.rb', line 52

# Builds the responder's logger, writing to STDERR.
#
# The level comes from ENV["LOG_LEVEL"] looked up in Gilmour::LoggerLevels,
# falling back to WARN. Each line is formatted as
# "<first letter of severity> <YYYY-MM-DD HH:MM:SS> <sender> -> <message>".
def make_logger #:nodoc:
  level_name = ENV["LOG_LEVEL"] ? ENV["LOG_LEVEL"].to_sym : :warn

  Logger.new(STDERR).tap do |log|
    log.level = Gilmour::LoggerLevels[level_name] || Logger::WARN
    log.formatter = proc do |severity, datetime, _progname, msg|
      stamp = datetime.strftime("%Y-%m-%d %H:%M:%S")
      "#{severity[0]} #{stamp} #{@sender} -> #{msg}\n"
    end
  end
end

#publish(message, destination, opts = {}, code = nil, &blk) ⇒ Object

Proxy to publish method. See Backend#publish



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/gilmour/responder.rb', line 319

# Proxy to publish method. See Backend#publish
#
# message     - payload to publish
# destination - topic to publish to
# opts        - options forwarded to the backend
# code        - optional code; forwarded to the backend whenever it is given
# blk         - optional callback (not supported in forked mode)
#
# In forked mode the call is serialised and relayed to the parent through
# call_parent_backend_method; a callback cannot be relayed, so an error is
# logged if one was given. Otherwise the call goes to the backend directly.
def publish(message, destination, opts = {}, code=nil, &blk)
  if @multi_process
    if block_given?
      GLogger.error "Publish callback not supported in forked responder. Ignoring!"
    end
    call_parent_backend_method('publish', message, destination, opts, code)
  elsif code.nil?
    # No code supplied: leave the backend's own default in effect.
    @backend.publish(message, destination, opts, &blk)
  else
    @backend.publish(message, destination, opts, code, &blk)
  end
end

#receive_data(data) ⇒ Object

:nodoc:



78
79
80
81
82
83
84
85
# File 'lib/gilmour/responder.rb', line 78

# Parses one JSON response line of the form [sender, data, code] (as
# written by send_response in forked mode) and passes it to write_response.
#
# A missing code defaults to 200 when responding is enabled. Nothing is
# written unless both a sender and a code are present. Errors, including
# malformed JSON, are reported through GLogger instead of being raised.
def receive_data(data) #:nodoc:
  begin
    sender, res_data, res_code = JSON.parse(data)
    res_code = 200 if @respond && !res_code
    write_response(sender, res_data, res_code) if sender && res_code
  rescue => err
    GLogger.error err.message
    GLogger.error err.backtrace
  end
end

#reply_to(topic, opts = {}, &handler) ⇒ Object

Proxy to register reply listener (see Backend#reply_to for details)



120
121
122
123
124
125
126
127
# File 'lib/gilmour/responder.rb', line 120

# Proxy to register reply listener (see Backend#reply_to for details)
#
# topic   - passed through to the backend
# opts    - passed through to the backend
# handler - block forwarded to the backend
#
# In forked mode an error is logged first; the call is still forwarded.
# NOTE(review): the message says "Ignoring!" although nothing is skipped.
def reply_to(topic, opts={}, &handler)
  if @multi_process
    GLogger.error "Dynamic listeners using reply_to not supported " \
                  "in forked responder. Ignoring!"
  end

  @backend.reply_to(topic, opts, &handler)
end

#request!(message, destination, opts = {}, &blk) ⇒ Object

Proxy to request! method. See Backend#request!



337
338
339
340
341
342
343
344
345
346
# File 'lib/gilmour/responder.rb', line 337

# Proxy to request! method. See Backend#request!
#
# Outside forked mode the call, including any block, goes straight to the
# backend. In forked mode it is relayed to the parent through
# call_parent_backend_method; a block cannot be relayed, so an error is
# logged if one was given.
def request!(message, destination, opts={}, &blk)
  return @backend.request!(message, destination, opts, &blk) unless @multi_process

  GLogger.error "Publish callback not supported in forked responder. Ignoring!" if block_given?
  call_parent_backend_method('request!', message, destination, opts)
end

#respond(body, code = 200, opts = {}) ⇒ Object

Sends a response with body and code. If opts[:now] is true, the response is sent immediately; otherwise it is deferred until the handler finishes executing.



133
134
135
136
137
138
139
140
# File 'lib/gilmour/responder.rb', line 133

# Records +body+ and +code+ as the response for the current request.
#
# With opts[:now] set, the response is sent immediately through
# send_response and the stored response is then reset to an empty hash.
# Otherwise it is left for _execute to send when the handler finishes.
def respond(body, code = 200, opts = {})
  @response[:data] = body
  @response[:code] = code
  return unless opts[:now]

  send_response
  @response = {}
end

#send_responseObject

Called by child



359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/gilmour/responder.rb', line 359

# Sends the stored response exactly once; later calls are no-ops.
#
# In forked mode the response is written to the response pipe as one JSON
# line, [sender, data, code], and flushed right away. Otherwise it is
# passed directly to write_response.
def send_response #:nodoc:
  return if @response_sent
  @response_sent = true

  body = @response[:data]
  status = @response[:code]
  return write_response(@sender, body, status) unless @multi_process

  @write_pipe.write("#{JSON.generate([@sender, body, status])}\n")
  @write_pipe.flush # This flush is very important
end

#signal!(message, destination, opts = {}) ⇒ Object

Proxy to signal! method. See Backend#signal!



349
350
351
352
353
354
355
# File 'lib/gilmour/responder.rb', line 349

# Proxy to signal! method. See Backend#signal!
#
# In forked mode the call is relayed to the parent through
# call_parent_backend_method; otherwise it goes to the backend directly.
def signal!(message, destination, opts={})
  return call_parent_backend_method('signal!', message, destination, opts) if @multi_process

  @backend.signal!(message, destination, opts)
end

#slot(topic, opts = {}, &handler) ⇒ Object

Proxy to register slot (see Backend#slot for details)



110
111
112
113
114
115
116
117
# File 'lib/gilmour/responder.rb', line 110

# Proxy to register slot (see Backend#slot for details)
#
# topic   - passed through to the backend
# opts    - passed through to the backend
# handler - block forwarded to the backend
#
# In forked mode an error is logged first; the call is still forwarded.
# NOTE(review): the message says "Ignoring!" although nothing is skipped.
def slot(topic, opts={}, &handler)
  if @multi_process
    GLogger.error "Dynamic listeners using slot not supported " \
                  "in forked responder. Ignoring!"
  end

  @backend.slot(topic, opts, &handler)
end

#write_response(sender, data, code) ⇒ Object

Called by parent



88
89
90
91
92
93
94
95
96
97
# File 'lib/gilmour/responder.rb', line 88

# Hands a response to the backend. Called by the parent process.
#
# Does nothing unless responding is enabled. For codes of 300 and above the
# error is also emitted through emit_error, provided the backend reports
# errors. Any error raised along the way is reported through GLogger
# instead of being propagated.
def write_response(sender, data, code) #:nodoc:
  begin
    if @respond
      emit_error(data, code) if code >= 300 && @backend.report_errors?
      @backend.send_response(sender, data, code)
    end
  rescue => err
    GLogger.error err.message
    GLogger.error err.backtrace
  end
end