Module: ActionSubscriber::Bunny::Subscriber

Includes:
Logging
Included in:
RouteSet
Defined in:
lib/action_subscriber/bunny/subscriber.rb

Instance Method Summary collapse

Methods included from Logging

initialize_logger, logger, #logger, logger=

Instance Method Details

#bunny_consumers ⇒ Object



6
7
8
# File 'lib/action_subscriber/bunny/subscriber.rb', line 6

# Returns the array stored in @bunny_consumers, creating an empty one on
# first use.
#
# @return [Array] the same array instance on every call
def bunny_consumers
  return @bunny_consumers if @bunny_consumers

  @bunny_consumers = []
end

#cancel_consumers! ⇒ Object



10
11
12
# File 'lib/action_subscriber/bunny/subscriber.rb', line 10

# Calls +cancel+ on every consumer held in +bunny_consumers+.
#
# @return [Array] the +bunny_consumers+ array
def cancel_consumers!
  bunny_consumers.each do |consumer|
    consumer.cancel
  end
end


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/action_subscriber/bunny/subscriber.rb', line 14

# Logs the configuration of every route, grouped by subscriber.
#
# For each subscriber the subscriber's name is logged, followed by the action,
# connection name, concurrency, exchange, queue, routing key and prefetch of
# each of its routes. When a route's prefetch is lower than its concurrency an
# additional warning is logged at error level.
#
# @return [Hash] the routes grouped by subscriber
def print_subscriptions
  routes_by_subscriber = routes.group_by { |route| route.subscriber }

  routes_by_subscriber.each_pair do |subscriber, subscriber_routes|
    logger.info(subscriber.name)

    subscriber_routes.each do |route|
      logger.info("  -- method: #{route.action}")
      logger.info("    --  connection: #{route.connection_name}")
      logger.info("    -- concurrency: #{route.concurrency}")
      logger.info("    --    exchange: #{route.exchange}")
      logger.info("    --       queue: #{route.queue}")
      logger.info("    -- routing_key: #{route.routing_key}")
      logger.info("    --    prefetch: #{route.prefetch}")

      # Only routes whose prefetch is below their concurrency get the warning.
      next unless route.prefetch < route.concurrency

      logger.error("WARNING having a prefetch lower than your concurrency will prevent your subscriber from fully utilizing its threadpool")
    end
  end
end


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/action_subscriber/bunny/subscriber.rb', line 30

# Logs a best-guess snapshot of work-pool activity for every subscription,
# grouped by subscriber.
#
# After a disclaimer line, each subscriber's name is logged, followed by one
# group of lines per subscription: the route's action and concurrency, the
# number of threads in the queue channel's work pool whose status is "run",
# and that work pool's backlog.
#
# Each subscription reports only its own route. Iterating over every route of
# the set here would pair one work pool's numbers with unrelated routes.
#
# @return [Hash] the subscriptions grouped by subscriber
def print_threadpool_stats
  logger.info "*DISCLAIMER* the number of running jobs is just a best guess. We don't have a good way to introspect the bunny threadpools so jobs that are sleeping or waiting on IO won't show up as running"
  subscriptions.group_by { |subscription| subscription[:route].subscriber }.each do |subscriber, subscriber_subscriptions|
    logger.info subscriber.name
    subscriber_subscriptions.each do |subscription|
      route = subscription[:route]
      work_pool = subscription[:queue].channel.work_pool
      running_threads = work_pool.threads.count { |thread| thread.status == "run" }
      logger.info "  -- method: #{route.action}"
      logger.info "    --  concurrency: #{route.concurrency}"
      logger.info "    -- running jobs: #{running_threads}"
      logger.info "    --      backlog: #{work_pool.backlog}"
    end
  end
end

#setup_subscriptions! ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/action_subscriber/bunny/subscriber.rb', line 48

# Builds one subscription entry per route and appends it to +subscriptions+.
#
# Each entry is a hash holding the route under +:route+ and the result of
# +setup_queue(route)+ under +:queue+.
#
# @raise [RuntimeError] if +subscriptions+ already contains entries
# @return [Object] whatever +routes.each+ returns
def setup_subscriptions!
  unless subscriptions.empty?
    raise ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup"
  end

  routes.each { |route| subscriptions << { route: route, queue: setup_queue(route) } }
end

#start_subscribers! ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/action_subscriber/bunny/subscriber.rb', line 58

# Creates a ::Bunny::Consumer for every entry in +subscriptions+, records it in
# +bunny_consumers+ and subscribes it to the entry's queue.
#
# When the route uses acknowledgements the channel's prefetch is set from the
# route first; the consumer's fourth constructor argument is the negation of
# +route.acknowledgements?+.
#
# On each delivery the handler emits a "received_event.action_subscriber"
# notification, wraps the payload and message details in a
# ::ActionSubscriber::Middleware::Env and hands that to +run_env+.
#
# @return [Object] whatever +subscriptions.each+ returns
def start_subscribers!
  subscriptions.each do |subscription|
    route, queue = subscription.values_at(:route, :queue)
    channel = queue.channel

    if route.acknowledgements?
      channel.prefetch(route.prefetch)
    end

    consumer_tag = channel.generate_consumer_tag
    no_ack = !route.acknowledgements?
    consumer = ::Bunny::Consumer.new(channel, queue, consumer_tag, no_ack)

    consumer.on_delivery do |delivery_info, metadata, encoded_payload|
      ::ActiveSupport::Notifications.instrument(
        "received_event.action_subscriber",
        payload_size: encoded_payload.bytesize,
        queue: queue.name
      )

      env_properties = {
        action: route.action,
        channel: queue.channel,
        content_type: metadata.content_type,
        delivery_tag: delivery_info.delivery_tag,
        exchange: delivery_info.exchange,
        headers: metadata.headers,
        message_id: metadata.message_id,
        routing_key: delivery_info.routing_key,
        queue: queue.name,
      }

      run_env(::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, env_properties))
    end

    # Register the consumer before subscribing it to the queue.
    bunny_consumers << consumer
    queue.subscribe_with(consumer)
  end
end

#wait_to_finish_with_timeout(timeout) ⇒ Object



86
87
88
89
90
91
92
93
# File 'lib/action_subscriber/bunny/subscriber.rb', line 86

# Prints a notice explaining that the worker pools cannot be inspected, then
# sleeps for +timeout+ seconds.
#
# @param timeout [Numeric] number of seconds to sleep
# @return [Integer] the value returned by +sleep+
def wait_to_finish_with_timeout(timeout)
  # <<- keeps the leading indentation of each message line in the output.
  puts <<-MSG
    Currently bunny doesn't have any sort of a graceful shutdown or
    the ability to check on the status of its ConsumerWorkPool objects.
    For now we just wait for #{timeout}sec to let the worker pools drain.
  MSG
  sleep(timeout)
end