Class: EventMachine::Synchrony::AMQP::Exchange
- Inherits: AMQP::Exchange
  - Object
  - AMQP::Exchange
  - EventMachine::Synchrony::AMQP::Exchange
- Defined in: lib/em-synchrony/amqp.rb
Instance Method Summary
- #adelete ⇒ Object
- #apublish ⇒ Object
- #delete(opts = {}) ⇒ Object
- #initialize(channel, type, name, opts = {}, &block) ⇒ Exchange (constructor)
  A new instance of Exchange.
- #publish(payload, options = {}) ⇒ Object
Constructor Details
#initialize(channel, type, name, opts = {}, &block) ⇒ Exchange
Returns a new instance of Exchange.
    # File 'lib/em-synchrony/amqp.rb', line 117

    def initialize(channel, type, name, opts = {}, &block)
      f = Fiber.current

      # AMQP Exchange constructor handles certain special exchanges differently.
      # The callback passed in isn't called when the response comes back
      # but is called immediately on the original calling fiber. That means that
      # when the sync_cb callback yields the fiber when called, it will hang and never
      # be resumed. So handle these exchanges without yielding
      if name == "amq.#{type}" or name.empty? or opts[:no_declare]
        exchange = nil
        super(channel, type, name, opts) { |ex| exchange = ex }
      else
        super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
        exchange, declare_ok = Fiber.yield
        raise Error.new(declare_ok.to_s) unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk)
      end

      exchange
    end
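The two branches of the constructor can be hard to picture without a running reactor. The following is a minimal sketch in plain Ruby, assuming nothing beyond the standard Fiber class. It is not the library's code: TinyReactor, resume_cb, declare_deferred, declare_immediate and open_exchange are hypothetical names made up for illustration, and strings stand in for exchange objects. It shows why a callback that fires later can safely resume a parked fiber, while a callback that fires immediately has to be handled without yielding.

```ruby
require 'fiber' # needed for Fiber.current on older Rubies; harmless on newer ones

# Hypothetical stand-in for an event loop: blocks queued here run later,
# outside the fiber that scheduled them.
class TinyReactor
  def initialize
    @queue = []
  end

  def defer(&blk)
    @queue << blk
  end

  def run
    @queue.shift.call until @queue.empty?
  end
end

# Builds a callback that hands its arguments back to the parked fiber.
def resume_cb(fiber)
  lambda { |*args| fiber.resume(*args) }
end

# Deferred path: the callback fires later, from the reactor loop.
def declare_deferred(reactor, name, &cb)
  reactor.defer { cb.call(name, :declare_ok) }
end

# Immediate path: the callback fires right away on the calling fiber,
# so nothing would ever resume that fiber if it yielded afterwards.
def declare_immediate(name, &cb)
  cb.call(name)
end

def open_exchange(reactor, name, predeclared)
  if predeclared
    # Capture the result in a local variable and never yield.
    exchange = nil
    declare_immediate(name) { |ex| exchange = ex }
    exchange
  else
    # Park this fiber until the reactor delivers the reply.
    declare_deferred(reactor, name, &resume_cb(Fiber.current))
    exchange, status = Fiber.yield
    raise "declare failed: #{status}" unless status == :declare_ok
    exchange
  end
end

reactor = TinyReactor.new
results = []
worker = Fiber.new do
  results << open_exchange(reactor, "amq.direct", true)
  results << open_exchange(reactor, "orders", false)
end

worker.resume # runs until the Fiber.yield in the deferred branch
reactor.run   # fires the queued callback, which resumes the worker
p results     # => ["amq.direct", "orders"]
```

In this sketch the predeclared branch mirrors the constructor's handling of names such as "amq.#{type}", the empty name, and opts[:no_declare]: the result is collected synchronously, so there is no point at which the fiber could be left waiting.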
Instance Method Details
#adelete ⇒ Object
    # File 'lib/em-synchrony/amqp.rb', line 142

    alias :adelete :delete
#apublish ⇒ Object
    # File 'lib/em-synchrony/amqp.rb', line 137

    alias :apublish :publish
#delete(opts = {}) ⇒ Object
    # File 'lib/em-synchrony/amqp.rb', line 143

    def delete(opts = {})
      EM::Synchrony::AMQP.sync { |f| adelete(opts, &EM::Synchrony::AMQP.sync_cb(f)) }
    end
#publish(payload, options = {}) ⇒ Object
    # File 'lib/em-synchrony/amqp.rb', line 138

    def publish payload, options = {}
      apublish(payload, options)
    end