Class: RosettaQueue::Gateway::Amqp

Inherits:
BaseAdapter show all
Defined in:
lib/rosetta_queue/adapters/amqp.rb

Direct Known Subclasses

AmqpEventedAdapter, AmqpSynchAdapter

Instance Method Summary collapse

Constructor Details

#initialize(adapter_settings = {}) ⇒ Amqp

Returns a new instance of Amqp.

Raises:



14
15
16
17
# File 'lib/rosetta_queue/adapters/amqp.rb', line 14

def initialize(adapter_settings = {})
  raise AdapterException, "Missing adapter settings" if adapter_settings.empty?
  @adapter_settings = adapter_settings
end

Instance Method Details

#delete(destination, opts = {}) ⇒ Object



19
20
21
# File 'lib/rosetta_queue/adapters/amqp.rb', line 19

def delete(destination, opts={})
  exchange_strategy_for(destination, opts).delete(destination)
end

#disconnect(message_handler) ⇒ Object



23
24
25
26
# File 'lib/rosetta_queue/adapters/amqp.rb', line 23

def disconnect(message_handler)
  destination = destination_for(message_handler)
  exchange_strategy_for(destination).unsubscribe
end

#receive_once(destination, opts = {}) ⇒ Object



28
29
30
31
32
# File 'lib/rosetta_queue/adapters/amqp.rb', line 28

def receive_once(destination, opts={})
  exchange_strategy_for(destination, opts).receive_once(destination) do |msg|
    return msg
  end
end

#receive_with(message_handler) ⇒ Object



34
35
36
37
38
# File 'lib/rosetta_queue/adapters/amqp.rb', line 34

def receive_with(message_handler)
  options = options_for(message_handler)
  destination = destination_for(message_handler)
  exchange_strategy_for(destination, options).receive(destination, message_handler)
end

#send_message(destination, message, options = nil) ⇒ Object



40
41
42
# File 'lib/rosetta_queue/adapters/amqp.rb', line 40

def send_message(destination, message, options=nil)
  exchange_strategy_for(destination, options).publish(destination, message)
end

#unsubscribeObject



44
# File 'lib/rosetta_queue/adapters/amqp.rb', line 44

def unsubscribe; end