Module: ActionSubscriber::MarchHare::Subscriber
Instance Method Summary collapse
- #cancel_consumers! ⇒ Object
- #march_hare_consumers ⇒ Object
- #print_subscriptions ⇒ Object
- #print_threadpool_stats ⇒ Object
- #setup_subscriptions! ⇒ Object
- #start_subscribers! ⇒ Object
- #wait_to_finish_with_timeout(timeout) ⇒ Object
Methods included from Logging
initialize_logger, logger, #logger, logger=
Instance Method Details
#cancel_consumers! ⇒ Object
6 7 8 |
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 6 def cancel_consumers! march_hare_consumers.each(&:cancel) end |
#march_hare_consumers ⇒ Object
10 11 12 |
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 10 def march_hare_consumers @march_hare_consumers ||= [] end |
#print_subscriptions ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 14 def print_subscriptions routes.group_by(&:subscriber).each do |subscriber, routes| logger.info subscriber.name routes.each do |route| executor = ::ActionSubscriber::RabbitConnection.connection_threadpools[route.connection_name] logger.info " -- method: #{route.action}" logger.info " -- connection: #{route.connection_name} (#{executor.get_maximum_pool_size} threads)" logger.info " -- concurrency: #{route.concurrency}" logger.info " -- exchange: #{route.exchange}" logger.info " -- queue: #{route.queue}" logger.info " -- routing_key: #{route.routing_key}" logger.info " -- prefetch: #{route.prefetch} per consumer (#{route.prefetch * route.concurrency} total)" if route.acknowledgements != subscriber. logger.error "WARNING subscriber has acknowledgements as #{subscriber.} and route has acknowledgements as #{route.acknowledgements}" end end end end |
#print_threadpool_stats ⇒ Object
33 34 35 36 37 38 39 40 |
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 33 def print_threadpool_stats ::ActionSubscriber::RabbitConnection.connection_threadpools.each do |name, executor| logger.info "Connection #{name}" logger.info " -- available threads: #{executor.get_maximum_pool_size}" logger.info " -- running thread: #{executor.get_active_count}" logger.info " -- backlog: #{executor.get_queue.size}" end end |
#setup_subscriptions! ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 42 def setup_subscriptions! fail ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup" unless subscriptions.empty? routes.each do |route| route.concurrency.times do subscriptions << { :route => route, :queue => setup_queue(route), } end end end |
#start_subscribers! ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 54 def start_subscribers! subscriptions.each do |subscription| route = subscription[:route] queue = subscription[:queue] queue.channel.prefetch = route.prefetch if route.acknowledgements? consumer = queue.subscribe(route.) do |, encoded_payload| ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name properties = { :action => route.action, :channel => queue.channel, :content_type => .content_type, :delivery_tag => .delivery_tag, :exchange => .exchange, :headers => _normalized_headers(), :message_id => ., :routing_key => .routing_key, :queue => queue.name, } env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) run_env(env) end march_hare_consumers << consumer end end |
#wait_to_finish_with_timeout(timeout) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 80 def wait_to_finish_with_timeout(timeout) wait_loops = 0 loop do wait_loops = wait_loops + 1 any_threadpools_busy = false ::ActionSubscriber::RabbitConnection.connection_threadpools.each do |name, executor| next if executor.get_active_count <= 0 logger.info " -- Connection #{name} (active: #{executor.get_active_count}, queued: #{executor.get_queue.size})" any_threadpools_busy = true end if !any_threadpools_busy logger.info "Connection threadpools empty" break end break if wait_loops >= timeout Thread.pass sleep 1 end end |