Class: Hutch::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/worker.rb

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(broker, consumers, setup_procs) ⇒ Worker

Returns a new instance of Worker.



13
14
15
16
17
# File 'lib/hutch/worker.rb', line 13

# Builds a worker around a broker, a list of consumers and a list of setup
# procs.
#
# @param broker      the broker this worker talks to; kept in @broker
# @param consumers   the consumers to serve; assigned through #consumers=
# @param setup_procs callables assigned through #setup_procs=
def initialize(broker, consumers, setup_procs)
  @broker = broker

  # Assign through the writer methods rather than the instance variables so
  # that any logic living in the writers runs at construction time as well.
  self.consumers = consumers
  self.setup_procs = setup_procs
end

Instance Method Details

#acknowledge_error(delivery_info, properties, broker, ex) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/hutch/worker.rb', line 85

# Offers a failed delivery to each error-acknowledgement backend in turn and
# stops at the first one whose #handle returns a truthy value.
#
# A fresh Hutch::Acknowledgements::NackOnAllFailures is always appended as
# the last candidate, after the configured backends.
#
# @param delivery_info passed through to each backend's #handle
# @param properties    passed through to each backend's #handle
# @param broker        passed through to each backend's #handle
# @param ex            the error raised while processing the message
# @return the first backend whose #handle returned truthy, or nil if none did
def acknowledge_error(delivery_info, properties, broker, ex)
  configured = error_acknowledgements
  fallback = Hutch::Acknowledgements::NackOnAllFailures.new
  candidates = configured + [fallback]

  candidates.find { |candidate| candidate.handle(delivery_info, properties, broker, ex) }
end

#consumers=(val) ⇒ Object



93
94
95
96
97
98
# File 'lib/hutch/worker.rb', line 93

# Stores the consumers this worker will serve.
#
# An empty collection is still accepted, but a warning is logged first.
#
# @param val the consumers; must respond to #empty?
def consumers=(val)
  logger.warn "no consumer loaded, ensure there's no configuration issue" if val.empty?
  @consumers = val
end

#error_acknowledgements ⇒ Object



100
101
102
# File 'lib/hutch/worker.rb', line 100

# Returns whatever is stored under the :error_acknowledgements key of
# Hutch::Config.
#
# NOTE(review): #acknowledge_error concatenates this value with an Array and
# calls #handle on each element, so it is presumably an Array of
# acknowledgement backends - confirm against the config defaults.
def error_acknowledgements
  Hutch::Config[:error_acknowledgements]
end

#handle_error(*args) ⇒ Object



79
80
81
82
83
# File 'lib/hutch/worker.rb', line 79

# Passes an error to every handler listed under Hutch::Config[:error_handlers].
#
# Unlike #acknowledge_error, every handler is called; the iteration does not
# stop at the first one.
#
# @param args forwarded unchanged to each handler's #handle
# @return the configured handler collection (the return value of #each)
def handle_error(*args)
  handlers = Hutch::Config[:error_handlers]
  handlers.each { |handler| handler.handle(*args) }
end

#handle_message(consumer, delivery_info, properties, payload) ⇒ Object

Called internally when a new message comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/hutch/worker.rb', line 56

# Processes one delivery: wraps it in a Message, hands it to a fresh consumer
# instance (wrapped by #with_tracing) and acks it on the broker.
#
# Any StandardError raised along the way - including one raised by the ack
# itself - is routed to #acknowledge_error and then #handle_error instead of
# propagating to the caller.
#
# @param consumer      consumer class; must respond to .get_serializer and .new
# @param delivery_info delivery metadata (routing_key and delivery_tag are read)
# @param properties    message properties (message_id is read for logging)
# @param payload       the raw message body
# @return the result of @broker.ack on success, or of #handle_error on failure
def handle_message(consumer, delivery_info, properties, payload)
  # A consumer-specific serializer takes precedence over the configured one.
  serializer = consumer.get_serializer || Hutch::Config[:serializer]

  # Block form: the description is only built if the logger evaluates it.
  logger.debug do
    body_summary = serializer.binary? ? "#{payload.bytesize} bytes" : payload.to_s
    "message(#{properties.message_id || '-'}): " \
      "routing key: #{delivery_info.routing_key}, " \
      "consumer: #{consumer}, " \
      "payload: #{body_summary}"
  end

  message = Message.new(delivery_info, properties, payload, serializer)

  instance = consumer.new
  instance.broker = @broker
  instance.delivery_info = delivery_info

  with_tracing(instance).handle(message)
  @broker.ack(delivery_info.delivery_tag)
rescue StandardError => error
  acknowledge_error(delivery_info, properties, @broker, error)
  handle_error(properties, payload, consumer, error)
end

#run ⇒ Object

Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method never returns.



22
23
24
25
26
27
28
29
# File 'lib/hutch/worker.rb', line 22

# Main entry point of the worker.
#
# Sets up the queues, invokes every setup proc, then blocks in
# Waiter.wait_until_signaled. Once that call returns the worker is stopped.
#
# @return the result of #stop
def run
  setup_queues
  setup_procs.each { |setup_proc| setup_proc.call }

  # Blocks here; shutdown below only happens after the waiter returns.
  Waiter.wait_until_signaled

  stop
end

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



44
45
46
47
48
49
50
51
52
# File 'lib/hutch/worker.rb', line 44

# Declares the consumer's queue on the broker, binds the consumer's routing
# keys to it and subscribes with manual acknowledgement.
#
# Each delivery is decoded by Hutch::Adapter.decode_message and then passed
# to #handle_message together with the consumer.
#
# @param consumer must respond to get_queue_name, get_arguments and routing_keys
# @return whatever the queue's #subscribe returns
def setup_queue(consumer)
  bound_queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(bound_queue, consumer.routing_keys)

  # manual_ack: true - acknowledgement is issued explicitly in #handle_message.
  bound_queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*raw_delivery|
    info, props, body = Hutch::Adapter.decode_message(*raw_delivery)
    handle_message(consumer, info, props, body)
  end
end

#setup_queues ⇒ Object

Set up the queues for each of the worker’s consumers.



37
38
39
40
# File 'lib/hutch/worker.rb', line 37

# Logs that queue setup is starting, then calls #setup_queue once for every
# consumer held in @consumers, in order.
#
# @return the @consumers collection (the return value of #each)
def setup_queues
  logger.info 'setting up queues'

  @consumers.each do |consumer_class|
    setup_queue(consumer_class)
  end
end

#stop ⇒ Object

Stop a running worker by killing all subscriber threads.



32
33
34
# File 'lib/hutch/worker.rb', line 32

# Stops the worker by delegating to the broker's #stop.
#
# @return whatever @broker.stop returns
def stop
  @broker.stop
end

#with_tracing(klass) ⇒ Object



75
76
77
# File 'lib/hutch/worker.rb', line 75

# Wraps the given object in the tracer configured under
# Hutch::Config[:tracer].
#
# @param klass the object handed to the tracer's constructor; despite the
#   parameter name, #handle_message passes a consumer instance here
# @return a new instance of the configured tracer class
def with_tracing(klass)
  tracer_class = Hutch::Config[:tracer]
  tracer_class.new(klass)
end