Module: ActionSubscriber::Bunny::Subscriber

Includes:
Logging
Included in:
RouteSet
Defined in:
lib/action_subscriber/bunny/subscriber.rb

Instance Method Summary collapse

Methods included from Logging

initialize_logger, logger, #logger, logger=

Instance Method Details

#auto_pop!Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/action_subscriber/bunny/subscriber.rb', line 18

def auto_pop!
  # Because threadpools can be large we want to cap the number
  # of times we will pop each time we poll the broker
  times_to_pop = [::ActionSubscriber::Threadpool.ready_size, ::ActionSubscriber.config.times_to_pop].min
  times_to_pop.times do
    subscriptions.each do |subscription|
      route = subscription[:route]
      queue = subscription[:queue]
      # Handle busy checks on a per threadpool basis
      next if route.threadpool.busy?

      delivery_info, properties, encoded_payload = queue.pop(route.queue_subscription_options)
      next unless encoded_payload # empty queue
      ::ActiveSupport::Notifications.instrument "popped_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
      properties = {
        :action => route.action,
        :content_type => properties[:content_type],
        :delivery_tag => delivery_info.delivery_tag,
        :exchange => delivery_info.exchange,
        :headers => properties.headers,
        :message_id => nil,
        :routing_key => delivery_info.routing_key,
        :queue => queue.name,
      }
      env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
      enqueue_env(route.threadpool, env)
    end
  end
end

#auto_subscribe!Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/action_subscriber/bunny/subscriber.rb', line 48

def auto_subscribe!
  subscriptions.each do |subscription|
    route = subscription[:route]
    queue = subscription[:queue]
    channel = queue.channel
    channel.prefetch(route.prefetch) if route.acknowledgements?
    consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?)
    consumer.on_delivery do |delivery_info, properties, encoded_payload|
      ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
      properties = {
        :action => route.action,
        :channel => queue.channel,
        :content_type => properties.content_type,
        :delivery_tag => delivery_info.delivery_tag,
        :exchange => delivery_info.exchange,
        :headers => properties.headers,
        :message_id => properties.message_id,
        :routing_key => delivery_info.routing_key,
        :queue => queue.name,
      }
      env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
      run_env(env)
    end
    bunny_consumers << consumer
    queue.subscribe_with(consumer)
  end
end

#bunny_consumersObject



6
7
8
# File 'lib/action_subscriber/bunny/subscriber.rb', line 6

def bunny_consumers
  @bunny_consumers ||= []
end

#cancel_consumers!Object



10
11
12
# File 'lib/action_subscriber/bunny/subscriber.rb', line 10

def cancel_consumers!
  bunny_consumers.each(&:cancel)
end

#create_queue(channel, queue_name, queue_options) ⇒ Object



14
15
16
# File 'lib/action_subscriber/bunny/subscriber.rb', line 14

def create_queue(channel, queue_name, queue_options)
  ::Bunny::Queue.new(channel, queue_name, queue_options)
end