Module: Vx::Consumer::Subscribe

Defined in:
lib/vx/consumer/subscribe.rb

Instance Method Summary collapse

Instance Method Details

#handle_delivery(channel, delivery_info, properties, payload) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/vx/consumer/subscribe.rb', line 23

def handle_delivery(channel, delivery_info, properties, payload)
  payload = decode_payload properties, payload

  instrumentation = {
    consumer:   params.consumer_name,
    payload:    payload,
    properties: properties,
    channel:    channel.id
  }

  with_middlewares :sub, instrumentation do
    instrument("start_processing", instrumentation)
    instrument("process", instrumentation) do
      allocate_pub_channel do
        run_instance delivery_info, properties, payload, channel
      end
    end
  end
end

#run_instance(delivery_info, properties, payload, channel) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/vx/consumer/subscribe.rb', line 43

def run_instance(delivery_info, properties, payload, channel)
  new.tap do |inst|
    inst.properties    = properties
    inst.delivery_info = delivery_info
    inst._channel      = channel
  end.perform payload
end

#subscribe(options = {}) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/vx/consumer/subscribe.rb', line 5

def subscribe(options = {})
  ch, q = bind(options)

  subscriber = Subscriber.new(
    ch,
    q,
    ch.generate_consumer_tag,
    !params.ack
  )
  subscriber.vx_consumer_name = params.consumer_name

  subscriber.on_delivery do |delivery_info, properties, payload|
    handle_delivery ch, delivery_info, properties, payload
  end

  q.subscribe_with(subscriber)
end