Class: RosettaQueue::Gateway::SynchExchange::DirectExchange
Instance Method Summary
collapse
#delete, #initialize, #unsubscribe
Instance Method Details
#publish(destination, message, options = {}) ⇒ Object
59
60
61
62
63
64
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 59
# Logs the outgoing message, publishes it through the queue obtained from
# the connection, and then stops the connection.
#
# `conn` and `RosettaQueue.logger` are defined outside this listing.
#
# @param destination [Object] identifier handed to `conn.queue`; also
#   interpolated into the log line
# @param message [Object] payload handed to the queue's `publish`; also
#   interpolated into the log line
# @param options [Hash] passed unchanged to both `conn.queue` and the
#   queue's `publish`
# @return [Object] whatever `conn.stop` returns
def publish(destination, message, options = {})
  RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
  conn.queue(destination, options).publish(message, options)
  # The connection is stopped after every publish.
  conn.stop
end
|
#receive(destination, message_handler) ⇒ Object
66
67
68
69
70
71
72
73
74
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 66
# Subscribes to the queue for +destination+ and forwards every delivered
# message to +message_handler+.
#
# For each message the block logs it, runs it through
# `Filters.process_receiving`, hands the result to
# `message_handler.on_message`, and then acknowledges on the queue when
# the `:ack` entry of `@options` was truthy at call time.
#
# `conn`, `@options`, `Filters` and `RosettaQueue.logger` are defined
# outside this listing. The queue is stored in `@queue`.
#
# @param destination [Object] identifier handed to `conn.queue`; also
#   interpolated into the log line
# @param message_handler [#on_message] receives each filtered message
# @return [Object] whatever `@queue.subscribe` returns
def receive(destination, message_handler)
  # Read the flag once, before the queue and subscription see @options.
  should_ack = @options[:ack]
  @queue = conn.queue(destination, @options)
  @queue.subscribe(@options) do |payload|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{payload}")
    filtered = Filters.process_receiving(payload)
    message_handler.on_message(filtered)
    # Acknowledge only after the handler has returned.
    @queue.ack if should_ack
  end
end
|
#receive_once(destination, options = {}) {|Filters.process_receiving(msg)| ... } ⇒ Object
76
77
78
79
80
81
82
83
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 76
# Pops a single message from the queue for +destination+, logs it,
# optionally acknowledges it, and yields the filtered message to the block.
#
# The acknowledgement (when `options[:ack]` is truthy) is sent BEFORE the
# message is filtered and yielded, so it still happens if the caller's
# block exits early.
#
# `conn`, `Filters` and `RosettaQueue.logger` are defined outside this
# listing. The queue is stored in `@queue`.
#
# @param destination [Object] identifier handed to `conn.queue`; also
#   interpolated into the log line
# @param options [Hash] passed unchanged to `conn.queue` and `@queue.pop`;
#   its `:ack` entry controls acknowledgement
# @yield [message] the result of `Filters.process_receiving` on the popped
#   message
# @return [Object] the value returned by the block
def receive_once(destination, options = {})
  # Read the flag once, before the queue and pop see the options hash.
  should_ack = options[:ack]
  @queue = conn.queue(destination, options)
  raw = @queue.pop(options)
  RosettaQueue.logger.info("Receiving from #{destination} :: #{raw}")
  if should_ack
    @queue.ack
  end
  filtered = Filters.process_receiving(raw)
  yield filtered
end
|