Class: Maestro::Messenger::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/maestro_common/mq/messenger.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, default_handler = nil, opts = {}) ⇒ Queue

Minimum requirements - a name, and a handler for messages we don’t know about

Raises:

  • (ArgumentError)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/maestro_common/mq/messenger.rb', line 32

# Builds a queue wrapper. Minimum requirements: a name, and (optionally) a
# handler for messages we don't know about.
#
# @param name [String] queue name; must be present and match VALID_QUEUE_REGEX
# @param default_handler [Object, nil] registered via +register_handler+ when
#   given; later consulted for messages whose type has no registered handler
# @param opts [Hash] stored as-is; forwarded to Maestro::MQHelper.subscribe by
#   +connect+ and checked for <tt>:ack</tt> by +handle_incoming_message+
# @raise [ArgumentError] if +name+ is missing or does not match VALID_QUEUE_REGEX
def initialize(name, default_handler = nil, opts = {})
  # Do not interpolate +name+ in this message: it is nil on this path, which
  # previously produced the unhelpful text " cannot be nil".
  raise ArgumentError, 'queue name cannot be nil' unless name
  unless name.match(/#{VALID_QUEUE_REGEX}/)
    raise ArgumentError, "#{name} must match the following regex #{VALID_QUEUE_REGEX}, '#{name}' does not"
  end

  self.handlers = {}
  self.connected = false
  self.default_handler = default_handler
  self.name = name
  self.opts = opts
  # Deferred (EventMachine) delivery is the default; see +connect+.
  self.use_eventmachine = true

  if default_handler
    register_handler(default_handler)
  else
    Maestro.log.info "No default handler for queue '#{name}'.  Unknown messages will be dropped"
  end
end

Instance Attribute Details

#connected ⇒ Object

Returns the value of attribute connected.



29
30
31
# File 'lib/maestro_common/mq/messenger.rb', line 29

# Reader for the subscription flag stored in @connected.
# +initialize+ and +disconnect+ set it to false; +connect+ sets it to true.
def connected
  @connected
end

#default_handler ⇒ Object

Returns the value of attribute default_handler.



29
30
31
# File 'lib/maestro_common/mq/messenger.rb', line 29

# Reader for the handler passed to the constructor (nil when none was given).
# +handle_incoming_message+ calls its +on_unhandled_message+ (when it responds
# to it) for messages whose type has no usable registered handler.
def default_handler
  @default_handler
end

#handlers ⇒ Object

Returns the value of attribute handlers.



29
30
31
# File 'lib/maestro_common/mq/messenger.rb', line 29

# Reader for the message-type => handler table. It starts as an empty Hash in
# +initialize+ and is filled by +register_handler+.
def handlers
  @handlers
end

#messenger ⇒ Object

Returns the value of attribute messenger.



29
30
31
# File 'lib/maestro_common/mq/messenger.rb', line 29

# Reader for @messenger. +initialize+ does not assign it; +connect+ is a no-op
# while it is nil/false, and the send_message_* methods delegate to it.
# NOTE(review): presumably assigned by the owning messenger after construction
# -- confirm against the caller.
def messenger
  @messenger
end

#name ⇒ Object

Returns the value of attribute name.



29
30
31
# File 'lib/maestro_common/mq/messenger.rb', line 29

# Reader for the queue name validated and stored by +initialize+. It is stamped
# onto every parsed message as '__msg_from__' and passed as the destination to
# the messenger's send methods.
def name
  @name
end

#opts ⇒ Object

Returns the value of attribute opts.



29
30
31
# File 'lib/maestro_common/mq/messenger.rb', line 29

# Reader for the options given to the constructor. +connect+ forwards them to
# Maestro::MQHelper.subscribe, and +handle_incoming_message+ acks each message
# when opts[:ack] == 'client'.
def opts
  @opts
end

#use_eventmachine ⇒ Object

Returns the value of attribute use_eventmachine.



29
30
31
# File 'lib/maestro_common/mq/messenger.rb', line 29

# Reader for the delivery-mode flag (set to true by +initialize+). When truthy,
# the subscription created by +connect+ hands each message to
# EventMachine.defer; otherwise the message is handled inline.
def use_eventmachine
  @use_eventmachine
end

Instance Method Details

#connect ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/maestro_common/mq/messenger.rb', line 137

# Subscribes this queue through Maestro::MQHelper (once) and marks it connected.
# Does nothing, and returns nil, while no messenger is attached.
#
# Delivery mode is decided per message from +use_eventmachine+:
# * truthy - the message is handed to EventMachine.defer, so reception returns
#   immediately. Deferred messages may run in parallel and therefore appear to
#   be processed out of order.
# * falsy  - the message is handled inline, which keeps processing in order of
#   reception but blocks further incoming messages until the handler returns.
#
# @return [true, nil] true once marked connected, nil without a messenger
def connect
  return unless messenger

  unless connected?
    Maestro::MQHelper.subscribe(@name, opts) do |message|
      if use_eventmachine
        EventMachine.defer { handle_incoming_message(message) }
      else
        handle_incoming_message(message)
      end
    end
  end

  self.connected = true
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/maestro_common/mq/messenger.rb', line 156

# Predicate form of the +connected+ attribute.
#
# @return [Boolean] the current value of +connected+, passed through unchanged
def connected?
  connected
end

#disconnect ⇒ Object



151
152
153
154
# File 'lib/maestro_common/mq/messenger.rb', line 151

# Unsubscribes the queue through Maestro::MQHelper when it has a name and is
# currently connected, then always marks it as disconnected.
#
# @return [false] the value assigned to +connected+
def disconnect
  subscribed = @name && connected?
  Maestro::MQHelper.unsubscribe(@name) if subscribed

  self.connected = false
end

#handle_incoming_message(message) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/maestro_common/mq/messenger.rb', line 72

# Processes one message received from the subscription.
#
# Steps:
# 1. Drop (with a warning) any message whose 'expires' header, in milliseconds,
#    is positive and already in the past.
# 2. Parse the body as JSON, default the type to '-legacy-' when
#    '__msg_type__' is absent, and stamp '__msg_from__' with this queue's name.
# 3. Dispatch to the handler registered for the type. Its
#    +on_incoming_message+ is called with (self, type, content) or
#    (self, type, content, raw) depending on arity. Any other arity is logged
#    as a warning instead of being dropped silently.
# 4. Without a usable handler, fall back to the default handler's
#    +on_unhandled_message+, or log that the message is being dumped.
#
# Ordinary errors (StandardError) and script errors (e.g. LoadError,
# NotImplementedError) raised while processing are logged and swallowed.
# Interrupts, SystemExit and similar process-level exceptions propagate.
# The message is acked in every case when opts[:ack] == 'client'.
#
# @param message [#body, #headers] raw message from the MQ layer
def handle_incoming_message(message)
  # All consumers expect message data to be JSON and parse it themselves...
  # maybe parse once here and pass the parsed data on, so that a different
  # encoding could be introduced without anyone being any the wiser.
  Maestro.log.debug "Received Message #{message.body}" if messenger.debug

  # Only pass messages that have not expired
  now = Time.now.to_i * 1000
  expiration = message.headers['expires'].to_i

  if expiration > 0 && now > expiration
    timing = "Message expired at: #{expiration} [#{Time.at(expiration / 1000)}], Current time: #{now} [#{Time.at(now / 1000)}]"
    # Expired: log and drop.
    Maestro.log.warn "Skipping agent queue expired message. #{timing}: #{message.body}"
  else
    payload = JSON.parse(message.body)
    msg_type = payload['__msg_type__'] || '-legacy-'
    payload['__msg_from__'] = name
    content = payload['__msg_content__']
    handler = handlers[msg_type]

    if handler && handler.respond_to?(:on_incoming_message)
      # Depending on arity, send either (self, type, content) or (self, type, content, raw)
      arity = handler.method(:on_incoming_message).arity
      case arity
      when 3
        handler.on_incoming_message(self, msg_type, content)
      when 4
        handler.on_incoming_message(self, msg_type, content, payload)
      else
        # Previously this case fell through without a trace; make the drop visible.
        Maestro.log.warn "Handler '#{handler.class.name}' for '#{msg_type}' messages has unsupported on_incoming_message arity #{arity} (expected 3 or 4); message not delivered: #{payload}"
      end
    elsif default_handler && default_handler.respond_to?(:on_unhandled_message)
      default_handler.on_unhandled_message(self, msg_type, content, payload)
    else
      Maestro.log.warn "Dumping unhandled '#{msg_type}' message (no default handler): #{payload}"
    end

    Maestro.log.debug("Processed #{msg_type}: #{message.body}") if messenger.debug
  end
rescue StandardError, ScriptError => e
  # Something went wrong processing the message / running the plugin.
  # ScriptError is included so a handler that fails to load code is still
  # logged rather than crashing, without also trapping Interrupt/SystemExit.
  backtrace = Array(e.backtrace).join("\n")
  Maestro.log.error "Error processing message - #{e.class}:#{e}\n#{backtrace}"
ensure
  # Client-ack mode: always acknowledge, whether or not processing succeeded.
  if opts && opts[:ack] == 'client'
    Maestro::MQHelper.connection.ack(message)
  end
end

#register_handler(handler, msg_types = nil) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/maestro_common/mq/messenger.rb', line 50

# Associates +handler+ with each of the given message types on this queue.
#
# @param handler [Object] handler to register; asked for its types via
#   +get_handled_message_types+ when +msg_types+ is not supplied
# @param msg_types [Enumerable, nil] message types to map to +handler+
def register_handler(handler, msg_types = nil)
  msg_types ||= handler.get_handled_message_types

  if !msg_types || msg_types.empty?
    # Nothing to register: report what will happen to incoming messages.
    fate = default_handler ? 'sent to "on_unhandled_message"' : 'dropped'
    Maestro.log.debug "No message types registered for queue '#{name}' all messages will be #{fate}"
  else
    msg_types.each do |type|
      handlers[type] = handler
      Maestro.log.debug "Registering message type '#{type}' to handler '#{handler.class.name}' on queue #{name}"
    end
  end
end

#send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object Also known as: send_message



63
64
65
# File 'lib/maestro_common/mq/messenger.rb', line 63

# Sends +message+ to this queue by delegating to the messenger's
# +send_message_async+, with this queue's name as the destination.
#
# Note: the block is forwarded as a trailing *positional* argument (a Proc, or
# nil when no block was given), not as a block.
#
# @param message [Object] passed through to the messenger unchanged
# @param options [Object] passed through unchanged (defaults to DEFAULT_SEND_OPTIONS)
# @return [Object] whatever the messenger's +send_message_async+ returns
def send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block)
  messenger.send_message_async(name, message, options, block)
end

#send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object



68
69
70
# File 'lib/maestro_common/mq/messenger.rb', line 68

# Sends +message+ to this queue by delegating to the messenger's
# +send_message_sync+, with this queue's name as the destination.
#
# Note: the block is forwarded as a trailing *positional* argument (a Proc, or
# nil when no block was given), not as a block.
#
# @param message [Object] passed through to the messenger unchanged
# @param timeout [Object] passed through unchanged (defaults to DEFAULT_SYNC_TIMEOUT)
# @param options [Object] passed through unchanged (defaults to DEFAULT_SEND_OPTIONS)
# @return [Object] whatever the messenger's +send_message_sync+ returns
def send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block)
  messenger.send_message_sync(name, message, timeout, options, block)
end