Class: Hutch::Worker
Instance Method Summary
- #acknowledge_error(delivery_info, properties, broker, ex) ⇒ Object
- #bind_shutdown_handler ⇒ Object
  Binds the shutdown handler, which is called if the channel is closed or the network fails.
- #consumers=(val) ⇒ Object
- #error_acknowledgements ⇒ Object
- #handle_error(message_id, payload, consumer, ex) ⇒ Object
- #handle_message(consumer, delivery_info, properties, payload) ⇒ Object
  Called internally when a new message comes in from RabbitMQ.
- #handle_signals ⇒ Object
  Handle any pending signals.
- #initialize(broker, consumers) ⇒ Worker
  Constructor. Returns a new instance of Worker.
- #main_loop ⇒ Object
- #register_signal_handlers ⇒ Object
  Register handlers for SIGQUIT, SIGTERM, and SIGINT to shut down the worker gracefully.
- #run ⇒ Object
  Run the main event loop.
- #setup_queue(consumer) ⇒ Object
  Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
- #setup_queues ⇒ Object
  Set up the queues for each of the worker’s consumers.
- #shutdown_not_called?(interval) ⇒ Boolean
  Returns true if the shutdown handler was called; otherwise sleeps for interval and returns false.
- #stop ⇒ Object
  Stop a running worker by killing all subscriber threads.
- #with_tracing(klass) ⇒ Object
Methods included from Logging
#logger, logger, logger=, setup_logger
Constructor Details
#initialize(broker, consumers) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/hutch/worker.rb', line 11
def initialize(broker, consumers)
  @broker = broker
  self.consumers = consumers
end
Instance Method Details
#acknowledge_error(delivery_info, properties, broker, ex) ⇒ Object
# File 'lib/hutch/worker.rb', line 137
def acknowledge_error(delivery_info, properties, broker, ex)
  acks = error_acknowledgements +
    [Hutch::Acknowledgements::NackOnAllFailures.new]
  acks.find do |backend|
    backend.handle(delivery_info, properties, broker, ex)
  end
end
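The listing above walks a chain of acknowledgement backends and stops at the first one whose `handle` returns a truthy value, with a catch-all appended last. A minimal, self-contained sketch of that pattern follows. The classes `RejectOnArgumentError` and `NackEverything`, the extra `configured` parameter, and the array standing in for the broker are all hypothetical; only the `handle(delivery_info, properties, broker, ex)` contract is taken from the source.

```ruby
# Hypothetical backend: claims only ArgumentError failures.
class RejectOnArgumentError
  def handle(delivery_info, _properties, broker, ex)
    return false unless ex.is_a?(ArgumentError)

    broker << [:reject, delivery_info]
    true # a truthy result stops the chain
  end
end

# Hypothetical stand-in for the catch-all backend appended last.
class NackEverything
  def handle(delivery_info, _properties, broker, _ex)
    broker << [:nack, delivery_info]
    true
  end
end

# Same shape as the method above: configured backends first, then the
# catch-all. Enumerable#find stops at the first truthy #handle result.
def acknowledge_error(configured, delivery_info, properties, broker, ex)
  acks = configured + [NackEverything.new]
  acks.find { |backend| backend.handle(delivery_info, properties, broker, ex) }
end

broker_log = [] # plain array standing in for a broker
configured = [RejectOnArgumentError.new]
acknowledge_error(configured, :tag_1, nil, broker_log, ArgumentError.new('bad'))
acknowledge_error(configured, :tag_2, nil, broker_log, RuntimeError.new('boom'))
```

Because the catch-all always claims the failure, every errored delivery ends up acknowledged one way or another, even when no configured backend matches.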
#bind_shutdown_handler ⇒ Object
Binds the shutdown handler, which is called if the channel is closed or the network fails.
# File 'lib/hutch/worker.rb', line 69
def bind_shutdown_handler
  @broker.channel.on_shutdown do
    Thread.main[:shutdown_received] = true
  end
end
#consumers=(val) ⇒ Object
# File 'lib/hutch/worker.rb', line 145
def consumers=(val)
  if val.empty?
    logger.warn "no consumer loaded, ensure there's no configuration issue"
  end
  @consumers = val
end
#error_acknowledgements ⇒ Object
# File 'lib/hutch/worker.rb', line 152
def error_acknowledgements
  Hutch::Config[:error_acknowledgements]
end
#handle_error(message_id, payload, consumer, ex) ⇒ Object
# File 'lib/hutch/worker.rb', line 131
def handle_error(message_id, payload, consumer, ex)
  Hutch::Config[:error_handlers].each do |backend|
    backend.handle(message_id, payload, consumer, ex)
  end
end
#handle_message(consumer, delivery_info, properties, payload) ⇒ Object
Called internally when a new message comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.
# File 'lib/hutch/worker.rb', line 105
def handle_message(consumer, delivery_info, properties, payload)
  broker = @broker
  begin
    serializer = consumer.get_serializer || Hutch::Config[:serializer]
    logger.info {
      spec = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}"
      "message(#{properties.message_id || '-'}): " +
      "routing key: #{delivery_info.routing_key}, " +
      "consumer: #{consumer}, " +
      "payload: #{spec}"
    }

    message = Message.new(delivery_info, properties, payload, serializer)
    consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
    with_tracing(consumer_instance).handle(message)
    broker.ack(delivery_info.delivery_tag)
  rescue StandardError => ex
    acknowledge_error(delivery_info, properties, broker, ex)
    handle_error(properties.message_id, payload, consumer, ex)
  end
end
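The control flow above acknowledges a delivery only after the consumer finishes without raising. Any StandardError is routed to the acknowledgement chain and then to the error handlers. The sketch below reproduces just that flow with stand-ins. `RecordingBroker`, the top-level `process` helper, and the `errors` array are hypothetical and not part of Hutch. The `nack` call stands in for whatever `acknowledge_error` would decide.

```ruby
# Hypothetical broker stub that records what it was asked to do.
class RecordingBroker
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack(tag)
    @calls << [:ack, tag]
  end

  def nack(tag)
    @calls << [:nack, tag]
  end
end

# Simplified flow: ack after the block succeeds, nack and report otherwise.
def process(broker, tag, errors)
  yield
  broker.ack(tag)
rescue StandardError => ex
  broker.nack(tag)     # stands in for acknowledge_error
  errors << ex.message # stands in for handle_error
end

broker = RecordingBroker.new
errors = []
process(broker, 1, errors) { :ok }
process(broker, 2, errors) { raise 'consumer failed' }
```

As in the listing, the rescue covers StandardError only, so exceptions outside that hierarchy are not handled at this level.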
#handle_signals ⇒ Object
Handle any pending signals
# File 'lib/hutch/worker.rb', line 55
def handle_signals
  signal = Thread.main[:signal_queue].shift
  if signal
    logger.info "caught sig#{signal.downcase}, stopping hutch..."
    stop
  end
end
#main_loop ⇒ Object
# File 'lib/hutch/worker.rb', line 28
def main_loop
  if defined?(JRUBY_VERSION)
    # Binds shutdown listener to notify main thread if channel was closed
    bind_shutdown_handler

    handle_signals until shutdown_not_called?(0.1)
  else
    # Take a break from Thread#join every 0.1 seconds to check if we've
    # been sent any signals
    handle_signals until @broker.wait_on_threads(0.1)
  end
end
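The non-JRuby branch relies on a timed wait that returns a falsy value on timeout, so the loop body runs once per interval until the threads finish. The sketch below shows the same idiom with plain `Thread#join(limit)`. It assumes `wait_on_threads` behaves like a timed join, which the source comment suggests but this page does not show. The `worker` thread and `ticks` counter are illustrative only.

```ruby
# A short-lived thread standing in for the broker's consumer threads.
worker = Thread.new { sleep 0.3 }
ticks = 0

# Thread#join(limit) returns nil on timeout and the thread once it has
# finished, so the modifier-until loop runs its body after each timed-out wait.
ticks += 1 until worker.join(0.1)
```

Waking up every 0.1 seconds keeps the main thread responsive to queued signals without busy-waiting.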
#register_signal_handlers ⇒ Object
Register handlers for SIGQUIT, SIGTERM, and SIGINT to shut down the worker gracefully. Forceful shutdowns are very bad!
# File 'lib/hutch/worker.rb', line 43
def register_signal_handlers
  Thread.main[:signal_queue] = []
  %w(QUIT TERM INT).keep_if { |s| Signal.list.keys.include? s }.map(&:to_sym).each do |sig|
    # This needs to be reentrant, so we queue up signals to be handled
    # in the run loop, rather than acting on signals here
    trap(sig) do
      Thread.main[:signal_queue] << sig
    end
  end
end
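The handlers above do nothing but enqueue the signal. The actual shutdown work happens later, when the run loop drains the queue. A minimal sketch of that deferred-handling pattern follows, assuming a POSIX platform where a process can signal itself. The local `signal_queue` array and the polling loop are illustrative and replace the `Thread.main[:signal_queue]` storage used in the source.

```ruby
signal_queue = []

# The trap block only records the signal; real work happens later.
%w[TERM INT].select { |s| Signal.list.key?(s) }.each do |sig|
  Signal.trap(sig) { signal_queue << sig.to_sym }
end

# Simulate an external shutdown request by signalling this process.
Process.kill('TERM', Process.pid)

# Poll for the queued signal, as a run loop would between waits.
handled = nil
50.times do
  handled = signal_queue.shift
  break if handled

  sleep 0.05
end
```

Keeping the trap block this small avoids doing logging or locking inside a signal handler, where Ruby restricts what is safe to call.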
#run ⇒ Object
Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method never returns.
# File 'lib/hutch/worker.rb', line 19
def run
  setup_queues

  # Set up signal handlers for graceful shutdown
  register_signal_handlers

  main_loop
end
#setup_queue(consumer) ⇒ Object
Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
# File 'lib/hutch/worker.rb', line 93
def setup_queue(consumer)
  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)

  queue.subscribe(manual_ack: true) do |*args|
    delivery_info, properties, payload = Hutch::Adapter.decode_message(*args)
    handle_message(consumer, delivery_info, properties, payload)
  end
end
#setup_queues ⇒ Object
Set up the queues for each of the worker’s consumers.
# File 'lib/hutch/worker.rb', line 86
def setup_queues
  logger.info 'setting up queues'
  @consumers.each { |consumer| setup_queue(consumer) }
end
#shutdown_not_called?(interval) ⇒ Boolean
Returns true if the shutdown handler was called; otherwise sleeps for interval and returns false.
# File 'lib/hutch/worker.rb', line 76
def shutdown_not_called?(interval)
  if Thread.main[:shutdown_received]
    true
  else
    sleep(interval)
    false
  end
end
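Despite its name, the method above returns true once a shutdown has been flagged, which is what ends the `until` loop in `main_loop`. The sketch below exercises the same flag-and-sleep logic. The helper name `shutdown_received?` is hypothetical, chosen to match the return value, and the background thread is an assumed stand-in for the channel's shutdown callback.

```ruby
# Same logic as the listing: report a flagged shutdown, or sleep and say "not yet".
def shutdown_received?(interval)
  return true if Thread.main[:shutdown_received]

  sleep(interval)
  false
end

# Hypothetical trigger: a background thread plays the role of the
# channel's shutdown callback and sets the flag on the main thread.
Thread.new do
  sleep 0.2
  Thread.main[:shutdown_received] = true
end

polls = 0
polls += 1 until shutdown_received?(0.05)
```

The flag lives in the main thread's local storage, so a callback running on any other thread can set it with `Thread.main[...]`.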
#stop ⇒ Object
Stop a running worker by killing all subscriber threads.
# File 'lib/hutch/worker.rb', line 64
def stop
  @broker.stop
end