Module: ActionSubscriber::Bunny::Subscriber

Includes:
Logging
Included in:
RouteSet
Defined in:
lib/action_subscriber/bunny/subscriber.rb

Instance Method Summary collapse

Methods included from Logging

initialize_logger, logger, #logger, logger=

Instance Method Details

#bunny_consumers ⇒ Object



6
7
8
# File 'lib/action_subscriber/bunny/subscriber.rb', line 6

# Returns the list that collects the consumers registered by this object.
# The list is created empty on first access and the same Array instance is
# handed back on every later call.
def bunny_consumers
  return @bunny_consumers if @bunny_consumers

  @bunny_consumers = []
end

#cancel_consumers! ⇒ Object



10
11
12
# File 'lib/action_subscriber/bunny/subscriber.rb', line 10

# Sends +cancel+ to every consumer held in +bunny_consumers+, in order.
# Returns the consumer list itself (the result of +each+).
def cancel_consumers!
  bunny_consumers.each do |consumer|
    consumer.cancel
  end
end


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/action_subscriber/bunny/subscriber.rb', line 14

# Logs every route, grouped under the name of its subscriber.
#
# For each route the action, connection name, concurrency, exchange, queue,
# routing key and prefetch are written at info level. Two conditions are
# additionally reported at error level:
# * the route's prefetch is lower than its concurrency
# * the route's +acknowledgements+ differs from the subscriber's
#   +acknowledge_messages?+
def print_subscriptions
  routes.group_by { |route| route.subscriber }.each do |subscriber, subscriber_routes|
    logger.info subscriber.name

    subscriber_routes.each do |route|
      logger.info "  -- method: #{route.action}"
      logger.info "    --  connection: #{route.connection_name}"
      logger.info "    -- concurrency: #{route.concurrency}"
      logger.info "    --    exchange: #{route.exchange}"
      logger.info "    --       queue: #{route.queue}"
      logger.info "    -- routing_key: #{route.routing_key}"
      logger.info "    --    prefetch: #{route.prefetch}"

      if route.prefetch < route.concurrency
        logger.error "WARNING having a prefetch lower than your concurrency will prevent your subscriber from fully utilizing its threadpool"
      end

      # Nothing more to report when route and subscriber agree on acknowledgements.
      next unless route.acknowledgements != subscriber.acknowledge_messages?

      logger.error "WARNING subscriber has acknowledgements as #{subscriber.acknowledge_messages?} and route has acknowledgements as #{route.acknowledgements}"
    end
  end
end


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/action_subscriber/bunny/subscriber.rb', line 33

# Logs a best-effort snapshot of work pool activity, grouped under the name
# of each subscriber.
#
# Every subscription is reported exactly once, against its own route and the
# work pool of its own queue's channel:
# * "running jobs" is the number of work pool threads whose status is "run"
# * "backlog" is the value the work pool reports
#
# Earlier this method looped over the complete route set inside each
# subscription, which repeated one subscription's numbers under every route.
def print_threadpool_stats
  logger.info "*DISCLAIMER* the number of running jobs is just a best guess. We don't have a good way to introspect the bunny threadpools so jobs that are sleeping or waiting on IO won't show up as running"
  subscriptions.group_by { |subscription| subscription[:route].subscriber }.each do |subscriber, subscriber_subscriptions|
    logger.info subscriber.name
    subscriber_subscriptions.each do |subscription|
      route = subscription[:route]
      work_pool = subscription[:queue].channel.work_pool
      running_threads = work_pool.threads.count { |thread| thread.status == "run" }

      logger.info "  -- method: #{route.action}"
      logger.info "    --  concurrency: #{route.concurrency}"
      logger.info "    -- running jobs: #{running_threads}"
      logger.info "    --      backlog: #{work_pool.backlog}"
    end
  end
end

#setup_subscriptions! ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/action_subscriber/bunny/subscriber.rb', line 51

# Builds one subscription record per route and appends it to +subscriptions+.
# Each record is a Hash holding the route under +:route+ and the result of
# +setup_queue(route)+ under +:queue+.
#
# Raises RuntimeError when +subscriptions+ already contains entries, since
# queue setup is meant to happen only once at startup.
def setup_subscriptions!
  unless subscriptions.empty?
    raise ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup"
  end

  routes.each do |route|
    subscriptions.push({ :route => route, :queue => setup_queue(route) })
  end
end

#start_subscribers! ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/action_subscriber/bunny/subscriber.rb', line 61

# Creates a Bunny consumer for every entry in +subscriptions+, records it in
# +bunny_consumers+ and subscribes it to the entry's queue.
#
# When the route acknowledges messages, the route's prefetch is applied to the
# queue's channel first. The consumer's fourth constructor argument is the
# negation of +route.acknowledgements?+.
#
# On each delivery the consumer emits a "received_event.action_subscriber"
# notification (payload size and queue name), wraps the message in a
# Middleware::Env and hands it to +run_env+.
def start_subscribers!
  subscriptions.each do |subscription|
    route = subscription[:route]
    queue = subscription[:queue]
    channel = queue.channel

    if route.acknowledgements?
      channel.prefetch(route.prefetch)
    end

    consumer_tag = channel.generate_consumer_tag
    no_ack = !route.acknowledgements?
    consumer = ::Bunny::Consumer.new(channel, queue, consumer_tag, no_ack)

    consumer.on_delivery do |delivery_info, metadata, encoded_payload|
      ::ActiveSupport::Notifications.instrument(
        "received_event.action_subscriber",
        :payload_size => encoded_payload.bytesize,
        :queue => queue.name
      )

      # Flatten the delivery details into the plain Hash that Env receives.
      env_properties = {
        :action => route.action,
        :channel => queue.channel,
        :content_type => metadata.content_type,
        :delivery_tag => delivery_info.delivery_tag,
        :exchange => delivery_info.exchange,
        :headers => metadata.headers,
        :message_id => metadata.message_id,
        :routing_key => delivery_info.routing_key,
        :queue => queue.name,
      }

      run_env(::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, env_properties))
    end

    bunny_consumers << consumer
    queue.subscribe_with(consumer)
  end
end

#wait_to_finish_with_timeout(timeout) ⇒ Object



89
90
91
92
93
94
95
96
# File 'lib/action_subscriber/bunny/subscriber.rb', line 89

# Prints a three-line notice explaining that there is no graceful shutdown
# available, then sleeps for the given duration.
#
# @param timeout the value interpolated into the notice and passed to +sleep+
# @return whatever +sleep+ returns
def wait_to_finish_with_timeout(timeout)
  # Each line keeps the four-space indent of the printed notice.
  notice = "    Currently bunny doesn't have any sort of a graceful shutdown or\n" \
           "    the ability to check on the status of its ConsumerWorkPool objects.\n" \
           "    For now we just wait for #{timeout}sec to let the worker pools drain.\n"
  puts notice
  sleep(timeout)
end