Class: EventMachine::Synchrony::AMQP::Consumer

Inherits:
AMQP::Consumer
  • Object
show all
Defined in:
lib/em-synchrony/amqp.rb

Instance Method Summary collapse

Instance Method Details

#acancel ⇒ Object



109
# File 'lib/em-synchrony/amqp.rb', line 109

# Keep the inherited AMQP::Consumer#cancel reachable as #acancel, so it stays
# callable after #cancel is redefined in this class.
alias acancel cancel

#aconsume ⇒ Object



96
# File 'lib/em-synchrony/amqp.rb', line 96

# Keep the inherited AMQP::Consumer#consume reachable as #aconsume, so it stays
# callable after #consume is redefined in this class.
alias aconsume consume

#aon_delivery ⇒ Object



87
# File 'lib/em-synchrony/amqp.rb', line 87

# Keep the inherited AMQP::Consumer#on_delivery reachable as #aon_delivery, so
# it stays callable after #on_delivery is redefined in this class.
alias aon_delivery on_delivery

#aresubscribe ⇒ Object



103
# File 'lib/em-synchrony/amqp.rb', line 103

# Keep the inherited AMQP::Consumer#resubscribe reachable as #aresubscribe, so
# it stays callable after #resubscribe is redefined in this class.
alias aresubscribe resubscribe

#cancel(nowait = false) ⇒ Object



110
111
112
113
# File 'lib/em-synchrony/amqp.rb', line 110

# Cancels this consumer by running the inherited cancel (preserved as
# #acancel) through EM::Synchrony::AMQP.sync, handing it a callback built by
# EM::Synchrony::AMQP.sync_cb.
#
# NOTE(review): sync/sync_cb are defined outside this excerpt; presumably they
# suspend the calling fiber until the callback fires — confirm.
#
# @param nowait [Boolean] forwarded unchanged to #acancel.
# @return [self]
def cancel(nowait = false)
  # Delegate to acancel, not aconsume: the latter would issue a *consume*
  # request instead of cancelling the subscription.
  EM::Synchrony::AMQP.sync { |f| acancel(nowait, &EM::Synchrony::AMQP.sync_cb(f)) }
  self
end

#consume(nowait = false) ⇒ Object

Raises:
  • (Error) — when the reply to the consume request is not an AMQ::Protocol::Basic::ConsumeOk



97
98
99
100
101
# File 'lib/em-synchrony/amqp.rb', line 97

# Runs the inherited consume (preserved as #aconsume) through
# EM::Synchrony::AMQP.sync and validates the value that call produces.
#
# @param nowait [Boolean] forwarded unchanged to #aconsume.
# @return [self]
# @raise [Error] when the value produced by the sync call is not an
#   AMQ::Protocol::Basic::ConsumeOk; the error message is that value's #to_s.
def consume(nowait = false)
  reply = EM::Synchrony::AMQP.sync do |fiber|
    on_reply = EM::Synchrony::AMQP.sync_cb(fiber)
    self.aconsume(nowait, &on_reply)
  end

  return self if reply.is_a?(::AMQ::Protocol::Basic::ConsumeOk)

  raise Error.new(reply.to_s)
end

#on_delivery(&block) ⇒ Object



88
89
90
91
92
93
94
# File 'lib/em-synchrony/amqp.rb', line 88

# Registers +block+ as the delivery handler, running it inside a dedicated
# fiber.
#
# The fiber registers a callback (built by EM::Synchrony::AMQP.sync_cb for
# that fiber) with the inherited on_delivery (preserved as #aon_delivery),
# then loops forever: it yields, and whatever value it is resumed with is
# passed to +block+.
#
# @yield the value(s) the fiber is resumed with.
# @return [self]
def on_delivery(&block)
  delivery_fiber = Fiber.new do
    resume_cb = EM::Synchrony::AMQP.sync_cb(Fiber.current)
    aon_delivery(&resume_cb)

    loop do
      payload = Fiber.yield # parks here until the fiber is resumed again
      block.call(payload)
    end
  end

  # First resume runs the registration and stops at the first Fiber.yield.
  delivery_fiber.resume
  self
end

#resubscribe ⇒ Object



104
105
106
107
# File 'lib/em-synchrony/amqp.rb', line 104

# Re-subscribes this consumer by running the inherited resubscribe (preserved
# as #aresubscribe) through EM::Synchrony::AMQP.sync, handing it a callback
# built by EM::Synchrony::AMQP.sync_cb.
#
# NOTE(review): sync/sync_cb and the parent implementation are outside this
# excerpt; confirm the parent's resubscribe invokes the supplied block, since
# the sync call presumably waits on it.
#
# @return [self]
def resubscribe
  # Delegate to aresubscribe, not aconsume: calling aconsume here made this
  # method an unchecked duplicate of #consume and left the alias unused.
  EM::Synchrony::AMQP.sync { |f| aresubscribe(&EM::Synchrony::AMQP.sync_cb(f)) }
  self
end