Module: ActionSubscriber::Bunny::Subscriber
Instance Method Summary
- #bunny_consumers ⇒ Object
- #cancel_consumers! ⇒ Object
- #print_subscriptions ⇒ Object
- #print_threadpool_stats ⇒ Object
- #setup_subscriptions! ⇒ Object
- #start_subscribers! ⇒ Object
- #wait_to_finish_with_timeout(timeout) ⇒ Object
Methods included from Logging
initialize_logger, logger, #logger, logger=
Instance Method Details
#bunny_consumers ⇒ Object
6 7 8 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 6
#
# Lazily creates and returns the list of consumers tracked by this object.
# The same Array instance is handed back on every call, so callers can
# append to it in place.
#
# @return [Array] the memoized consumer list (empty on first access)
def bunny_consumers
  @bunny_consumers = [] unless @bunny_consumers
  @bunny_consumers
end
#cancel_consumers! ⇒ Object
10 11 12 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 10
#
# Sends +cancel+ to every consumer currently held in +bunny_consumers+.
#
# @return [Array] the collection returned by +bunny_consumers+
def cancel_consumers!
  bunny_consumers.each { |consumer| consumer.cancel }
end
#print_subscriptions ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 14
#
# Logs a human-readable summary of every route, grouped by subscriber.
#
# For each route the action, connection, concurrency, exchange, queue,
# routing key and prefetch are written at info level. Two conditions are
# additionally reported at error level:
# * the route's prefetch is lower than its concurrency
# * the route's acknowledgement setting differs from the subscriber's
#
# @return [Hash] the subscriber => routes grouping that was iterated
def print_subscriptions
  # The block parameter is deliberately not called `routes`, so it cannot be
  # confused with the `routes` method being grouped.
  routes.group_by(&:subscriber).each do |subscriber, subscriber_routes|
    logger.info subscriber.name
    subscriber_routes.each do |route|
      logger.info " -- method: #{route.action}"
      logger.info " -- connection: #{route.connection_name}"
      logger.info " -- concurrency: #{route.concurrency}"
      logger.info " -- exchange: #{route.exchange}"
      logger.info " -- queue: #{route.queue}"
      logger.info " -- routing_key: #{route.routing_key}"
      logger.info " -- prefetch: #{route.prefetch}"
      logger.error "WARNING having a prefetch lower than your concurrency will prevent your subscriber from fully utilizing its threadpool" if route.prefetch < route.concurrency
      # NOTE(review): the method name after `subscriber.` was missing from the
      # source this was restored from; `acknowledge_messages?` is presumed to be
      # the subscriber-level predicate -- confirm against the subscriber DSL.
      if route.acknowledgements != subscriber.acknowledge_messages?
        logger.error "WARNING subscriber has acknowledgements as #{subscriber.acknowledge_messages?} and route has acknowledgements as #{route.acknowledgements}"
      end
    end
  end
end
#print_threadpool_stats ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 33
#
# Logs a best-effort snapshot of each subscription's channel work pool,
# grouped by subscriber.
#
# Every subscription is reported exactly once, using its own route together
# with the stats of its own queue's work pool: the route action, the route
# concurrency, the number of pool threads whose status is "run", and the
# pool backlog.
#
# @return [Hash] the subscriber => subscriptions grouping that was iterated
def print_threadpool_stats
  logger.info "*DISCLAIMER* the number of running jobs is just a best guess. We don't have a good way to introspect the bunny threadpools so jobs that are sleeping or waiting on IO won't show up as running"
  grouped = subscriptions.group_by { |subscription| subscription[:route].subscriber }
  grouped.each do |subscriber, subscriber_subscriptions|
    logger.info subscriber.name
    subscriber_subscriptions.each do |subscription|
      route = subscription[:route]
      work_pool = subscription[:queue].channel.work_pool
      # Only threads reporting "run" are counted, which is why the figure is
      # described as a guess in the disclaimer above.
      running_threads = work_pool.threads.count { |thread| thread.status == "run" }
      logger.info " -- method: #{route.action}"
      logger.info " -- concurrency: #{route.concurrency}"
      logger.info " -- running jobs: #{running_threads}"
      logger.info " -- backlog: #{work_pool.backlog}"
    end
  end
end
#setup_subscriptions! ⇒ Object
51 52 53 54 55 56 57 58 59 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 51
#
# Builds one subscription entry per route and appends it to +subscriptions+.
# Each entry is a Hash with the route under +:route+ and the result of
# +setup_queue(route)+ under +:queue+.
#
# @raise [RuntimeError] if +subscriptions+ already contains entries
# @return [Object] whatever +routes.each+ returns
def setup_subscriptions!
  unless subscriptions.empty?
    raise ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup"
  end

  routes.each do |route|
    subscriptions << { :route => route, :queue => setup_queue(route) }
  end
end
#start_subscribers! ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 61
#
# Creates a Bunny consumer for every entry in +subscriptions+ and subscribes
# it to that entry's queue.
#
# For each subscription:
# * the channel prefetch is set from the route, but only when the route uses
#   acknowledgements
# * a consumer is built with a generated tag; its fourth argument is the
#   negation of +route.acknowledgements?+
# * every delivery is instrumented as "received_event.action_subscriber",
#   wrapped in a Middleware::Env and passed to +run_env+
# * the consumer is recorded in +bunny_consumers+
#
# @return [Object] whatever +subscriptions.each+ returns
def start_subscribers!
  subscriptions.each do |subscription|
    route = subscription[:route]
    queue = subscription[:queue]
    channel = queue.channel
    channel.prefetch(route.prefetch) if route.acknowledgements?
    consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?)
    consumer.on_delivery do |delivery_info, properties, encoded_payload|
      ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
      # Built under its own name so the incoming message `properties` stay
      # readable while the hash is being assembled.
      env_properties = {
        :action => route.action,
        :channel => queue.channel,
        :content_type => properties.content_type,
        :delivery_tag => delivery_info.delivery_tag,
        :exchange => delivery_info.exchange,
        :headers => properties.headers,
        :message_id => properties.message_id,
        :routing_key => delivery_info.routing_key,
        :queue => queue.name,
      }
      env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, env_properties)
      run_env(env)
    end
    bunny_consumers << consumer
    queue.subscribe_with(consumer)
  end
end
#wait_to_finish_with_timeout(timeout) ⇒ Object
89 90 91 92 93 94 95 96 |
# File 'lib/action_subscriber/bunny/subscriber.rb', line 89
#
# Prints a notice explaining that there is no graceful shutdown available,
# then sleeps for the given number of seconds so in-flight work can drain.
#
# @param timeout [Numeric] seconds to sleep
# @return [Integer] the value returned by +sleep+
def wait_to_finish_with_timeout(timeout)
  notice = <<-MSG
    Currently bunny doesn't have any sort of a graceful shutdown or
    the ability to check on the status of its ConsumerWorkPool objects.
    For now we just wait for #{timeout}sec to let the worker pools drain.
  MSG
  puts notice
  sleep(timeout)
end