Class: Maestro::Messenger

Inherits:
Object
  • Object
show all
Defined in:
lib/maestro_common/mq/messenger.rb

Defined Under Namespace

Classes: Message, Queue

Constant Summary collapse

VALID_QUEUE_REGEX =
"^/(?:queue|topic)/[A-Za-z0-9_\.-]*$"
DEFAULT_SEND_OPTIONS =
{ :persistent => true, :content_type => 'application/json' }
DEFAULT_SYNC_TIMEOUT =
5
DEFAULT_CUSTOM_TEXT =
'message'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMessenger



162
163
164
165
166
167
168
# File 'lib/maestro_common/mq/messenger.rb', line 162

def initialize
  @lock = Monitor.new
  @seq = 0
  @queues = {}
  @from_name =
  Maestro::MQHelper.connect
end

Instance Attribute Details

#debugObject

Returns the value of attribute debug.



16
17
18
# File 'lib/maestro_common/mq/messenger.rb', line 16

def debug
  @debug
end

#from_nameObject

Returns the value of attribute from_name.



16
17
18
# File 'lib/maestro_common/mq/messenger.rb', line 16

def from_name
  @from_name
end

#queuesObject (readonly)

Returns the value of attribute queues.



17
18
19
# File 'lib/maestro_common/mq/messenger.rb', line 17

def queues
  @queues
end

#seqObject (readonly)

Returns the value of attribute seq.



17
18
19
# File 'lib/maestro_common/mq/messenger.rb', line 17

def seq
  @seq
end

Instance Method Details

#connected?Boolean



170
171
172
# File 'lib/maestro_common/mq/messenger.rb', line 170

def connected?
  return MQHelper.connection && MQHelper.connection.connected?
end

#deregister_queue(queue) ⇒ Object



181
182
183
184
185
186
187
188
# File 'lib/maestro_common/mq/messenger.rb', line 181

def deregister_queue(queue)
  queue = queues[queue] if queue.is_a?(String)

  if queue
    queues.delete(queue.name)
    queue.disconnect
  end
end

#disconnectObject



199
200
201
# File 'lib/maestro_common/mq/messenger.rb', line 199

def disconnect
  MQHelper.disconnect if connected?
end

#reconnectObject



195
196
197
# File 'lib/maestro_common/mq/messenger.rb', line 195

def reconnect
  MQHelper.reconnect unless connected?
end

#register_queue(queue) ⇒ Object



174
175
176
177
178
179
# File 'lib/maestro_common/mq/messenger.rb', line 174

def register_queue(queue)
  queues[queue.name] = queue
  queue.messenger = self

  return queue
end

#send_message_async(destination_queue, message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object Also known as: send_message



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/maestro_common/mq/messenger.rb', line 203

def send_message_async(destination_queue, message, options = DEFAULT_SEND_OPTIONS, &block)
  my_seq = seq

  message['__msg_seq__'] = my_seq

  custom_text = message.has_key?('__msg_type__') ? message['__msg_type__'] : DEFAULT_CUSTOM_TEXT

  Maestro.log.debug "[seq #{my_seq}] Sending #{custom_text}" if debug

  Maestro::MQHelper.connection.send( destination_queue, MultiJson.dump(message), options ) do |r|
    Maestro.log.debug "[seq #{my_seq}] Sent #{custom_text} to '#{destination_queue}'" if debug

    yield(r) if block
  end
end

#send_message_sync(destination_queue, message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/maestro_common/mq/messenger.rb', line 220

def send_message_sync(destination_queue, message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block)
  ackd = false
  rr = nil

  send_message_async(destination_queue, message, options) do |r|
    rr = r
    ackd = true
  end

  begin
    Timeout::timeout(timeout) do
      while !ackd
        sleep 0.25
      end

      ackd = true

      yield(rr) if block
    end
  rescue Timeout::Error
    raise MessageSendError, "Message ##{message['__msg_seq__']} not ack'd by MQ"
  end

  return ackd
end

#stopObject



190
191
192
193
# File 'lib/maestro_common/mq/messenger.rb', line 190

def stop
  queues.each { |q| deregister_queue(queue) }
  MQHelper.disconnect
end