Module: ActionSubscriber::MarchHare::Subscriber

Includes:
Logging
Included in:
RouteSet
Defined in:
lib/action_subscriber/march_hare/subscriber.rb

Instance Method Summary collapse

Methods included from Logging

initialize_logger, logger, #logger, logger=

Instance Method Details

#cancel_consumers! ⇒ Object



6
7
8
9
10
11
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 6

# Cancels every consumer tracked in `march_hare_consumers`, then shuts down
# each threadpool registered with ::ActionSubscriber::ThreadPools.
#
# Consumers are cancelled first, before any threadpool is shut down.
def cancel_consumers!
  march_hare_consumers.each { |consumer| consumer.cancel }

  pools = ::ActionSubscriber::ThreadPools.threadpools
  pools.each { |_name, pool| pool.shutdown }
end

#march_hare_consumers ⇒ Object



13
14
15
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 13

# Returns the memoised list of consumers held by this object, creating an
# empty array the first time it is requested.
def march_hare_consumers
  return @march_hare_consumers if @march_hare_consumers

  @march_hare_consumers = []
end

#setup_subscriptions! ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 17

# Builds one subscription entry per route, pairing the route with the queue
# returned by `setup_queue(route)`, and appends it to `subscriptions`.
#
# Raises ::RuntimeError when `subscriptions` is not empty, since queue setup
# is only meant to run once at startup.
def setup_subscriptions!
  unless subscriptions.empty?
    raise ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup"
  end

  routes.each do |route|
    subscriptions << { :route => route, :queue => setup_queue(route) }
  end
end

#start_subscribers! ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/action_subscriber/march_hare/subscriber.rb', line 27

# Starts a consumer for every entry in `subscriptions` and records each one
# in `march_hare_consumers`.
#
# For each subscription this:
# * sets the channel prefetch from the route, but only when the route uses
#   acknowledgements;
# * looks up the route's threadpool by name (`fetch` raises if it is missing);
# * subscribes to the queue with the route's subscription options.
#
# The subscribe block receives `metadata` and `encoded_payload`. For every
# delivery it emits a "received_event.action_subscriber" notification, builds
# the properties hash from `metadata`, wraps everything in a Middleware::Env
# and hands it to `run_env` together with the threadpool.
#
# NOTE(review): `metadata` is presumably the March Hare delivery metadata
# object; only the readers used below are required of it — confirm.
def start_subscribers!
  subscriptions.each do |subscription|
    route = subscription[:route]
    queue = subscription[:queue]
    queue.channel.prefetch = route.prefetch if route.acknowledgements?
    threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)
    consumer = queue.subscribe(route.queue_subscription_options) do |metadata, encoded_payload|
      ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
      properties = {
        :action => route.action,
        :channel => queue.channel,
        :content_type => metadata.content_type,
        :delivery_tag => metadata.delivery_tag,
        :exchange => metadata.exchange,
        :headers => _normalized_headers(metadata),
        :message_id => metadata.message_id,
        :routing_key => metadata.routing_key,
        :queue => queue.name,
        :uses_acknowledgements => route.acknowledgements?,
      }
      env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
      run_env(env, threadpool)
    end

    march_hare_consumers << consumer
  end
end