Class: Hutch::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/worker.rb

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(broker, consumers, setup_procs) ⇒ Worker

Returns a new instance of Worker.



13
14
15
16
17
# File 'lib/hutch/worker.rb', line 13

# Build a worker around a broker, a set of consumers and a list of setup procs.
#
# @param broker the object stored in +@broker+
# @param consumers handed to the +consumers=+ writer
# @param setup_procs handed to the +setup_procs=+ writer
# @return [Worker] a new instance of Worker
def initialize(broker, consumers, setup_procs)
  @broker = broker
  # Both collections go through their writers rather than being assigned
  # to instance variables directly.
  self.consumers = consumers
  self.setup_procs = setup_procs
end

Instance Method Details

#acknowledge_error(delivery_info, properties, broker, ex) ⇒ Object



90
91
92
93
94
95
96
# File 'lib/hutch/worker.rb', line 90

# Offer a failed delivery to each error acknowledgement in turn until one of
# them returns a truthy value from +handle+.
#
# The list from +error_acknowledgements+ is tried first; a fresh
# +Hutch::Acknowledgements::NackOnAllFailures+ instance is always appended as
# the last candidate.
#
# @param delivery_info passed through to each +handle+ call
# @param properties passed through to each +handle+ call
# @param broker passed through to each +handle+ call
# @param ex the rescued exception, passed through to each +handle+ call
# @return the first acknowledgement whose +handle+ returned truthy, or nil
def acknowledge_error(delivery_info, properties, broker, ex)
  configured = error_acknowledgements
  fallback = Hutch::Acknowledgements::NackOnAllFailures.new
  candidates = configured + [fallback]

  candidates.find { |ack| ack.handle(delivery_info, properties, broker, ex) }
end

#consumers=(val) ⇒ Object



98
99
100
101
102
103
# File 'lib/hutch/worker.rb', line 98

# Assign the worker's consumers, logging a warning when the collection is
# empty.
#
# @param val the consumers; must respond to +empty?+
# @return the assigned value
def consumers=(val)
  logger.warn("no consumer loaded, ensure there's no configuration issue") if val.empty?
  @consumers = val
end

#error_acknowledgements ⇒ Object



105
106
107
# File 'lib/hutch/worker.rb', line 105

# Look up the value stored under +:error_acknowledgements+ in +Hutch::Config+.
#
# @return whatever +Hutch::Config[:error_acknowledgements]+ holds
def error_acknowledgements
  settings = Hutch::Config
  settings[:error_acknowledgements]
end

#handle_error(*args) ⇒ Object



84
85
86
87
88
# File 'lib/hutch/worker.rb', line 84

# Forward the given arguments to +handle+ on every entry of
# +Hutch::Config[:error_handlers]+, in order.
#
# @param args forwarded unchanged to each handler's +handle+
# @return the collection of error handlers (the receiver of +each+)
def handle_error(*args)
  handlers = Hutch::Config[:error_handlers]
  handlers.each { |handler| handler.handle(*args) }
end

#handle_message(consumer, delivery_info, properties, payload) ⇒ Object

Called internally when a new message comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/hutch/worker.rb', line 61

# Called internally when a new message comes in. Wraps the raw delivery in a
# +Message+, hands it to a fresh consumer instance, and acks it on success.
#
# Any StandardError raised along the way is passed first to
# +acknowledge_error+ and then to +handle_error+.
#
# @param consumer the consumer class; must respond to +get_serializer+ and +new+
# @param delivery_info must respond to +routing_key+ and +delivery_tag+
# @param properties must respond to +message_id+
# @param payload the raw message body
# @return the result of +@broker.ack+ on success, or of +handle_error+ on failure
def handle_message(consumer, delivery_info, properties, payload)
  # A consumer-specific serializer wins over the configured default.
  serializer = consumer.get_serializer || Hutch::Config[:serializer]

  # The block form defers building the log line to the logger.
  logger.debug do
    shown_payload = serializer.binary? ? "#{payload.bytesize} bytes" : payload.to_s
    "message(#{properties.message_id || '-'}): " \
      "routing key: #{delivery_info.routing_key}, " \
      "consumer: #{consumer}, " \
      "payload: #{shown_payload}"
  end

  message = Message.new(delivery_info, properties, payload, serializer)

  instance = consumer.new
  instance.broker = @broker
  instance.delivery_info = delivery_info

  with_tracing(instance).handle(message)
  @broker.ack(delivery_info.delivery_tag)
rescue StandardError => error
  acknowledge_error(delivery_info, properties, @broker, error)
  handle_error(properties, payload, consumer, error)
end

#run ⇒ Object

Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues. This method blocks until the worker is signaled; it then stops the worker and returns.



22
23
24
25
26
27
28
29
# File 'lib/hutch/worker.rb', line 22

# Run the worker: set up the queues, invoke every setup proc, block in
# +Waiter.wait_until_signaled+, and finally call +stop+.
#
# @return the result of +stop+
def run
  setup_queues
  # Each entry of setup_procs only needs to respond to +call+.
  setup_procs.each(&:call)

  # Execution resumes below only once the waiter returns.
  Waiter.wait_until_signaled

  stop
end

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/hutch/worker.rb', line 47

# Bind a consumer's routing keys to its queue and subscribe to that queue so
# each delivery is decoded and passed on to +handle_message+.
#
# @param consumer must respond to +get_queue_name+, +get_arguments+ and
#   +routing_keys+
# @return the result of the queue's +subscribe+ call
def setup_queue(consumer)
  logger.info("setting up queue: #{consumer.get_queue_name}")

  bound_queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(bound_queue, consumer.routing_keys)

  # manual_ack: true is requested here; handle_message is what calls
  # @broker.ack after a successful run.
  subscription = { consumer_tag: unique_consumer_tag, manual_ack: true }
  bound_queue.subscribe(**subscription) do |*raw|
    info, props, body = Hutch::Adapter.decode_message(*raw)
    handle_message(consumer, info, props, body)
  end
end

#setup_queues ⇒ Object

Set up the queues for each of the worker’s consumers.



37
38
39
40
41
42
43
# File 'lib/hutch/worker.rb', line 37

# Set up a queue for each of the worker's consumers, skipping any consumer
# for which both +group_configured?+ and +group_restricted?+ are truthy.
#
# @return the consumers that were set up
def setup_queues
  logger.info('setting up queues')

  allowed = @consumers.reject do |consumer|
    group_configured? && group_restricted?(consumer)
  end
  allowed.each { |consumer| setup_queue(consumer) }
end

#stop ⇒ Object

Stop a running worker by killing all subscriber threads.



32
33
34
# File 'lib/hutch/worker.rb', line 32

# Stop a running worker by delegating to the broker's +stop+.
#
# @return the result of +@broker.stop+
def stop
  @broker.stop
end

#with_tracing(klass) ⇒ Object



80
81
82
# File 'lib/hutch/worker.rb', line 80

# Wrap the given object in a new instance of the class stored under
# +Hutch::Config[:tracer]+.
#
# @param klass the object handed to the tracer's constructor
#   (+handle_message+ passes a consumer instance)
# @return a new tracer instance built around +klass+
def with_tracing(klass)
  tracer_class = Hutch::Config[:tracer]
  tracer_class.new(klass)
end