Class: RosettaQueue::Gateway::EventedExchange::DirectExchange
- Inherits:
-
BaseExchange
- Object
- BaseExchange
- RosettaQueue::Gateway::EventedExchange::DirectExchange
show all
- Defined in:
- lib/rosetta_queue/adapters/amqp_evented.rb
Instance Method Summary
collapse
#delete, #initialize
Instance Method Details
#publish(destination, message, options = {}) ⇒ Object
55
56
57
58
59
60
61
62
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 55
def publish(destination, message, options={})
raise AdapterException, "Messages need to be published in an EventMachine run block (e.g., EM.run { RosettaQueue::Producer.publish(:foo, msg) } " unless EM.reactor_running?
queue = channel.queue(destination, options)
queue.publish(message, options)
RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
queue.unsubscribe
end
|
#receive(destination, message_handler) ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 64
def receive(destination, message_handler)
raise AdapterException, "Consumers need to run in an EventMachine 'run' block. Try wrapping them inside the evented consumer manager." unless EM.reactor_running?
queue = channel.queue(destination, @options)
ack = @options[:ack]
queue.subscribe(@options) do |, msg|
RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
message_handler.on_message(Filters.process_receiving(msg))
.ack if ack
end
end
|
#receive_once(destination, options = {}) ⇒ Object
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 76
def receive_once(destination, options={})
raise AdapterException, "Consumers need to run in an EventMachine 'run' block. (e.g., EM.run { RosettaQueue::Consumer.receive }" unless EM.reactor_running?
queue = channel.queue(destination, @options)
ack = @options[:ack]
queue.pop(@options) do |, msg|
RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
.ack if ack
yield Filters.process_receiving(msg)
end
end
|