Class: Warren::Queue::AMQPAdapter

Inherits:
Warren::Queue show all
Defined in:
lib/warren/adapters/amqp_adapter.rb

Constant Summary

Constants inherited from Warren::Queue

InvalidAdapter, NoAdapterSet, NoBlockGiven, NoConnectionDetails

Class Method Summary collapse

Methods inherited from Warren::Queue

adapter, adapter=, connection, connection=, inherited, logger, logger=

Class Method Details

.check_connection_details(opts) ⇒ Object

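Checks that the connection details are complete for this adapter. The options must contain the :user, :pass and :vhost keys (the settings that have no default); otherwise Warren::Connection::InvalidConnectionDetails is raised.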



9
10
11
12
13
14
15
# File 'lib/warren/adapters/amqp_adapter.rb', line 9

# Verifies that the connection options carry every setting this adapter
# needs and cannot supply a default for.
#
# opts - the connection options; queried with has_key? for :user, :pass
#        and :vhost. Only the presence of each key is checked, not its value.
#
# Returns true when all three keys are present.
# Raises Warren::Connection::InvalidConnectionDetails when any is missing.
def self.check_connection_details(opts)
  required = [:user, :pass, :vhost]
  return true if required.all? { |key| opts.has_key?(key) }

  raise Warren::Connection::InvalidConnectionDetails, "Missing a username, password or vhost."
end

.publish(queue_name, payload, &blk) ⇒ Object

Sends a message to a queue. Returns true if the message was sent successfully, unless a callback block is passed (see below).

Warren::Queue.publish(:queue_name, {:foo => "name"})

Can also pass a block which is fired after the message is sent. If a block is passed, then the return value of the block is returned from this method.

Warren::Queue.publish(:queue_name, {:foo => "name"}) { puts "foo" }


40
41
42
43
44
45
46
47
48
49
50
# File 'lib/warren/adapters/amqp_adapter.rb', line 40

# Packs the payload and publishes it to the named queue.
#
# queue_name - the queue to publish to; the symbol :default is swapped for
#              the adapter's configured default queue (self.queue_name).
# payload    - the message body; passed through Warren::MessageFilter.pack
#              and sent as its to_s form.
# blk        - optional callback, handed to do_connect as its second argument.
#
# Returns whatever do_connect returns.
def self.publish(queue_name, payload, &blk)
  # Resolve the destination before packing, so a missing default queue
  # surfaces before any filtering work is done.
  destination = queue_name == :default ? self.queue_name : queue_name
  packed = Warren::MessageFilter.pack(payload)

  do_connect(true, blk) do
    MQ::Queue.new(MQ.new, destination).publish(packed.to_s)
  end
end

.queue_name ⇒ Object

Returns the default queue name, or raises InvalidConnectionDetails if no default queue is defined



21
22
23
24
25
26
# File 'lib/warren/adapters/amqp_adapter.rb', line 21

# Looks up the default queue name from the connection options.
#
# Returns the value stored under :default_queue.
# Raises Warren::Connection::InvalidConnectionDetails when the options have
# no :default_queue key.
def self.queue_name
  options = self.connection.options
  return options[:default_queue] if options.has_key?(:default_queue)

  raise Warren::Connection::InvalidConnectionDetails, "Missing a default queue name."
end

.subscribe(queue_name, &block) ⇒ Object

Subscribes to a queue and runs the block for each message received

Warren::Queue.subscribe("example") {|msg| puts msg }

Expects a block and raises NoBlockGiven if no block is given.

Raises:

NoBlockGiven — if no block is passed



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/warren/adapters/amqp_adapter.rb', line 60

# Subscribes to a queue and hands each received message to the given block.
#
# queue_name - the queue to listen on; the symbol :default is swapped for
#              the adapter's configured default queue (self.queue_name).
# block      - required; called with the result of
#              Warren::MessageFilter.unpack for each incoming message.
#
# Returns whatever do_connect returns.
# Raises NoBlockGiven when called without a block.
def self.subscribe(queue_name, &block)
  raise NoBlockGiven unless block

  source = queue_name == :default ? self.queue_name : queue_name
  # todo: check if its a valid queue?
  do_connect(false) do
    MQ::Queue.new(MQ.new, source).subscribe do |raw|
      block.call(Warren::MessageFilter.unpack(raw))
    end
  end
end