Module: Hutch::Retry::WorkerExtension
- Included in:
- Worker
- Defined in:
- lib/hutch/retry/worker_extension.rb
Instance Method Summary
- #handle_message(consumer, delivery_info, properties, payload) ⇒ Object
- #handle_retry(consumer, delivery_info, properties, payload, ex) ⇒ Object
- #setup_queue(consumer) ⇒ Object
Instance Method Details
#handle_message(consumer, delivery_info, properties, payload) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# Processes a single delivery for +consumer+: picks the serializer
# (consumer.get_serializer, falling back to Hutch::Config[:serializer]),
# debug-logs the message, wraps it with Message.new, runs it through a fresh
# consumer instance via with_tracing(...).handle, then acks the delivery tag
# unless a consumer_instance predicate says otherwise.
# On any StandardError: consumers that include Hutch::Retry::Consumer are
# routed to handle_retry, all others to acknowledge_error; handle_error is
# called afterwards in both cases.
# NOTE(review): this listing was flattened onto one line and the extraction
# dropped several identifiers: the method name after `def` (the page heading
# gives #handle_message), the local assigned from Message.new, the argument
# passed to handle(), and the predicate after `consumer_instance.` — confirm
# all of them against lib/hutch/retry/worker_extension.rb before reuse.
# File 'lib/hutch/retry/worker_extension.rb', line 20 def (consumer, delivery_info, properties, payload) serializer = consumer.get_serializer || Hutch::Config[:serializer] logger.debug { spec = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}" "message(#{properties.message_id || '-'}): " + "routing key: #{delivery_info.routing_key}, " + "consumer: #{consumer}, " + "payload: #{spec}" } = Message.new(delivery_info, properties, payload, serializer) consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info } with_tracing(consumer_instance).handle() @broker.ack(delivery_info.delivery_tag) unless consumer_instance. rescue => ex if consumer.include?(Hutch::Retry::Consumer) handle_retry(consumer, delivery_info, properties, payload, ex) else acknowledge_error(delivery_info, properties, @broker, ex) end handle_error(properties, payload, consumer, ex) end |
#handle_retry(consumer, delivery_info, properties, payload, ex) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/hutch/retry/worker_extension.rb', line 43

# Decides what happens to a delivery whose consumer raised +ex+.
#
# When +ex+ matches one of the classes returned by +consumer.get_retry_on+
# and the "backoff-delay-count" header is still below
# +consumer.get_max_retries+, the payload is republished to
# +consumer.retry_exchange+ with an incremented counter and the original
# delivery is acked. In every other case the delivery is handed to
# +acknowledge_error+.
#
# @param consumer [Object] must respond to get_retry_on, get_max_retries,
#   retry_exchange and exp_backoff
# @param delivery_info [Object] must respond to delivery_tag and routing_key
# @param properties [Object] read with [:headers] and [:message_id]
# @param payload [Object] message body, republished unchanged
# @param ex [Exception] the error raised while handling the message
# @return [Object] not meaningful; callers in this file ignore it
def handle_retry(consumer, delivery_info, properties, payload, ex)
  case ex
  when *consumer.get_retry_on
    # A first-time delivery carries no headers at all, hence the `|| {}`.
    current_retry_count = (properties[:headers] || {}).fetch("backoff-delay-count", 0)

    if current_retry_count < consumer.get_max_retries
      logger.debug "Retry message_id=#{properties[:message_id]} counter=#{current_retry_count + 1}"

      # Publish the retry copy BEFORE acking the original delivery: if
      # building or publishing the copy raises, the original stays unacked
      # instead of being lost.
      consumer.retry_exchange.publish(
        payload,
        routing_key: delivery_info.routing_key,
        message_id: properties[:message_id],
        timestamp: Time.now.to_i,
        headers: {
          "backoff-delay": consumer.exp_backoff(current_retry_count),
          "backoff-delay-count": current_retry_count + 1
        }
      )
      @broker.ack(delivery_info.delivery_tag)
    else
      logger.debug "Max retries exceeded message_id=#{properties[:message_id]}"
      acknowledge_error(delivery_info, properties, @broker, ex)
    end
  else
    acknowledge_error(delivery_info, properties, @broker, ex)
  end
end
#setup_queue(consumer) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 |
# Declares and subscribes the queue for +consumer+: logs the queue name,
# obtains the queue from @broker.queue, binds it to consumer.routing_keys,
# calls consumer.create_retry_queues!(@broker) for consumers that include
# Hutch::Retry::Consumer, and subscribes with manual_ack: true so each
# delivery is acknowledged explicitly by the handler.
# NOTE(review): this listing was flattened onto one line and the extraction
# dropped several identifiers: the second argument to @broker.queue (the
# method after `consumer.`), the method called on Hutch::Adapter, and the
# name of the call taking (consumer, delivery_info, properties, payload)
# inside the subscribe block — presumably handle_message; confirm all three
# against lib/hutch/retry/worker_extension.rb before reuse.
# File 'lib/hutch/retry/worker_extension.rb', line 6 def setup_queue(consumer) logger.info "setting up queue: #{consumer.get_queue_name}" queue = @broker.queue(consumer.get_queue_name, consumer.) @broker.bind_queue(queue, consumer.routing_keys) consumer.create_retry_queues!(@broker) if consumer.include?(Hutch::Retry::Consumer) queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args| delivery_info, properties, payload = Hutch::Adapter.(*args) (consumer, delivery_info, properties, payload) end end |