Class: Maestro::Messenger::Queue
- Inherits:
-
Object
- Object
- Maestro::Messenger::Queue
- Defined in:
- lib/maestro_common/mq/messenger.rb
Instance Attribute Summary collapse
-
#connected ⇒ Object
Returns the value of attribute connected.
-
#default_handler ⇒ Object
Returns the value of attribute default_handler.
-
#handlers ⇒ Object
Returns the value of attribute handlers.
-
#messenger ⇒ Object
Returns the value of attribute messenger.
-
#name ⇒ Object
Returns the value of attribute name.
-
#opts ⇒ Object
Returns the value of attribute opts.
-
#use_eventmachine ⇒ Object
Returns the value of attribute use_eventmachine.
Instance Method Summary collapse
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
- #handle_incoming_message(message) ⇒ Object
-
#initialize(name, default_handler = nil, opts = {}) ⇒ Queue
constructor
Minimum requirements - a name, and a handler for messages we don’t know about.
- #register_handler(handler, msg_types = nil) ⇒ Object
- #send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object (also: #send_message)
- #send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object
Constructor Details
#initialize(name, default_handler = nil, opts = {}) ⇒ Queue
Minimum requirements - a name, and a handler for messages we don’t know about
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/maestro_common/mq/messenger.rb', line 32 def initialize(name, default_handler = nil, opts = {}) raise ArgumentError, "#{name} cannot be nil" unless name raise ArgumentError, "#{name} must match the following regex #{VALID_QUEUE_REGEX}, '#{name}' does not" unless name.match(/#{VALID_QUEUE_REGEX}/) self.handlers = {} self.connected = false self.default_handler = default_handler self.name = name self.opts = opts self.use_eventmachine = true if default_handler register_handler(default_handler) else Maestro.log.info "No default handler for queue '#{name}'. Unknown messages will be dropped" end end |
Instance Attribute Details
#connected ⇒ Object
Returns the value of attribute connected.
29 30 31 |
# File 'lib/maestro_common/mq/messenger.rb', line 29 def connected @connected end |
#default_handler ⇒ Object
Returns the value of attribute default_handler.
29 30 31 |
# File 'lib/maestro_common/mq/messenger.rb', line 29 def default_handler @default_handler end |
#handlers ⇒ Object
Returns the value of attribute handlers.
29 30 31 |
# File 'lib/maestro_common/mq/messenger.rb', line 29 def handlers @handlers end |
#messenger ⇒ Object
Returns the value of attribute messenger.
29 30 31 |
# File 'lib/maestro_common/mq/messenger.rb', line 29 def messenger @messenger end |
#name ⇒ Object
Returns the value of attribute name.
29 30 31 |
# File 'lib/maestro_common/mq/messenger.rb', line 29 def name @name end |
#opts ⇒ Object
Returns the value of attribute opts.
29 30 31 |
# File 'lib/maestro_common/mq/messenger.rb', line 29 def opts @opts end |
#use_eventmachine ⇒ Object
Returns the value of attribute use_eventmachine.
29 30 31 |
# File 'lib/maestro_common/mq/messenger.rb', line 29 def use_eventmachine @use_eventmachine end |
Instance Method Details
#connect ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/maestro_common/mq/messenger.rb', line 137 def connect if messenger # If eventmachine enabled will use it to defer delivery so we can receive the message and continue with life # That can be an issue sometimes as different threads may execute at different rates, causing unintentional # bursts of chronometric radiation that can cause messages to appear to execute in non-sequential order. # (Actually they are, but since multiple may be executing in parallel it can have unintended consequences) # Set the 'use_eventmachine' property of the queue connection to 'false' to ensure message processing completes # in the order of reception. This will effectively block incoming messages, so the consequences of that should # be taken into account. Maestro::MQHelper.subscribe(@name, opts) { || use_eventmachine ? EventMachine.defer { () } : () } unless connected? self.connected = true end end |
#connected? ⇒ Boolean
156 157 158 |
# File 'lib/maestro_common/mq/messenger.rb', line 156 def connected? return connected end |
#disconnect ⇒ Object
151 152 153 154 |
# File 'lib/maestro_common/mq/messenger.rb', line 151 def disconnect Maestro::MQHelper.unsubscribe(@name) if @name && connected? self.connected = false end |
#handle_incoming_message(message) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/maestro_common/mq/messenger.rb', line 72 def () # All consumers expect message data to be json and parse... maybe just parse it here and pass parsed data # then if we use different encoding noone will be any the wiser begin Maestro.log.debug "Received Message #{.body}" if messenger.debug # Only pass messages that have not expired now = Time.now.to_i * 1000 expiration = .headers['expires'].to_i if (expiration > 0 && now > expiration) timing = "Message expired at: #{expiration} [#{Time.at expiration/1000}], Current time: #{now} [#{Time.at now/1000}]" Maestro.log.warn "Skipping agent queue expired message. #{timing}: #{.body}" # drop message else hash = JSON.parse(.body) msg_type = hash['__msg_type__'] || '-legacy-' hash['__msg_from__'] = name handler = handlers[msg_type] if handler && handler.respond_to?(:on_incoming_message) # Depending on arity, send either (self, type, content) or (self, type, content, raw) case handler.method(:on_incoming_message).arity when 3 handler.(self, msg_type, hash['__msg_content__']) when 4 handler.(self, msg_type, hash['__msg_content__'], hash) end else if default_handler && default_handler.respond_to?(:on_unhandled_message) default_handler.(self, msg_type, hash['__msg_content__'], hash) else Maestro.log.warn "Dumping unhandled '#{msg_type}' message (no default handler): #{hash}" end end Maestro.log.debug("Processed #{msg_type}: #{.body}") if messenger.debug end rescue Exception => e # something happened executing processing the message/running the plugin backtrace = e.backtrace.join("\n") # set the error with the exception message and backtrace Maestro.log.error "Error processing message - #{e.class}:#{e}\n#{backtrace}" ensure if opts && opts[:ack] == 'client' Maestro::MQHelper.connection.ack() end end end |
#register_handler(handler, msg_types = nil) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/maestro_common/mq/messenger.rb', line 50 def register_handler(handler, msg_types = nil) msg_types = handler. unless msg_types if msg_types && !msg_types.empty? msg_types.each do |t| handlers[t] = handler Maestro.log.debug "Registering message type '#{t}' to handler '#{handler.class.name}' on queue #{name}" end else Maestro.log.debug "No message types registered for queue '#{name}' all messages will be #{default_handler ? 'sent to "on_unhandled_message"' : 'dropped'}" end end |
#send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object Also known as: send_message
63 64 65 |
# File 'lib/maestro_common/mq/messenger.rb', line 63 def (, = DEFAULT_SEND_OPTIONS, &block) messenger.(name, , , block) end |
#send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object
68 69 70 |
# File 'lib/maestro_common/mq/messenger.rb', line 68 def (, timeout = DEFAULT_SYNC_TIMEOUT, = DEFAULT_SEND_OPTIONS, &block) messenger.(name, , timeout, , block) end |