Module: Vx::Common::AMQP::Consumer::Subscribe
- Included in:
- ClassMethods
- Defined in:
- lib/vx/common/amqp/consumer/subscribe.rb
Instance Method Summary collapse
Instance Method Details
#pop(q) ⇒ Object
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/vx/common/amqp/consumer/subscribe.rb', line 39 def pop(q) unpacked = nil delivery_info, properties, payload = q.pop(ack: ack) if payload unpacked = properties, payload end [unpacked, delivery_info, properties] end |
#start ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/vx/common/amqp/consumer/subscribe.rb', line 14 def start rs = nil instrumentation = { consumer_id: consumer_id, consumer: consumer_name } session.open instrumentation session.with_channel do x = declare_exchange q = declare_queue instrumentation.merge!( exchange: x.name, queue: q.name, ) instrument("start.consumer.amqp", instrumentation) q.bind(x, ) rs = yield(x, q) if block_given? instrument("shutdown.consumer.amqp", instrumentation) end rs end |
#subscribe ⇒ Object
8 9 10 11 12 |
# File 'lib/vx/common/amqp/consumer/subscribe.rb', line 8 def subscribe start do |x, q| subscription_loop q end end |