Module: ActionSubscriber::MarchHare::Subscriber

Includes:
Logging
Included in:
RouteSet
Defined in:
lib/action_subscriber/march_hare/subscriber.rb

Instance Method Summary

Methods included from Logging

initialize_logger, logger, #logger, logger=

Instance Method Details

#cancel_consumers! ⇒ Object



6
7
8
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 6

# Sends +cancel+ to every consumer held in +march_hare_consumers+.
#
# @return [Object] whatever +march_hare_consumers.each+ returns (the
#   collection itself when it is an Array)
def cancel_consumers!
  march_hare_consumers.each do |consumer|
    consumer.cancel
  end
end

#march_hare_consumers ⇒ Object



10
11
12
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 10

# Memoised list of consumers. The first call creates an empty Array and
# stores it in +@march_hare_consumers+; later calls return that same object.
#
# @return [Array]
def march_hare_consumers
  @march_hare_consumers = [] unless @march_hare_consumers
  @march_hare_consumers
end


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 14

# Logs a summary of every route, grouped by subscriber.
#
# For each route the method, connection (with the maximum pool size of that
# connection's threadpool), concurrency, exchange, queue, routing key and
# prefetch are written at info level. When a route's +acknowledgements+ value
# differs from its subscriber's +acknowledge_messages?+, a warning line is
# written at error level.
def print_subscriptions
  routes.group_by { |route| route.subscriber }.each_pair do |subscriber_class, subscriber_routes|
    logger.info subscriber_class.name

    subscriber_routes.each do |route|
      # Looked up per route (not hoisted) so nothing is touched when there are no routes.
      pool = ::ActionSubscriber::RabbitConnection.connection_threadpools[route.connection_name]

      logger.info "  -- method: #{route.action}"
      logger.info "    --  connection: #{route.connection_name} (#{pool.get_maximum_pool_size} threads)"
      logger.info "    -- concurrency: #{route.concurrency}"
      logger.info "    --    exchange: #{route.exchange}"
      logger.info "    --       queue: #{route.queue}"
      logger.info "    -- routing_key: #{route.routing_key}"
      logger.info "    --    prefetch: #{route.prefetch} per consumer (#{route.prefetch * route.concurrency} total)"

      next unless route.acknowledgements != subscriber_class.acknowledge_messages?

      logger.error "WARNING subscriber has acknowledgements as #{subscriber_class.acknowledge_messages?} and route has acknowledgements as #{route.acknowledgements}"
    end
  end
end


33
34
35
36
37
38
39
40
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 33

# Logs, at info level, the maximum pool size, active count and queue size of
# every executor in ::ActionSubscriber::RabbitConnection.connection_threadpools.
def print_threadpool_stats
  threadpools = ::ActionSubscriber::RabbitConnection.connection_threadpools
  threadpools.each do |connection_name, pool|
    logger.info "Connection #{connection_name}"

    # Each figure is read immediately before it is logged.
    max_threads = pool.get_maximum_pool_size
    logger.info "  -- available threads: #{max_threads}"

    running = pool.get_active_count
    logger.info "  --    running thread: #{running}"

    backlog = pool.get_queue.size
    logger.info "  --           backlog: #{backlog}"
  end
end

#setup_subscriptions! ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 42

# Populates +subscriptions+ with one entry per unit of route concurrency.
#
# Every entry is a Hash with +:route+ (the route) and +:queue+ (the result of
# a fresh +setup_queue(route)+ call), so a route with concurrency N produces
# N entries and N +setup_queue+ calls.
#
# @raise [RuntimeError] if +subscriptions+ already contains entries
def setup_subscriptions!
  unless subscriptions.empty?
    raise ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup"
  end

  routes.each do |route|
    route.concurrency.times do
      subscriptions << { route: route, queue: setup_queue(route) }
    end
  end
end

#start_subscribers! ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 54

# Subscribes to the queue of every entry in +subscriptions+ and appends each
# resulting consumer to +march_hare_consumers+.
#
# For every delivery the subscription block:
# 1. instruments "received_event.action_subscriber" with the payload size
#    and queue name,
# 2. builds a properties Hash from the route, the queue and the delivery
#    metadata,
# 3. wraps subscriber, payload and properties in a
#    ::ActionSubscriber::Middleware::Env and passes it to +run_env+.
def start_subscribers!
  subscriptions.each do |subscription|
    route = subscription[:route]
    queue = subscription[:queue]
    # Prefetch is only set on the channel for routes that use acknowledgements.
    queue.channel.prefetch = route.prefetch if route.acknowledgements?
    # The first block argument is the delivery metadata; it supplies the
    # content type, delivery tag, exchange, message id, routing key and headers.
    consumer = queue.subscribe(route.queue_subscription_options) do |metadata, encoded_payload|
      ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
      properties = {
        :action => route.action,
        :channel => queue.channel,
        :content_type => metadata.content_type,
        :delivery_tag => metadata.delivery_tag,
        :exchange => metadata.exchange,
        :headers => _normalized_headers(metadata),
        :message_id => metadata.message_id,
        :routing_key => metadata.routing_key,
        :queue => queue.name,
      }
      env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
      run_env(env)
    end

    march_hare_consumers << consumer
  end
end

#wait_to_finish_with_timeout(timeout) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 80

# Polls the connection threadpools until none has active threads, or until
# +timeout+ polling rounds have been made.
#
# Each round logs one info line per pool whose active count is above zero.
# When no pool is busy, "Connection threadpools empty" is logged and the
# method returns. Otherwise it yields the thread and sleeps one second before
# the next round, so +timeout+ is effectively a bound in seconds.
#
# @param timeout [Integer] maximum number of polling rounds
# @return [nil]
def wait_to_finish_with_timeout(timeout)
  rounds = 0
  loop do
    rounds += 1
    busy = false

    ::ActionSubscriber::RabbitConnection.connection_threadpools.each do |connection_name, pool|
      unless pool.get_active_count <= 0
        logger.info "  -- Connection #{connection_name} (active: #{pool.get_active_count}, queued: #{pool.get_queue.size})"
        busy = true
      end
    end

    unless busy
      logger.info "Connection threadpools empty"
      break
    end

    # The round limit is checked before sleeping, so the final round never waits.
    break if rounds >= timeout

    Thread.pass
    sleep 1
  end
end