Class: Hutch::Worker
Instance Method Summary collapse
- #handle_error(message_id, consumer, ex) ⇒ Object
-
#handle_message(consumer, delivery_info, properties, payload) ⇒ Object
Called internally when a new messages comes in from RabbitMQ.
-
#handle_signals ⇒ Object
Handle any pending signals.
-
#initialize(broker, consumers) ⇒ Worker
constructor
A new instance of Worker.
-
#register_signal_handlers ⇒ Object
Register handlers for SIGQUIT,TERM,INT to shut down the worker gracefully.
-
#run ⇒ Object
Run the main event loop.
-
#setup_queue(consumer) ⇒ Object
Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
-
#setup_queues ⇒ Object
Set up the queues for each of the worker’s consumers.
-
#stop ⇒ Object
Stop a running worker by killing all subscriber threads.
Methods included from Logging
#logger, logger, logger=, setup_logger
Constructor Details
#initialize(broker, consumers) ⇒ Worker
Returns a new instance of Worker.
10 11 12 13 14 |
# File 'lib/hutch/worker.rb', line 10 def initialize(broker, consumers) @broker = broker raise WorkerSetupError.new('no consumers loaded') if consumers.empty? @consumers = consumers end |
Instance Method Details
#handle_error(message_id, consumer, ex) ⇒ Object
98 99 100 101 102 |
# File 'lib/hutch/worker.rb', line 98 def handle_error(, consumer, ex) Hutch::Config[:error_handlers].each do |backend| backend.handle(, consumer, ex) end end |
#handle_message(consumer, delivery_info, properties, payload) ⇒ Object
Called internally when a new messages comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/hutch/worker.rb', line 81 def (consumer, delivery_info, properties, payload) logger.info("message(#{properties. || '-'}): " + "routing key: #{delivery_info.routing_key}, " + "consumer: #{consumer}, " + "payload: #{payload}") broker = @broker begin = Message.new(delivery_info, properties, payload) consumer.new.process() broker.ack(delivery_info.delivery_tag) rescue StandardError => ex handle_error(properties., consumer, ex) broker.ack(delivery_info.delivery_tag) end end |
#handle_signals ⇒ Object
Handle any pending signals
49 50 51 52 53 54 55 |
# File 'lib/hutch/worker.rb', line 49 def handle_signals signal = Thread.main[:signal_queue].shift if signal logger.info "caught sig#{signal.downcase}, stopping hutch..." stop end end |
#register_signal_handlers ⇒ Object
Register handlers for SIGQUIT,TERM,INT to shut down the worker gracefully. Forceful shutdowns are very bad!
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/hutch/worker.rb', line 37 def register_signal_handlers Thread.main[:signal_queue] = [] %w(QUIT TERM INT).map(&:to_sym).each do |sig| # This needs to be reentrant, so we queue up signals to be handled # in the run loop, rather than acting on signals here trap(sig) do Thread.main[:signal_queue] << sig end end end |
#run ⇒ Object
Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method never returns.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/hutch/worker.rb', line 19 def run setup_queues # Set up signal handlers for graceful shutdown register_signal_handlers # Take a break from Thread#join every 0.1 seconds to check if we've # been sent any signals handle_signals until @broker.wait_on_threads(0.1) rescue Bunny::PreconditionFailed => ex logger.error ex. raise WorkerSetupError.new('could not create queue due to a type ' + 'conflict with an existing queue, remove ' + 'the existing queue and try again') end |
#setup_queue(consumer) ⇒ Object
Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
70 71 72 73 74 75 76 77 |
# File 'lib/hutch/worker.rb', line 70 def setup_queue(consumer) queue = @broker.queue(consumer.queue_name) @broker.bind_queue(queue, consumer.routing_keys) queue.subscribe(ack: true) do |delivery_info, properties, payload| (consumer, delivery_info, properties, payload) end end |
#setup_queues ⇒ Object
Set up the queues for each of the worker’s consumers.
63 64 65 66 |
# File 'lib/hutch/worker.rb', line 63 def setup_queues logger.info 'setting up queues' @consumers.each { |consumer| setup_queue(consumer) } end |
#stop ⇒ Object
Stop a running worker by killing all subscriber threads.
58 59 60 |
# File 'lib/hutch/worker.rb', line 58 def stop @broker.stop end |