Class: RosettaQueue::Gateway::SynchExchange::FanoutExchange
Instance Method Summary
collapse
Methods included from Fanout
#fanout_name_for
#delete, #initialize, #unsubscribe
Instance Method Details
#publish(destination, message, options = {}) ⇒ Object
90
91
92
93
94
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 90
def publish(destination, message, options={})
exchange = conn.exchange(fanout_name_for(destination), options.merge({:type => :fanout}))
exchange.publish(message, options)
RosettaQueue.logger.info("Publishing to fanout #{destination} :: #{message}")
end
|
#receive(destination, message_handler) ⇒ Object
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 96
def receive(destination, message_handler)
ack = @options[:ack]
@queue = conn.queue("queue_#{self.object_id}", @options)
exchange = conn.exchange(fanout_name_for(destination), @options.merge({:type => :fanout}))
@queue.bind(exchange)
@queue.subscribe(@options) do |msg|
RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
message_handler.on_message(Filters.process_receiving(msg))
@queue.ack if ack
end
end
|
#receive_once(destination, options = {}) {|Filters.process_receiving(msg)| ... } ⇒ Object
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 108
def receive_once(destination, options={})
ack = options[:ack]
@queue = conn.queue("queue_#{self.object_id}", options)
exchange = conn.exchange(fanout_name_for(destination), options.merge({:type => :fanout}))
@queue.bind(exchange)
msg = @queue.pop(options)
RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
@queue.ack if ack
yield Filters.process_receiving(msg)
end
|