Class: Hutch::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/worker.rb

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(broker, consumers) ⇒ Worker

Returns a new instance of Worker.



10
11
12
13
# File 'lib/hutch/worker.rb', line 10

# Builds a worker around a broker and a collection of consumers.
#
# @param broker [Object] stored in @broker; the other methods of this class
#   call #queue, #bind_queue, #ack, #nack, #wait_on_threads and #stop on it
# @param consumers [#empty?, #each] consumers this worker will serve
def initialize(broker, consumers)
  @broker        = broker
  # Assign through the writer (not @consumers directly) so that the
  # empty-collection warning issued by #consumers= applies here as well.
  self.consumers = consumers
end

Instance Method Details

#consumers=(val) ⇒ Object



103
104
105
106
107
108
# File 'lib/hutch/worker.rb', line 103

# Replaces the collection of consumers served by this worker.
#
# An empty collection is still stored, but a warning is logged first because
# a worker without consumers has nothing to subscribe to.
#
# @param val [#empty?] the consumers to keep in @consumers
# @return [Object] the assigned value
def consumers=(val)
  logger.warn "no consumer loaded, ensure there's no configuration issue" if val.empty?
  @consumers = val
end

#handle_error(message_id, payload, consumer, ex) ⇒ Object



97
98
99
100
101
# File 'lib/hutch/worker.rb', line 97

# Hands a failed message to every configured error handler, in order.
#
# The handlers are read from Hutch::Config[:error_handlers]; each one
# receives the same four arguments through its #handle method.
#
# @param message_id [Object] identifier of the message that failed
# @param payload [Object] the message payload as it was received
# @param consumer [Object] the consumer that was processing the message
# @param ex [Exception] the error that was raised
# @return [Object] the result of iterating the handler collection
def handle_error(message_id, payload, consumer, ex)
  handlers = Hutch::Config[:error_handlers]
  handlers.each { |handler| handler.handle(message_id, payload, consumer, ex) }
end

#handle_message(consumer, delivery_info, properties, payload) ⇒ Object

Called internally when a new message comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/hutch/worker.rb', line 75

# Processes one delivered message with a fresh instance of +consumer+.
#
# The delivery is logged at info level, wrapped in a Message, and passed to
# a new consumer instance (wrapped by #with_tracing). The delivery is acked
# when handling finishes without raising; any StandardError causes a nack
# followed by a call to #handle_error.
#
# @param consumer [Class] consumer class; instantiated with +new+
# @param delivery_info [#routing_key, #delivery_tag] delivery metadata
# @param properties [#message_id] message properties
# @param payload [Object] raw message payload
# @return [Object] the result of the ack, or of #handle_error on failure
def handle_message(consumer, delivery_info, properties, payload)
  logger.info("message(#{properties.message_id || '-'}): " \
              "routing key: #{delivery_info.routing_key}, " \
              "consumer: #{consumer}, " \
              "payload: #{payload}")

  broker = @broker
  begin
    message = Message.new(delivery_info, properties, payload)

    # Give the consumer access to the broker and delivery metadata before
    # it sees the message.
    instance = consumer.new
    instance.broker = broker
    instance.delivery_info = delivery_info

    with_tracing(instance).handle(message)
    broker.ack(delivery_info.delivery_tag)
  rescue StandardError => error
    # Reject the delivery first, then let the error handlers report it.
    broker.nack(delivery_info.delivery_tag)
    handle_error(properties.message_id, payload, consumer, error)
  end
end

#handle_signals ⇒ Object

Handle any pending signals



43
44
45
46
47
48
49
# File 'lib/hutch/worker.rb', line 43

# Acts on the oldest queued signal, if there is one.
#
# Takes one entry off Thread.main[:signal_queue]; when an entry was present
# it is logged and the worker is stopped. At most one signal is consumed
# per call.
#
# @return [Object, nil] the result of #stop, or nil when no signal was queued
def handle_signals
  pending = Thread.main[:signal_queue].shift
  return unless pending

  logger.info "caught sig#{pending.downcase}, stopping hutch..."
  stop
end

#register_signal_handlers ⇒ Object

Register handlers for SIGQUIT, SIGTERM, and SIGINT to shut down the worker gracefully. Forceful shutdowns are very bad!



31
32
33
34
35
36
37
38
39
40
# File 'lib/hutch/worker.rb', line 31

# Installs trap handlers for QUIT, TERM and INT (those the platform lists in
# Signal.list) and resets the pending-signal queue.
#
# The queue lives in Thread.main[:signal_queue] and starts out empty; each
# handler only appends its signal name to that queue.
#
# @return [Array<Symbol>] the signal names a handler was installed for
def register_signal_handlers
  Thread.main[:signal_queue] = []

  # Skip signals this platform does not define.
  supported = %w(QUIT TERM INT).select { |name| Signal.list.key?(name) }

  supported.map(&:to_sym).each do |sig|
    # This needs to be reentrant, so the handler only queues the signal;
    # it is acted on later in the run loop rather than inside the trap.
    trap(sig) { Thread.main[:signal_queue] << sig }
  end
end

#run ⇒ Object

Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method never returns.



18
19
20
21
22
23
24
25
26
27
# File 'lib/hutch/worker.rb', line 18

# Runs the worker's main loop.
#
# Queues are set up first, then the signal handlers are installed. After
# that the method alternates between waiting on the broker's threads for
# 0.1 seconds and processing any queued signal, until
# @broker.wait_on_threads returns a truthy value.
#
# @return [nil]
def run
  setup_queues

  # Graceful shutdown relies on these handlers being in place before we
  # start waiting.
  register_signal_handlers

  # Wake up every 0.1 seconds so pending signals get a chance to be handled.
  until @broker.wait_on_threads(0.1)
    handle_signals
  end
end

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



64
65
66
67
68
69
70
71
# File 'lib/hutch/worker.rb', line 64

# Declares the consumer's queue, binds its routing keys, and subscribes.
#
# The queue is obtained from the broker using the consumer's queue name and
# arguments. Every delivery received on the subscription is forwarded to
# #handle_message together with the consumer.
#
# @param consumer [#get_queue_name, #get_arguments, #routing_keys]
# @return [Object] whatever the queue's #subscribe call returns
def setup_queue(consumer)
  consumer_queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(consumer_queue, consumer.routing_keys)

  on_delivery = proc do |delivery_info, properties, payload|
    handle_message(consumer, delivery_info, properties, payload)
  end

  # manual_ack: true is passed so acknowledgement is left to #handle_message.
  consumer_queue.subscribe(manual_ack: true, &on_delivery)
end

#setup_queues ⇒ Object

Set up the queues for each of the worker’s consumers.



57
58
59
60
# File 'lib/hutch/worker.rb', line 57

# Sets up a queue for every consumer held in @consumers.
#
# Logs once at info level, then calls #setup_queue for each consumer in
# iteration order.
#
# @return [Object] the result of iterating @consumers
def setup_queues
  logger.info 'setting up queues'
  @consumers.each do |consumer_class|
    setup_queue(consumer_class)
  end
end

#stop ⇒ Object

Stop a running worker by killing all subscriber threads.



52
53
54
# File 'lib/hutch/worker.rb', line 52

# Stops the worker by delegating to the broker's #stop.
#
# @return [Object] whatever @broker.stop returns
def stop
  @broker.stop
end

#with_tracing(klass) ⇒ Object



93
94
95
# File 'lib/hutch/worker.rb', line 93

# Wraps the given object in the configured tracer.
#
# The tracer class is read from Hutch::Config[:tracer] and instantiated with
# the argument. Despite the parameter name, #handle_message passes a
# consumer instance here, not a class.
#
# @param klass [Object] the object to be traced
# @return [Object] a new tracer instance built around +klass+
def with_tracing(klass)
  tracer = Hutch::Config[:tracer]
  tracer.new(klass)
end