Module: ActionSubscriber::Bunny::Subscriber
Instance Method Summary
- #auto_pop! ⇒ Object
- #auto_subscribe! ⇒ Object
- #bunny_consumers ⇒ Object
- #cancel_consumers! ⇒ Object
- #create_queue(channel, queue_name, queue_options) ⇒ Object
Methods included from Logging
initialize_logger, logger, #logger, logger=
Instance Method Details
#auto_pop! ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 18
#
# Polls every subscribed queue by popping messages one at a time and
# handing each popped message to the route's threadpool.
#
# The number of polling passes is the smaller of the threadpool's ready
# size and the configured `times_to_pop`. Within a pass, a subscription is
# skipped when its route's threadpool reports busy or its queue is empty.
#
# @return [Object] the result of `Integer#times` over the polling passes
def auto_pop!
  # Because threadpools can be large we want to cap the number
  # of times we will pop each time we poll the broker
  times_to_pop = [::ActionSubscriber::Threadpool.ready_size, ::ActionSubscriber.config.times_to_pop].min

  times_to_pop.times do
    subscriptions.each do |subscription|
      route = subscription[:route]
      queue = subscription[:queue]

      # Handle busy checks on a per threadpool basis
      next if route.threadpool.busy?

      # NOTE(review): the identifier after `route.` was lost when this page was
      # extracted; restored as `queue_subscription_options` -- confirm against
      # the Route class before relying on it.
      delivery_info, properties, encoded_payload = queue.pop(route.queue_subscription_options)
      next unless encoded_payload # empty queue

      ::ActiveSupport::Notifications.instrument "popped_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name

      # Rebind `properties` to the plain hash the middleware env is built from.
      properties = {
        :action => route.action,
        :content_type => properties[:content_type],
        :delivery_tag => delivery_info.delivery_tag,
        :exchange => delivery_info.exchange,
        :headers => properties.headers,
        :message_id => nil,
        :routing_key => delivery_info.routing_key,
        :queue => queue.name,
      }

      env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
      enqueue_env(route.threadpool, env)
    end
  end
end
#auto_subscribe! ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 48
#
# Registers one Bunny consumer per subscription so that deliveries are
# pushed to this process instead of being polled.
#
# For each subscription the channel prefetch is set (only when the route
# uses acknowledgements), a consumer is created and remembered in
# `bunny_consumers`, and the queue is subscribed with it. Every delivery is
# instrumented, wrapped in a middleware env and passed to `run_env`.
#
# @return [Object] the result of iterating `subscriptions`
def auto_subscribe!
  subscriptions.each do |subscription|
    route = subscription[:route]
    queue = subscription[:queue]
    channel = queue.channel

    channel.prefetch(route.prefetch) if route.acknowledgements?

    # The fourth argument is the negation of `route.acknowledgements?`
    # (presumably Bunny's no-ack flag -- confirm against Bunny::Consumer).
    consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?)

    consumer.on_delivery do |delivery_info, properties, encoded_payload|
      ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name

      # Rebind `properties` to the plain hash the middleware env is built from.
      properties = {
        :action => route.action,
        :channel => queue.channel,
        :content_type => properties.content_type,
        :delivery_tag => delivery_info.delivery_tag,
        :exchange => delivery_info.exchange,
        :headers => properties.headers,
        :message_id => properties.message_id,
        :routing_key => delivery_info.routing_key,
        :queue => queue.name,
      }

      env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
      run_env(env)
    end

    # Keep a handle on the consumer so it can be cancelled later.
    bunny_consumers << consumer
    queue.subscribe_with(consumer)
  end
end
#bunny_consumers ⇒ Object
6 7 8 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 6
#
# Memoized list of consumers; starts out empty and the same array is
# returned on every call.
#
# @return [Array] the consumers collected so far
def bunny_consumers
  @bunny_consumers ||= []
end
#cancel_consumers! ⇒ Object
10 11 12 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 10
#
# Calls `cancel` on every consumer held in `bunny_consumers`.
#
# @return [Array] the list of consumers that were cancelled
def cancel_consumers!
  bunny_consumers.each(&:cancel)
end
#create_queue(channel, queue_name, queue_options) ⇒ Object
14 15 16 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 14 def create_queue(channel, queue_name, ) ::Bunny::Queue.new(channel, queue_name, ) end |