Class: Hutch::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/worker.rb

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(broker, consumers) ⇒ Worker

Returns a new instance of Worker.

Raises:

  • (WorkerSetupError) — if no consumers were given


10
11
12
13
14
# File 'lib/hutch/worker.rb', line 10

# Builds a worker around a broker and the consumers it should serve.
#
# @param broker the broker object; stored as @broker
# @param consumers a collection of consumers; must respond to #empty?
# @raise [WorkerSetupError] when the consumer collection is empty
def initialize(broker, consumers)
  @broker = broker

  # A worker without consumers has nothing to subscribe to, so refuse to
  # construct one.
  raise WorkerSetupError, 'no consumers loaded' if consumers.empty?

  @consumers = consumers
end

Instance Method Details

#handle_error(message_id, consumer, ex) ⇒ Object



98
99
100
101
102
# File 'lib/hutch/worker.rb', line 98

# Reports a failure to every configured error handler, in order.
#
# @param message_id identifier of the message being processed when the
#   error occurred
# @param consumer the consumer that was handling the message
# @param ex the exception that was raised
# @return the collection stored under Hutch::Config[:error_handlers]
def handle_error(message_id, consumer, ex)
  handlers = Hutch::Config[:error_handlers]
  handlers.each { |handler| handler.handle(message_id, consumer, ex) }
end

#handle_message(consumer, delivery_info, properties, payload) ⇒ Object

Called internally when a new message comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/hutch/worker.rb', line 81

# Processes one incoming delivery: logs it, wraps it in a Message, passes it
# to a fresh instance of the consumer class, and acknowledges it.
#
# @param consumer the consumer class; instantiated once per message
# @param delivery_info delivery metadata (routing_key and delivery_tag are
#   read here)
# @param properties message properties (message_id is read here)
# @param payload the raw message body
def handle_message(consumer, delivery_info, properties, payload)
  logger.info("message(#{properties.message_id || '-'}): " \
              "routing key: #{delivery_info.routing_key}, " \
              "consumer: #{consumer}, " \
              "payload: #{payload}")

  begin
    wrapped = Message.new(delivery_info, properties, payload)
    consumer.new.process(wrapped)
    @broker.ack(delivery_info.delivery_tag)
  rescue StandardError => error
    # Failures are reported to the error handlers, and the delivery is
    # acknowledged on this path as well.
    handle_error(properties.message_id, consumer, error)
    @broker.ack(delivery_info.delivery_tag)
  end
end

#handle_signals ⇒ Object

Handle any pending signals



49
50
51
52
53
54
55
# File 'lib/hutch/worker.rb', line 49

# Takes the oldest entry off Thread.main[:signal_queue] and, if there is
# one, logs it and stops the worker. Does nothing when the queue is empty.
#
# @return the result of #stop when a signal was pending, otherwise nil
def handle_signals
  pending = Thread.main[:signal_queue].shift
  return unless pending

  logger.info "caught sig#{pending.downcase}, stopping hutch..."
  stop
end

#register_signal_handlers ⇒ Object

Register handlers for SIGQUIT, SIGTERM and SIGINT to shut down the worker gracefully. Forceful shutdowns are very bad!



37
38
39
40
41
42
43
44
45
46
# File 'lib/hutch/worker.rb', line 37

# Resets Thread.main[:signal_queue] to an empty array and installs traps for
# QUIT, TERM and INT.
#
# The traps need to be reentrant, so each one only appends the signal to
# the queue; the signals are acted on later in the run loop rather than
# inside the trap itself.
def register_signal_handlers
  Thread.main[:signal_queue] = []

  [:QUIT, :TERM, :INT].each do |sig|
    trap(sig) { Thread.main[:signal_queue] << sig }
  end
end

#run ⇒ Object

Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method never returns.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/hutch/worker.rb', line 19

# Runs the main event loop: sets up the queues, installs the signal
# handlers, then alternates between waiting on the broker's threads and
# handling queued signals until the wait returns a truthy value.
#
# @raise [WorkerSetupError] when a Bunny::PreconditionFailed is raised; the
#   original message is logged at error level first
def run
  setup_queues

  # Signal handlers give us a graceful shutdown path.
  register_signal_handlers

  # Wake up from waiting on the threads every 0.1 seconds to check whether
  # any signals have arrived.
  until @broker.wait_on_threads(0.1)
    handle_signals
  end
rescue Bunny::PreconditionFailed => ex
  logger.error ex.message
  raise WorkerSetupError, 'could not create queue due to a type ' \
                          'conflict with an existing queue, remove ' \
                          'the existing queue and try again'
end

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



70
71
72
73
74
75
76
77
# File 'lib/hutch/worker.rb', line 70

# Fetches the consumer's queue from the broker, binds the consumer's routing
# keys to it, and subscribes so each delivery is passed to #handle_message.
#
# @param consumer the consumer; its queue_name and routing_keys are read
# @return whatever the queue's #subscribe returns
def setup_queue(consumer)
  consumer_queue = @broker.queue(consumer.queue_name)
  @broker.bind_queue(consumer_queue, consumer.routing_keys)

  consumer_queue.subscribe(ack: true) do |info, props, body|
    handle_message(consumer, info, props, body)
  end
end

#setup_queues ⇒ Object

Set up the queues for each of the worker’s consumers.



63
64
65
66
# File 'lib/hutch/worker.rb', line 63

# Logs that queue setup is starting, then calls #setup_queue for each of the
# worker's consumers in order.
#
# @return the @consumers collection
def setup_queues
  logger.info('setting up queues')

  @consumers.each do |consumer_class|
    setup_queue(consumer_class)
  end
end

#stop ⇒ Object

Stop a running worker by killing all subscriber threads.



58
59
60
# File 'lib/hutch/worker.rb', line 58

# Stops the worker by delegating to the broker's #stop.
#
# NOTE(review): this is documented as killing all subscriber threads;
# presumably that happens inside the broker -- confirm against the broker
# implementation.
#
# @return whatever the broker's #stop returns
def stop
  @broker.stop
end