Method: Messaging::ConsumerSupervisor#stop

Defined in:
lib/messaging/consumer_supervisor.rb

#stop ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/messaging/consumer_supervisor.rb', line 30

# Signals every consumer to stop and waits for the shutdown to complete.
#
# Does nothing when a stop has already been requested (+@signal_to_stop+ is
# set). Otherwise, inside the 'consumer_supervisor.stop' instrumentation
# block, it:
#
# 1. sets +@signal_to_stop+,
# 2. calls +stop+ on every consumer, each on its own thread, and waits for
#    all of those calls to return,
# 3. wakes every thread in +@threads+ that is still alive,
# 4. shuts down +@thread_pool+ (when present) and waits up to 60 seconds
#    for it to terminate,
# 5. logs 'Consumers stopped'.
#
# @return [Object, nil] nil when a stop was already requested; otherwise
#   whatever +instrument+ returns
def stop
  return if @signal_to_stop

  instrument('consumer_supervisor.stop') do
    @signal_to_stop = true

    # Stop the consumers concurrently, then wait for every one of them.
    # Array#join would merely build a String out of the Thread objects and
    # return immediately; Thread#join is what actually blocks.
    stoppers = consumers.map { |consumer| Thread.new { consumer.stop } }
    stoppers.each do |stopper|
      begin
        stopper.join
      rescue StandardError => e
        # Thread#join re-raises the thread's exception; a single failing
        # consumer must not prevent the rest of the shutdown from running.
        Messaging.logger.error "Consumer failed to stop: #{e.class}: #{e.message}"
      end
    end

    @threads.each do |thread|
      begin
        thread.wakeup if thread.alive?
      rescue ThreadError
        # The thread finished between the alive? check and wakeup.
      end
    end

    @thread_pool&.shutdown
    @thread_pool&.wait_for_termination(60)
    Messaging.logger.info 'Consumers stopped'
  end
end