Class: Hutch::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/worker.rb

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(broker, consumers) ⇒ Worker

Returns a new instance of Worker.



11
12
13
14
# File 'lib/hutch/worker.rb', line 11

# Builds a worker around a broker and a collection of consumers.
#
# @param broker object stored in @broker; the other methods of this class
#   call #channel, #queue, #bind_queue, #ack, #wait_on_threads and #stop on it
# @param consumers collection handed to #consumers=
def initialize(broker, consumers)
  @broker        = broker
  # Go through the writer (not the ivar) so the checks in #consumers= run.
  self.consumers = consumers
end

Instance Method Details

#acknowledge_error(delivery_info, properties, broker, ex) ⇒ Object



137
138
139
140
141
142
143
# File 'lib/hutch/worker.rb', line 137

# Decides how a failed delivery gets acknowledged.
#
# The configured error acknowledgements are consulted in order, with a
# NackOnAllFailures instance appended as the last candidate. Each candidate's
# #handle is called until one of them returns a truthy value.
#
# @param delivery_info passed through unchanged to every candidate
# @param properties passed through unchanged to every candidate
# @param broker passed through unchanged to every candidate
# @param ex the error that was raised while handling the message
# @return the first candidate whose #handle returned truthy, or nil if none did
def acknowledge_error(delivery_info, properties, broker, ex)
  configured  = error_acknowledgements
  last_resort = Hutch::Acknowledgements::NackOnAllFailures.new
  candidates  = configured + [last_resort]

  candidates.find { |strategy| strategy.handle(delivery_info, properties, broker, ex) }
end

#bind_shutdown_handlerObject

Binds a shutdown handler that is called if the channel is closed or the network fails.



69
70
71
72
73
# File 'lib/hutch/worker.rb', line 69

# Registers an on_shutdown callback on the broker's channel. When the
# callback fires it sets Thread.main[:shutdown_received] to true, which is
# the flag #shutdown_not_called? reads.
#
# @return whatever the channel's #on_shutdown returns
def bind_shutdown_handler
  channel = @broker.channel
  channel.on_shutdown { Thread.main[:shutdown_received] = true }
end

#consumers=(val) ⇒ Object



145
146
147
148
149
150
# File 'lib/hutch/worker.rb', line 145

# Stores the consumers this worker will run, logging a warning first when
# the collection is empty.
#
# @param val collection of consumers; must respond to #empty?
def consumers=(val)
  logger.warn "no consumer loaded, ensure there's no configuration issue" if val.empty?
  @consumers = val
end

#error_acknowledgementsObject



152
153
154
# File 'lib/hutch/worker.rb', line 152

# Reads the :error_acknowledgements entry from Hutch::Config.
#
# @return the configured value. #acknowledge_error appends an Array to it
#   with +, so it is presumably an Array of acknowledgement backends
#   (TODO confirm against Hutch::Config defaults).
def error_acknowledgements
  Hutch::Config[:error_acknowledgements]
end

#handle_error(message_id, payload, consumer, ex) ⇒ Object



131
132
133
134
135
# File 'lib/hutch/worker.rb', line 131

# Reports a failed message to every error handler listed under
# Hutch::Config[:error_handlers], in order. Unlike #acknowledge_error this
# does not stop at the first handler: all of them are called.
#
# @param message_id id of the message that failed
# @param payload the message payload as received
# @param consumer the consumer that was handling the message
# @param ex the error that was raised
# @return the configured handler collection (the receiver of #each)
def handle_error(message_id, payload, consumer, ex)
  handlers = Hutch::Config[:error_handlers]
  handlers.each { |handler| handler.handle(message_id, payload, consumer, ex) }
end

#handle_message(consumer, delivery_info, properties, payload) ⇒ Object

Called internally when a new message comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/hutch/worker.rb', line 105

# Processes one delivery: logs it, wraps it in a Message, hands it to a
# fresh consumer instance (through the configured tracer) and acks it.
#
# Any StandardError raised along the way, including one raised while picking
# the serializer or logging, is passed to #acknowledge_error and then to
# #handle_error instead of propagating.
#
# @param consumer consumer class; must respond to .get_serializer and .new
# @param delivery_info delivery metadata (routing_key, delivery_tag)
# @param properties message properties (message_id)
# @param payload raw message payload
def handle_message(consumer, delivery_info, properties, payload)
  broker = @broker
  # A consumer-specific serializer wins over the globally configured one.
  serializer = consumer.get_serializer || Hutch::Config[:serializer]

  # Block form: the log line is only built if the logger evaluates the block.
  logger.info do
    spec = serializer.binary? ? "#{payload.bytesize} bytes" : payload
    "message(#{properties.message_id || '-'}): " \
      "routing key: #{delivery_info.routing_key}, " \
      "consumer: #{consumer}, " \
      "payload: #{spec}"
  end

  message = Message.new(delivery_info, properties, payload, serializer)

  consumer_instance = consumer.new
  consumer_instance.broker = @broker
  consumer_instance.delivery_info = delivery_info

  with_tracing(consumer_instance).handle(message)
  broker.ack(delivery_info.delivery_tag)
rescue StandardError => ex
  acknowledge_error(delivery_info, properties, broker, ex)
  handle_error(properties.message_id, payload, consumer, ex)
end

#handle_signalsObject

Handle any pending signals



55
56
57
58
59
60
61
# File 'lib/hutch/worker.rb', line 55

# Takes at most one pending signal off Thread.main[:signal_queue] (filled by
# the traps installed in #register_signal_handlers) and, if there was one,
# logs it and stops the worker.
#
# @return nil when the queue was empty, otherwise the result of #stop
def handle_signals
  pending = Thread.main[:signal_queue].shift
  return unless pending

  logger.info "caught sig#{pending.downcase}, stopping hutch..."
  stop
end

#main_loopObject



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/hutch/worker.rb', line 28

# Blocks the calling thread, polling for queued signals roughly every 0.1
# seconds until the loop's exit condition becomes truthy.
#
# On JRuby the exit condition is the shutdown flag set by the handler from
# #bind_shutdown_handler; elsewhere it is the return value of
# @broker.wait_on_threads.
#
# @return [nil]
def main_loop
  if defined?(JRUBY_VERSION)
    # Lets the channel notify the main thread when it has been closed.
    bind_shutdown_handler

    until shutdown_not_called?(0.1)
      handle_signals
    end
  else
    # wait_on_threads is given a 0.1s timeout so that control comes back here
    # regularly and pending signals get a chance to be handled.
    until @broker.wait_on_threads(0.1)
      handle_signals
    end
  end
end

#register_signal_handlersObject

Register handlers for SIGQUIT, SIGTERM and SIGINT to shut down the worker gracefully. Forceful shutdowns are very bad!



43
44
45
46
47
48
49
50
51
52
# File 'lib/hutch/worker.rb', line 43

# Resets Thread.main[:signal_queue] to an empty array and traps QUIT, TERM
# and INT, skipping any of them that Signal.list does not contain.
#
# @return [Array<Symbol>] the signals that were trapped
def register_signal_handlers
  Thread.main[:signal_queue] = []

  known = Signal.list
  supported = %w(QUIT TERM INT).select { |name| known.key?(name) }

  supported.map(&:to_sym).each do |sig|
    # Trap handlers must be reentrant, so they only record the signal;
    # the run loop acts on it later (see #handle_signals).
    trap(sig) { Thread.main[:signal_queue] << sig }
  end
end

#runObject

Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues. This method blocks the calling thread for as long as the main loop is running; it returns only once that loop exits.



19
20
21
22
23
24
25
26
# File 'lib/hutch/worker.rb', line 19

# Starts the worker: subscribes every consumer to its queue, installs the
# signal handlers, then enters #main_loop and stays there until it exits.
#
# @return whatever #main_loop returns
def run
  setup_queues

  # Set up signal handlers for graceful shutdown
  register_signal_handlers

  main_loop
end

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



93
94
95
96
97
98
99
100
101
# File 'lib/hutch/worker.rb', line 93

# Declares the consumer's queue on the broker, binds the consumer's routing
# keys to it and subscribes with manual acknowledgement. Every delivery is
# decoded by Hutch::Adapter.decode_message and passed to #handle_message.
#
# @param consumer consumer class; must respond to .get_queue_name,
#   .get_arguments and .routing_keys
# @return whatever the queue's #subscribe returns
def setup_queue(consumer)
  queue_name = consumer.get_queue_name
  queue_args = consumer.get_arguments
  consumer_queue = @broker.queue(queue_name, queue_args)
  @broker.bind_queue(consumer_queue, consumer.routing_keys)

  consumer_queue.subscribe(manual_ack: true) do |*raw|
    info, props, body = Hutch::Adapter.decode_message(*raw)
    handle_message(consumer, info, props, body)
  end
end

#setup_queuesObject

Set up the queues for each of the worker’s consumers.



86
87
88
89
# File 'lib/hutch/worker.rb', line 86

# Logs that queue setup is starting, then runs #setup_queue once per
# consumer held in @consumers.
#
# @return the @consumers collection (the receiver of #each)
def setup_queues
  logger.info 'setting up queues'

  @consumers.each do |consumer|
    setup_queue(consumer)
  end
end

#shutdown_not_called?(interval) ⇒ Boolean

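Checks whether the shutdown handler has been called. Despite the method's name, it returns true when shutdown HAS been received; otherwise it sleeps for the given interval and returns false.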

Returns:

  • (Boolean)


76
77
78
79
80
81
82
83
# File 'lib/hutch/worker.rb', line 76

# Polls the shutdown flag kept in Thread.main[:shutdown_received].
#
# NOTE: the name reads inverted. The result is true when shutdown HAS been
# received, which is what ends the `until` loop in #main_loop.
#
# @param interval how long to sleep when no shutdown has been received
# @return [Boolean] true if the flag is set (no sleep); false after sleeping
def shutdown_not_called?(interval)
  return true if Thread.main[:shutdown_received]

  sleep(interval)
  false
end

#stopObject

Stop a running worker by killing all subscriber threads.



64
65
66
# File 'lib/hutch/worker.rb', line 64

# Stops the worker by delegating to the broker's #stop. Called from
# #handle_signals once a queued signal has been picked up.
#
# @return whatever @broker.stop returns
def stop
  @broker.stop
end

#with_tracing(klass) ⇒ Object



127
128
129
# File 'lib/hutch/worker.rb', line 127

# Wraps an object in the tracer class configured under
# Hutch::Config[:tracer]. #handle_message calls #handle on the result.
#
# @param klass the object to wrap; #handle_message passes a consumer
#   instance here, not a class
# @return a new instance of the configured tracer, built from +klass+
def with_tracing(klass)
  tracer_class = Hutch::Config[:tracer]
  tracer_class.new(klass)
end