Class: Maestro::Messenger
- Inherits:
-
Object
- Object
- Maestro::Messenger
- Defined in:
- lib/maestro_common/mq/messenger.rb
Defined Under Namespace
Constant Summary collapse
- VALID_QUEUE_REGEX =
"^/(?:queue|topic)/[A-Za-z0-9_\.-]*$"
- DEFAULT_SEND_OPTIONS =
{ :persistent => true, :content_type => 'application/json' }
- DEFAULT_SYNC_TIMEOUT =
5
- DEFAULT_CUSTOM_TEXT =
'message'
Instance Attribute Summary collapse
-
#debug ⇒ Object
Returns the value of attribute debug.
-
#from_name ⇒ Object
Returns the value of attribute from_name.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#seq ⇒ Object
readonly
Returns the value of attribute seq.
Instance Method Summary collapse
- #connected? ⇒ Boolean
- #deregister_queue(queue) ⇒ Object
- #disconnect ⇒ Object
-
#initialize ⇒ Messenger
constructor
A new instance of Messenger.
- #reconnect ⇒ Object
- #register_queue(queue) ⇒ Object
- #send_message_async(destination_queue, message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object (also: #send_message)
- #send_message_sync(destination_queue, message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ Messenger
162 163 164 165 166 167 168 |
# File 'lib/maestro_common/mq/messenger.rb', line 162
#
# Builds a messenger with an empty queue registry and a sequence counter
# starting at 0, then connects through Maestro::MQHelper.
#
# NOTE(review): the rendered listing spans seven source lines, so the
# right-hand side of the @from_name assignment may have been a separate
# value lost in rendering -- confirm against the original file. As shown,
# @from_name receives the return value of Maestro::MQHelper.connect.
def initialize
  @queues = {}           # queue name => registered queue object
  @seq = 0               # value stamped into outgoing messages
  @lock = Monitor.new    # not referenced by the methods shown in this listing
  @from_name = Maestro::MQHelper.connect
end
Instance Attribute Details
#debug ⇒ Object
Returns the value of attribute debug.
16 17 18 |
# File 'lib/maestro_common/mq/messenger.rb', line 16
#
# Reader for the debug flag. When it is truthy the send methods write
# per-message debug lines to Maestro.log.
#
# @return [Object] the current value of @debug (nil until assigned)
def debug
  @debug
end
#from_name ⇒ Object
Returns the value of attribute from_name.
16 17 18 |
# File 'lib/maestro_common/mq/messenger.rb', line 16
#
# Reader for the from_name attribute, which the constructor assigns.
#
# @return [Object] the current value of @from_name
def from_name
  @from_name
end
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
17 18 19 |
# File 'lib/maestro_common/mq/messenger.rb', line 17
#
# Read-only access to the queue registry: a Hash keyed by queue name whose
# values are the queue objects added through #register_queue.
#
# @return [Hash] the registry itself (not a copy)
def queues
  @queues
end
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
17 18 19 |
# File 'lib/maestro_common/mq/messenger.rb', line 17
#
# Read-only access to the sequence value. The constructor sets it to 0 and
# #send_message_async copies it into each outgoing message under the
# '__msg_seq__' key.
#
# @return [Object] the current value of @seq
def seq
  @seq
end
Instance Method Details
#connected? ⇒ Boolean
170 171 172 |
# File 'lib/maestro_common/mq/messenger.rb', line 170
#
# Reports whether MQHelper currently holds a live connection.
#
# The connection is read once, so the nil check and the connected? call
# refer to the same object. The result is coerced to a strict boolean, so
# the method returns false rather than nil when there is no connection.
#
# @return [Boolean] true only when a connection exists and it reports
#   itself as connected
def connected?
  connection = MQHelper.connection
  return false unless connection

  connection.connected? ? true : false
end
#deregister_queue(queue) ⇒ Object
181 182 183 184 185 186 187 188 |
# File 'lib/maestro_common/mq/messenger.rb', line 181
#
# Removes a queue from the registry and disconnects it.
#
# @param queue [String, Object] either the name of a registered queue or
#   the queue object itself (it must respond to #name and #disconnect)
# @return [Object, nil] whatever the queue's #disconnect returns, or nil
#   when no queue could be resolved (unknown name, or a nil/false argument)
def deregister_queue(queue)
  # A String is treated as a registry key; anything else is used as given.
  target = queue.is_a?(String) ? queues[queue] : queue
  return unless target

  queues.delete(target.name)
  target.disconnect
end
#disconnect ⇒ Object
199 200 201 |
# File 'lib/maestro_common/mq/messenger.rb', line 199
#
# Asks MQHelper to disconnect, but only while #connected? is truthy.
#
# @return [Object, nil] the result of MQHelper.disconnect, or nil when
#   there was nothing to disconnect
def disconnect
  return unless connected?

  MQHelper.disconnect
end
#reconnect ⇒ Object
195 196 197 |
# File 'lib/maestro_common/mq/messenger.rb', line 195
#
# Asks MQHelper to reconnect, but only while #connected? is falsy.
#
# @return [Object, nil] the result of MQHelper.reconnect, or nil when the
#   connection was already up
def reconnect
  return if connected?

  MQHelper.reconnect
end
#register_queue(queue) ⇒ Object
174 175 176 177 178 179 |
# File 'lib/maestro_common/mq/messenger.rb', line 174
#
# Adds a queue to the registry under its own name and points the queue
# back at this messenger.
#
# @param queue [Object] a queue responding to #name and #messenger=
# @return [Object] the same queue that was passed in
def register_queue(queue)
  queues.store(queue.name, queue)
  queue.messenger = self
  queue
end
#send_message_async(destination_queue, message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object Also known as: send_message
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/maestro_common/mq/messenger.rb', line 203
#
# Sends a message to a destination without waiting for the broker.
# Also available as #send_message.
#
# The message is stamped in place with the current sequence value under
# '__msg_seq__' (so the caller's Hash is modified), serialized with
# MultiJson and handed to the MQHelper connection. When #debug is truthy a
# log line is written before sending and another when the connection
# invokes the completion block.
#
# @param destination_queue [String] destination passed to the connection
# @param message [Hash] payload; an optional '__msg_type__' entry is used
#   as the label in debug log lines (DEFAULT_CUSTOM_TEXT otherwise)
# @param options [Hash] send options forwarded to the connection
# @yield [r] called with whatever the connection passes to its completion
#   block, if a block was given
# @return [Object] the return value of the connection's #send
def send_message_async(destination_queue, message, options = DEFAULT_SEND_OPTIONS, &block)
  my_seq = seq
  message['__msg_seq__'] = my_seq
  custom_text = message.has_key?('__msg_type__') ? message['__msg_type__'] : DEFAULT_CUSTOM_TEXT
  Maestro.log.debug "[seq #{my_seq}] Sending #{custom_text}" if debug

  Maestro::MQHelper.connection.send(
    destination_queue,
    MultiJson.dump(message),
    options
  ) do |r|
    Maestro.log.debug "[seq #{my_seq}] Sent #{custom_text} to '#{destination_queue}'" if debug
    yield(r) if block
  end
end
#send_message_sync(destination_queue, message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/maestro_common/mq/messenger.rb', line 220
#
# Sends a message and blocks until the broker acknowledges it.
#
# The message is handed to #send_message_async; this method then polls
# every 0.25 seconds until the completion block has fired or +timeout+
# seconds have passed. The caller's block runs only after the wait has
# finished, outside the timed region, so a slow callback is never mistaken
# for a missing acknowledgement.
#
# @param destination_queue [String] destination passed to the connection
# @param message [Hash] payload (stamped with '__msg_seq__' by the async send)
# @param timeout [Numeric] seconds to wait for the acknowledgement
# @param options [Hash] send options forwarded to the connection
# @yield [receipt] called once with the value the connection passed to the
#   completion block, if a block was given
# @return [true] when the message was acknowledged in time
# @raise [MessageSendError] when no acknowledgement arrives within +timeout+
def send_message_sync(destination_queue, message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block)
  ackd = false
  receipt = nil

  send_message_async(destination_queue, message, options) do |r|
    receipt = r
    ackd = true
  end

  begin
    # Only the wait for the acknowledgement is subject to the timeout.
    Timeout.timeout(timeout) do
      sleep 0.25 until ackd
    end
  rescue Timeout::Error
    raise MessageSendError, "Message ##{message['__msg_seq__']} not ack'd by MQ"
  end

  yield(receipt) if block
  true
end
#stop ⇒ Object
190 191 192 193 |
# File 'lib/maestro_common/mq/messenger.rb', line 190
#
# Deregisters every registered queue and then disconnects from the broker.
#
# Iterates over a snapshot of the registry's values: #deregister_queue
# removes entries from the registry while the loop is running, and the
# registry is a Hash, so iterating it directly would yield [name, queue]
# pairs rather than queues.
#
# @return [Object] the result of MQHelper.disconnect
def stop
  queues.values.each { |queue| deregister_queue(queue) }
  MQHelper.disconnect
end