Class: MobME::Infrastructure::Queue::Backends::AMQP

Inherits:
MobME::Infrastructure::Queue::Backend
Defined in:
lib/mobme/infrastructure/queue/backends/amqp.rb

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ AMQP

Returns a new instance of AMQP.



5
6
7
8
9
10
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 5

# Builds a new AMQP backend.
#
# @param options [Hash] construction options; only the :bunny_options key is
#   read here, and an empty hash is used when that key is missing or nil
def initialize(options = {})
  # Start with no known queues; list_queues reports the keys of this hash.
  @amqp_queues = {}

  bunny_settings = options[:bunny_options]
  @bunny_options = bunny_settings || {}

  # configure is defined outside this snippet; it runs after both instance
  # variables above have been assigned.
  configure
end

Instance Method Details

#add(queue, item, metadata = {}) ⇒ Object



12
13
14
15
16
17
18
19
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 12

# Publishes one item to the exchange, keyed by the queue name.
#
# @param queue [Object] queue identifier; handed to queue_for and used as the
#   publish :key
# @param item [Object] the item to enqueue; passed through serialize_item
# @param metadata [Hash] extra data serialized together with the item
# @return [Object] whatever @amqp_exchange.publish returns
def add(queue, item, metadata = {})
  # NOTE(review): every "metadata" token was missing from this snippet. The
  # parameter name comes from the documented signature; the helper name is
  # presumed to be normalize_metadata — confirm against the parent Backend class.
  metadata = normalize_metadata(metadata)

  # Register the queue if needed.
  queue_for(queue)

  @amqp_exchange.publish(serialize_item(item, metadata), :key => queue)
end

#add_bulk(queue, items = []) ⇒ Object

Adds many items together



22
23
24
25
26
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 22

# Adds many items to the same queue, one add call per item, each with empty
# metadata.
#
# @param queue [Object] queue identifier forwarded to add
# @param items [Array] items to enqueue, in order
# @return [Array] the items collection that was iterated
def add_bulk(queue, items = [])
  items.each { |entry| add(queue, entry, {}) }
end

#empty(queue) ⇒ Object



45
46
47
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 45

# Purges the given queue.
#
# @param queue [Object] queue identifier resolved through queue_for
# @return [Object] whatever the queue object's purge call returns
def empty(queue)
  amqp_queue = queue_for(queue)
  amqp_queue.purge
end

#list(queue) ⇒ Object

Raises:

  • (NotImplementedError)


41
42
43
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 41

# Listing queue contents is not available on this backend.
#
# @param queue [Object] ignored
# @raise [NotImplementedError] always
def list(queue)
  message = "AMQP doesn't support list!"
  raise NotImplementedError, message
end

#list_queues ⇒ Object



49
50
51
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 49

# Names of the queues this instance currently tracks.
#
# @return [Array] the keys of the @amqp_queues registry
def list_queues
  registry = @amqp_queues
  registry.keys
end

#peek(queue) ⇒ Object

Raises:

  • (NotImplementedError)


33
34
35
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 33

# Peeking at a queue is not available on this backend.
#
# @param queue [Object] ignored
# @raise [NotImplementedError] always
def peek(queue)
  message = "AMQP doesn't support peek!"
  raise NotImplementedError, message
end

#remove(queue) ⇒ Object



28
29
30
31
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 28

# Pops the next message off the queue and unserializes it.
#
# @param queue [Object] queue identifier resolved through queue_for
# @return [Object, nil] the result of unserialize_item for the popped payload,
#   or nil when the payload is the :queue_empty marker
def remove(queue)
  payload = queue_for(queue).pop[:payload]

  # The :queue_empty symbol in place of a payload means nothing was waiting.
  return nil if :queue_empty == payload

  unserialize_item(payload)
end

#remove_queues(*queues) ⇒ Object Also known as: remove_queue



53
54
55
56
57
58
59
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 53

# Deletes the named queues and forgets them locally. With no arguments, every
# queue reported by list_queues is removed.
#
# @param queues [Array] queue identifiers to delete
# @return [Array] the identifiers that were processed
def remove_queues(*queues)
  targets = queues.empty? ? list_queues : queues

  targets.each do |name|
    # Delete through the queue object first, then drop the local entry.
    queue_for(name).delete
    @amqp_queues.delete(name)
  end
end

#size(queue) ⇒ Object



37
38
39
# File 'lib/mobme/infrastructure/queue/backends/amqp.rb', line 37

# Number of messages in the given queue.
#
# @param queue [Object] queue identifier resolved through queue_for
# @return [Object] the message_count reported by the queue object
def size(queue)
  amqp_queue = queue_for(queue)
  amqp_queue.message_count
end