Class: RosettaQueue::Gateway::Amqp
Instance Method Summary
collapse
Constructor Details
#initialize(adapter_settings = {}) ⇒ Amqp
Returns a new instance of Amqp.
14
15
16
17
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 14
def initialize(adapter_settings = {})
raise AdapterException, "Missing adapter settings" if adapter_settings.empty?
@adapter_settings = adapter_settings
end
|
Instance Method Details
#delete(destination, opts = {}) ⇒ Object
19
20
21
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 19
def delete(destination, opts={})
exchange_strategy_for(destination, opts).delete(destination)
end
|
#disconnect(message_handler) ⇒ Object
23
24
25
26
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 23
def disconnect(message_handler)
destination = destination_for(message_handler)
exchange_strategy_for(destination).unsubscribe
end
|
#receive_once(destination, opts = {}) ⇒ Object
28
29
30
31
32
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 28
def receive_once(destination, opts={})
exchange_strategy_for(destination, opts).receive_once(destination) do |msg|
return msg
end
end
|
#receive_with(message_handler) ⇒ Object
34
35
36
37
38
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 34
def receive_with(message_handler)
options = options_for(message_handler)
destination = destination_for(message_handler)
exchange_strategy_for(destination, options).receive(destination, message_handler)
end
|
#send_message(destination, message, options = nil) ⇒ Object
40
41
42
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 40
def send_message(destination, message, options=nil)
exchange_strategy_for(destination, options).publish(destination, message)
end
|
#unsubscribe ⇒ Object
44
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 44
def unsubscribe; end
|