Module: Vx::Consumer::Subscribe
- Defined in:
- lib/vx/consumer/subscribe.rb
Instance Method Summary
- #handle_delivery(channel, delivery_info, properties, payload) ⇒ Object
- #run_instance(delivery_info, properties, payload, channel) ⇒ Object
- #subscribe(options = {}) ⇒ Object
Instance Method Details
#handle_delivery(channel, delivery_info, properties, payload) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/vx/consumer/subscribe.rb', line 23

# Processes one delivered message.
#
# The raw payload is first passed through +decode_payload+ together with the
# message properties. An instrumentation hash (consumer name, decoded payload,
# properties and channel id) is then handed to the +:sub+ middleware chain,
# inside which a "start_processing" event is emitted and the actual work is
# wrapped in a "process" event.
#
# channel       - object responding to +id+; forwarded to +run_instance+.
# delivery_info - forwarded untouched to +run_instance+.
# properties    - message properties; used for decoding and instrumentation.
# payload       - raw payload, replaced by its decoded form before processing.
#
# Returns whatever +with_middlewares+ returns for the given block.
def handle_delivery(channel, delivery_info, properties, payload)
  payload = decode_payload(properties, payload)

  instrumentation = {
    consumer:   params.consumer_name,
    payload:    payload,
    properties: properties,
    channel:    channel.id
  }

  with_middlewares(:sub, instrumentation) do
    # Point-in-time event, emitted before the "process" event below starts.
    instrument("start_processing", instrumentation)

    instrument("process", instrumentation) do
      # NOTE(review): presumably provides a publishing channel for the
      # duration of the block; the helper is defined elsewhere — confirm.
      allocate_pub_channel do
        run_instance(delivery_info, properties, payload, channel)
      end
    end
  end
end
#run_instance(delivery_info, properties, payload, channel) ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/vx/consumer/subscribe.rb', line 43

# Builds a fresh consumer instance via +new+, attaches the delivery context
# to it and runs it against the payload.
#
# delivery_info - assigned to the instance's +delivery_info+.
# properties    - assigned to the instance's +properties+.
# payload       - passed to the instance's +perform+.
# channel       - assigned to the instance's +_channel+.
#
# Returns the result of +perform+.
def run_instance(delivery_info, properties, payload, channel)
  instance = new

  instance.properties    = properties
  instance.delivery_info = delivery_info
  instance._channel      = channel

  instance.perform(payload)
end
#subscribe(options = {}) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/vx/consumer/subscribe.rb', line 5

# Binds a channel/queue pair and registers a subscriber on the queue that
# routes every delivery to +handle_delivery+.
#
# options - Hash forwarded unchanged to +bind+ (default: {}).
#
# Returns the result of +q.subscribe_with+.
def subscribe(options = {})
  ch, q = bind(options)

  subscriber = Subscriber.new(
    ch,
    q,
    ch.generate_consumer_tag,
    !params.ack # fourth argument is the negation of the configured ack flag
  )
  subscriber.vx_consumer_name = params.consumer_name

  # Each delivery is handed to handle_delivery together with the bound channel.
  subscriber.on_delivery do |delivery_info, properties, payload|
    handle_delivery ch, delivery_info, properties, payload
  end

  q.subscribe_with(subscriber)
end