Class: RosettaQueue::Gateway::EventedExchange::FanoutExchange
Instance Method Summary
collapse
Methods included from Fanout
#fanout_name_for
#delete, #initialize
Instance Method Details
#publish(dest, msg, opts) ⇒ Object
93
94
95
96
97
98
99
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 93
def publish(dest, msg, opts)
raise AdapterException, "Messages need to be published in an EventMachine run block (e.g., EM.run { RosettaQueue::Producer.publish(:foo, msg) } " unless EM.reactor_running?
exchange = channel.fanout(fanout_name_for(dest), opts)
exchange.publish(msg, opts)
RosettaQueue.logger.info("Publishing to fanout #{dest} :: #{msg}")
end
|
#receive(destination, message_handler) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 101
def receive(destination, message_handler)
raise AdapterException, "Consumers need to run in an EventMachine 'run' block. Try wrapping them inside the evented consumer manager." unless EM.reactor_running?
queue = channel.queue("queue_#{self.object_id}")
exchange = channel.fanout(fanout_name_for(destination), @options)
ack = @options[:ack]
queue.bind(exchange).subscribe(@options) do |, msg|
RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
message_handler.on_message(Filters.process_receiving(msg))
.ack if ack
end
end
|
#receive_once(destination, opts = {}) ⇒ Object
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 115
def receive_once(destination, opts={})
raise AdapterException, "Consumers need to run in an EventMachine 'run' block. (e.g., EM.run { RosettaQueue::Consumer.receive }" unless EM.reactor_running?
queue = channel.queue("queue_#{self.object_id}")
exchange = channel.fanout(fanout_name_for(destination), opts)
ack = @options[:ack]
queue.bind(exchange).pop(opts) do |, msg|
RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
.ack if ack
yield Filters.process_receiving(msg)
end
end
|