Module: Hutch::Retry::WorkerExtension

Included in:
Worker
Defined in:
lib/hutch/retry/worker_extension.rb

Instance Method Summary

Instance Method Details

#handle_message(consumer, delivery_info, properties, payload) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/hutch/retry/worker_extension.rb', line 20

# Runs one delivery through a new instance of +consumer+ and settles it with
# the broker.
#
# On success the delivery is acked unless the consumer instance reports that
# it rejected the message. When anything raises a StandardError, consumers
# that include Hutch::Retry::Consumer are routed through #handle_retry and all
# others through #acknowledge_error; #handle_error is then called either way.
#
# @param consumer [Class] consumer class; it is instantiated here and queried
#   for its serializer
# @param delivery_info [#routing_key, #delivery_tag] delivery metadata
# @param properties [#message_id] message properties
# @param payload [#bytesize, #to_s] raw message body
# @return [Object] the broker's ack result, nil when the message was rejected,
#   or the result of #handle_error when an error was rescued
def handle_message(consumer, delivery_info, properties, payload)
  serializer = consumer.get_serializer || Hutch::Config[:serializer]
  logger.debug do
    # Binary payloads are summarised by size instead of being interpolated.
    payload_summary = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}"
    "message(#{properties.message_id || '-'}): " \
      "routing key: #{delivery_info.routing_key}, " \
      "consumer: #{consumer}, " \
      "payload: #{payload_summary}"
  end

  message = Message.new(delivery_info, properties, payload, serializer)

  instance = consumer.new
  instance.broker = @broker
  instance.delivery_info = delivery_info

  with_tracing(instance).handle(message)
  return if instance.message_rejected?

  @broker.ack(delivery_info.delivery_tag)
rescue StandardError => error
  retryable = consumer.include?(Hutch::Retry::Consumer)
  if retryable
    handle_retry(consumer, delivery_info, properties, payload, error)
  else
    acknowledge_error(delivery_info, properties, @broker, error)
  end
  handle_error(properties, payload, consumer, error)
end

#handle_retry(consumer, delivery_info, properties, payload, ex) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/hutch/retry/worker_extension.rb', line 43

# Decides what to do with a message whose consumer raised +ex+.
#
# When +ex+ matches one of the classes returned by +consumer.get_retry_on+ and
# the retry count read from the "backoff-delay-count" header (0 when absent) is
# still below +consumer.get_max_retries+, a copy of the message is published to
# +consumer.retry_exchange+ with an incremented counter and the original
# delivery is acked. In every other case the delivery is passed to
# #acknowledge_error.
#
# @param consumer [Class] consumer providing get_retry_on, get_max_retries,
#   retry_exchange and exp_backoff
# @param delivery_info [#routing_key, #delivery_tag] delivery metadata
# @param properties [#[]] message properties, read via :headers and :message_id
# @param payload [Object] raw message body, republished unchanged
# @param ex [Exception] the error raised while handling the message
# @return [Object] result of the broker ack on the retry path, otherwise the
#   result of #acknowledge_error
def handle_retry(consumer, delivery_info, properties, payload, ex)
  case ex
  when *consumer.get_retry_on
    current_retry_count = (properties[:headers] || {}).fetch("backoff-delay-count", 0)

    if current_retry_count < consumer.get_max_retries
      logger.debug "Retry message_id=#{properties[:message_id]} counter=#{current_retry_count + 1}"

      # Publish the retry copy BEFORE acking the original. If the publish
      # raises, the original delivery is still unacknowledged instead of being
      # acked with no retry copy in flight (i.e. silently lost).
      # NOTE(review): presumably the broker redelivers the unacked message once
      # the channel is closed or recovered; confirm for your deployment.
      consumer.retry_exchange.publish(
        payload,
        routing_key: delivery_info.routing_key,
        message_id: properties[:message_id],
        timestamp: Time.now.to_i,
        headers: {
          "backoff-delay": consumer.exp_backoff(current_retry_count),
          "backoff-delay-count": current_retry_count + 1
        }
      )
      @broker.ack(delivery_info.delivery_tag)
    else
      logger.debug "Max retries exceeded message_id=#{properties[:message_id]}"

      acknowledge_error(delivery_info, properties, @broker, ex)
    end
  else
    # Not an error class the consumer asked to retry on.
    acknowledge_error(delivery_info, properties, @broker, ex)
  end
end

#setup_queue(consumer) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/hutch/retry/worker_extension.rb', line 6

# Declares and binds the queue for +consumer+, then subscribes to it.
#
# Consumers that include Hutch::Retry::Consumer additionally get their retry
# queues created before the subscription starts. Each delivery received by the
# subscription is decoded with Hutch::Adapter.decode_message and handed to
# #handle_message. The subscription is made with manual acknowledgement.
#
# @param consumer [Class] consumer providing get_queue_name, get_options and
#   routing_keys
# @return [Object] whatever the queue's +subscribe+ call returns
def setup_queue(consumer)
  logger.info "setting up queue: #{consumer.get_queue_name}"

  bound_queue = @broker.queue(consumer.get_queue_name, consumer.get_options)
  @broker.bind_queue(bound_queue, consumer.routing_keys)

  if consumer.include?(Hutch::Retry::Consumer)
    consumer.create_retry_queues!(@broker)
  end

  subscribe_options = { consumer_tag: unique_consumer_tag, manual_ack: true }
  bound_queue.subscribe(**subscribe_options) do |*raw_delivery|
    decoded = Hutch::Adapter.decode_message(*raw_delivery)
    delivery_info, properties, payload = decoded
    handle_message(consumer, delivery_info, properties, payload)
  end
end