Class: Hutch::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/worker.rb

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(broker, consumers) ⇒ Worker

Returns a new instance of Worker.



10
11
12
13
# File 'lib/hutch/worker.rb', line 10

def initialize(broker, consumers)
  @broker        = broker
  self.consumers = consumers
end

Instance Method Details

#bind_shutdown_handlerObject

Binds shutdown handler, called if channel is closed or network Failed



68
69
70
71
72
# File 'lib/hutch/worker.rb', line 68

def bind_shutdown_handler
  @broker.channel.on_shutdown do
    Thread.main[:shutdown_received] = true
  end
end

#consumers=(val) ⇒ Object



132
133
134
135
136
137
# File 'lib/hutch/worker.rb', line 132

def consumers=(val)
  if val.empty?
    logger.warn "no consumer loaded, ensure there's no configuration issue"
  end
  @consumers = val
end

#handle_error(message_id, payload, consumer, ex) ⇒ Object



126
127
128
129
130
# File 'lib/hutch/worker.rb', line 126

def handle_error(message_id, payload, consumer, ex)
  Hutch::Config[:error_handlers].each do |backend|
    backend.handle(message_id, payload, consumer, ex)
  end
end

#handle_message(consumer, delivery_info, properties, payload) ⇒ Object

Called internally when a new messages comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/hutch/worker.rb', line 104

def handle_message(consumer, delivery_info, properties, payload)
  logger.info("message(#{properties.message_id || '-'}): " +
              "routing key: #{delivery_info.routing_key}, " +
              "consumer: #{consumer}, " +
              "payload: #{payload}")

  broker = @broker
  begin
    message = Message.new(delivery_info, properties, payload)
    consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
    with_tracing(consumer_instance).handle(message)
    broker.ack(delivery_info.delivery_tag)
  rescue StandardError => ex
    broker.nack(delivery_info.delivery_tag)
    handle_error(properties.message_id, payload, consumer, ex)
  end
end

#handle_signalsObject

Handle any pending signals



54
55
56
57
58
59
60
# File 'lib/hutch/worker.rb', line 54

def handle_signals
  signal = Thread.main[:signal_queue].shift
  if signal
    logger.info "caught sig#{signal.downcase}, stopping hutch..."
    stop
  end
end

#main_loopObject



27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/hutch/worker.rb', line 27

def main_loop
  if defined?(JRUBY_VERSION)
    # Binds shutdown listener to notify main thread if channel was closed
    bind_shutdown_handler

    handle_signals until shutdown_not_called?(0.1)
  else
    # Take a break from Thread#join every 0.1 seconds to check if we've
    # been sent any signals
    handle_signals until @broker.wait_on_threads(0.1)
  end
end

#register_signal_handlersObject

Register handlers for SIGQUIT,TERM,INT to shut down the worker gracefully. Forceful shutdowns are very bad!



42
43
44
45
46
47
48
49
50
51
# File 'lib/hutch/worker.rb', line 42

def register_signal_handlers
  Thread.main[:signal_queue] = []
  %w(QUIT TERM INT).keep_if { |s| Signal.list.keys.include? s }.map(&:to_sym).each do |sig|
    # This needs to be reentrant, so we queue up signals to be handled
    # in the run loop, rather than acting on signals here
    trap(sig) do
      Thread.main[:signal_queue] << sig
    end
  end
end

#runObject

Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method never returns.



18
19
20
21
22
23
24
25
# File 'lib/hutch/worker.rb', line 18

def run
  setup_queues

  # Set up signal handlers for graceful shutdown
  register_signal_handlers

  main_loop
end

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



92
93
94
95
96
97
98
99
100
# File 'lib/hutch/worker.rb', line 92

def setup_queue(consumer)
  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)

  queue.subscribe(manual_ack: true) do |*args|
    delivery_info, properties, payload = Hutch::Adapter.decode_message(*args)
    handle_message(consumer, delivery_info, properties, payload)
  end
end

#setup_queuesObject

Set up the queues for each of the worker’s consumers.



85
86
87
88
# File 'lib/hutch/worker.rb', line 85

def setup_queues
  logger.info 'setting up queues'
  @consumers.each { |consumer| setup_queue(consumer) }
end

#shutdown_not_called?(interval) ⇒ Boolean

Checks if shutdown handler was called, then sleeps for interval

Returns:

  • (Boolean)


75
76
77
78
79
80
81
82
# File 'lib/hutch/worker.rb', line 75

def shutdown_not_called?(interval)
  if Thread.main[:shutdown_received]
    true
  else
    sleep(interval)
    false
  end
end

#stopObject

Stop a running worker by killing all subscriber threads.



63
64
65
# File 'lib/hutch/worker.rb', line 63

def stop
  @broker.stop
end

#with_tracing(klass) ⇒ Object



122
123
124
# File 'lib/hutch/worker.rb', line 122

def with_tracing(klass)
  Hutch::Config[:tracer].new(klass)
end