Class: EventMachine::Synchrony::AMQP::Exchange

Inherits:
AMQP::Exchange
  • Object
show all
Defined in:
lib/em-synchrony/amqp.rb

Instance Method Summary collapse

Constructor Details

#initialize(channel, type, name, opts = {}, &block) ⇒ Exchange

Returns a new instance of Exchange.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/em-synchrony/amqp.rb', line 117

def initialize(channel, type, name, opts = {}, &block)
  f = Fiber.current

  # AMQP Exchange constructor handles certain special exchanges differently.
  # The callback passed in isn't called when the response comes back
  # but is called immediately on the original calling fiber. That means that
  # when the sync_cb callback yields the fiber when called, it will hang and never
  # be resumed.  So handle these exchanges without yielding
  if name == "amq.#{type}" or name.empty? or opts[:no_declare]
    exchange = nil
    super(channel, type, name, opts) { |ex| exchange = ex }
  else
    super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
    exchange, declare_ok = Fiber.yield
    raise Error.new(declare_ok.to_s) unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk)
  end

  exchange
end

Instance Method Details

#adeleteObject



142
# File 'lib/em-synchrony/amqp.rb', line 142

alias :adelete :delete

#apublishObject



137
# File 'lib/em-synchrony/amqp.rb', line 137

alias :apublish :publish

#delete(opts = {}) ⇒ Object



143
144
145
# File 'lib/em-synchrony/amqp.rb', line 143

def delete(opts = {})
  EM::Synchrony::AMQP.sync { |f| adelete(opts, &EM::Synchrony::AMQP.sync_cb(f)) }
end

#publish(payload, options = {}) ⇒ Object



138
139
140
# File 'lib/em-synchrony/amqp.rb', line 138

def publish payload, options = {}
  apublish(payload, options)
end