Class: Qwrapper::Queues::RabbitMQ

Inherits:
Base
  • Object
show all
Defined in:
lib/qwrapper/queues/rabbitmq.rb

Instance Attribute Summary

Attributes inherited from Base

#config, #requeue_exceptions

Instance Method Summary collapse

Methods inherited from Base

#initialize

Methods included from Loggable

#logger, #logger=

Constructor Details

This class inherits a constructor from Qwrapper::Queues::Base

Instance Method Details

#publish(queue_name, messages, options = {}) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/qwrapper/queues/rabbitmq.rb', line 36

# Publishes one or more messages to the queue returned by get_queue, marking
# each one persistent, and then closes the connection if there is one.
#
# @param queue_name [Object] queue identifier, handed to get_queue as-is
# @param messages [Object, Array] a single message or an array of messages;
#   every message is converted with #to_s before being published
# @param options [Hash] handed to get_queue unchanged
# @return [Object, nil] whatever connection.close returns, or nil when
#   connection is falsy
def publish(queue_name, messages, options={})
  messages = [messages] unless messages.is_a?(Array)
  # Forward the caller's options; writing `options={}` here would reassign
  # the local to an empty hash and silently drop them.
  queue = get_queue(queue_name, options)
  # messages is already an Array at this point, so iterate it directly:
  # wrapping it again would publish the whole list as a single message.
  messages.each do |message|
    queue.publish(message.to_s, :persistent => true)
  end
  connection.close if connection
end

#subscribe(queue_name, options = {}, &block) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/qwrapper/queues/rabbitmq.rb', line 10

# Subscribes to the queue returned by get_queue and hands each delivered
# payload to the given block, acknowledging every delivery afterwards.
#
# When the logger responds to :wrap the payload is processed through
# logger_wrapped_block_execution, otherwise through block_execution.
# Exceptions listed in requeue_exceptions are logged and the delivery is
# still acknowledged. Any other exception leaves the subscription, is logged
# by the outer rescue, and the connection is not closed on that path.
#
# @param queue_name [Object] queue identifier, handed to get_queue as-is
# @param options [Hash] handed to get_queue unchanged
# @yield [payload] called (via the execution helpers) for each delivery
# @return [Object, nil] result of the last statement executed: the
#   connection close on the normal path, the logger call on the error path
def subscribe(queue_name, options={}, &block)
  begin
    logger.info "Subscribed to '#{queue_name}'"
    queue = get_queue(queue_name, options)
    # The middle block argument is unused here; it needs a name because an
    # empty slot in a block parameter list does not parse.
    queue.subscribe(ack: true, block: true) do |delivery_info, _properties, payload|
      # TODO find unique way to identify a message and list that as part
      # of the wrap message
      begin
        if logger.respond_to?(:wrap)
          logger_wrapped_block_execution(payload, &block)
        else
          block_execution(payload, &block)
        end
      rescue *requeue_exceptions => ex
        # NOTE(review): nothing is requeued yet; the error is only logged
        # and the delivery is acknowledged below like a successful one.
        logger.error "Requeue logic"
        logger.error ex
      end
      queue.channel.ack(delivery_info.delivery_tag)
    end
    connection.close if connection
  rescue Exception => ex
    # NOTE(review): rescuing Exception also catches Interrupt/SystemExit.
    # Kept as-is because narrowing it would change how a blocking consumer
    # is stopped; confirm the intended shutdown path before changing.
    logger.error "TODO: What logic goes here?"
    logger.error ex
  end
end