Class: RosettaQueue::Gateway::EventedExchange::FanoutExchange

Inherits:
BaseExchange
  • Object
show all
Includes:
Fanout
Defined in:
lib/rosetta_queue/adapters/amqp_evented.rb

Instance Method Summary collapse

Methods included from Fanout

#fanout_name_for

Methods inherited from BaseExchange

#delete, #initialize, #unsubscribe

Constructor Details

This class inherits a constructor from RosettaQueue::Gateway::EventedExchange::BaseExchange

Instance Method Details

#publish(dest, msg, opts) ⇒ Object

Raises:

  • (AdapterException) — if the EventMachine reactor is not running

79
80
81
82
83
84
85
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 79

# Publishes +msg+ to the fanout exchange named after +dest+.
#
# The exchange returned by channel.fanout is stored in @queue before the
# message is published, and an info line is logged afterwards.
#
# @param dest destination identifier; fanout_name_for(dest) gives the exchange name
# @param msg payload handed to the exchange's publish call
# @param opts options forwarded to both channel.fanout and the publish call
# @return the value returned by RosettaQueue.logger.info
# @raise [AdapterException] if EM.reactor_running? is false
def publish(dest, msg, opts)
  unless EM.reactor_running?
    raise AdapterException, "Messages need to be published in an EventMachine run block (e.g., EM.run { RosettaQueue::Producer.publish(:foo, msg) } "
  end

  fanout = channel.fanout(fanout_name_for(dest), opts)
  @queue = fanout
  fanout.publish(msg, opts)

  RosettaQueue.logger.info("Publishing to fanout #{dest} :: #{msg}")
end

#receive(destination, message_handler) ⇒ Object

Raises:

  • (AdapterException) — if the EventMachine reactor is not running

87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 87

# Subscribes +message_handler+ to the fanout exchange for +destination+.
#
# A queue named "queue_<object_id>" is obtained from the channel and stored
# in @queue, bound to the fanout exchange, and subscribed using @options.
# Each delivered message is logged and passed to
# message_handler.handle_message; the header is acknowledged afterwards
# when @options[:ack] is truthy.
#
# @param destination destination identifier; fanout_name_for(destination) gives the exchange name
# @param message_handler object that responds to handle_message(msg)
# @return the value returned by the queue's subscribe call
# @raise [AdapterException] if EM.reactor_running? is false
def receive(destination, message_handler)
  unless EM.reactor_running?
    raise AdapterException, "Consumers need to run in an EventMachine 'run' block.  Try wrapping them inside the evented consumer manager."
  end

  @queue = channel.queue("queue_#{object_id}")
  fanout = channel.fanout(fanout_name_for(destination), @options)
  # Read once up front so the callback uses the setting seen at subscribe time.
  ack_requested = @options[:ack]
  bound_queue = @queue.bind(fanout)

  bound_queue.subscribe(@options) do |header, payload|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{payload}")
    message_handler.handle_message(payload)
    header.ack if ack_requested
  end
end

#receive_once(destination, opts = {}) ⇒ Object

Raises:

  • (AdapterException) — if the EventMachine reactor is not running

101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 101

# Fetches a single message from the fanout exchange for +destination+ and
# yields it, after Filters.process_receiving, to the given block.
#
# A queue named "queue_<object_id>" is obtained from the channel and stored
# in @queue, bound to the fanout exchange (declared with +opts+), and popped
# once.
#
# Acknowledgement: the ack flag is taken from opts[:ack] when that key is
# present, otherwise from @options[:ack]. The same flag is passed to the pop
# call, so the header is only acknowledged when the fetch itself was issued
# with :ack. Previously the flag came from @options alone while pop received
# +opts+, so the two could disagree.
#
# @param destination destination identifier; fanout_name_for(destination) gives the exchange name
# @param opts [Hash] options for channel.fanout and the pop call; may carry :ack
# @yield [message] the received message after Filters.process_receiving
# @return the value returned by the queue's pop call
# @raise [AdapterException] if EM.reactor_running? is false
def receive_once(destination, opts={})
  raise AdapterException, "Consumers need to run in an EventMachine 'run' block. (e.g., EM.run { RosettaQueue::Consumer.receive }" unless EM.reactor_running?

  @queue = channel.queue("queue_#{self.object_id}")
  exchange = channel.fanout(fanout_name_for(destination), opts)

  # Resolve the ack flag once: per-call opts win, instance options are the fallback.
  ack = opts.fetch(:ack) { @options[:ack] }
  # Only add :ack when it is set, so a call without acks passes opts through untouched.
  pop_opts = ack ? opts.merge(:ack => ack) : opts

  @queue.bind(exchange).pop(pop_opts) do |header, msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    header.ack if ack
    yield Filters.process_receiving(msg)
  end
end