Class: Sneakers::Handlers::Maxretry

Inherits:
Object
Defined in:
lib/sneakers/handlers/maxretry.rb

Overview

Maxretry uses RabbitMQ dead-letter policies to requeue and retry messages after a failure (a rejection or an error). Once the maximum number of retries is reached, it puts the message on an error queue. This handler only retries at the queue level, which makes the setup somewhat complex.

Input:

worker_exchange (eXchange)
worker_queue (Queue)

We create:

worker_queue-retry - (X) exchange that the worker queue is set up to dead-letter to.
worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
                     worker_queue-retry-requeue.
worker_queue-error - (X) where to send max-retry failures
worker_queue-error - (Q) bound to worker_queue-error.
worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
                             requeuing directly to the worker_queue.

This requires that you set up arguments on the worker queue so that it dead-letters to the retry exchange. See the example for more information.

Many of these can be overridden with options:

  • retry_exchange - sets retry exchange & queue

  • retry_error_exchange - sets error exchange and queue

  • retry_requeue_exchange - sets the exchange created to re-queue things back to the worker queue.
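The defaulting rules behind these options can be sketched in plain Ruby. The helper `maxretry_settings` below is hypothetical (it is not part of Sneakers); it only mirrors the fallbacks visible in the constructor source on this page, including the `retry_routing_key`, `retry_timeout`, and `retry_max_times` options read there. The queue name `downloads` is an illustrative assumption.

```ruby
# Hypothetical helper, not part of Sneakers: reproduces the option fallbacks
# that Maxretry#initialize applies for a given worker queue name.
def maxretry_settings(worker_queue_name, opts = {})
  {
    retry_exchange:   opts[:retry_exchange] || "#{worker_queue_name}-retry",
    error_exchange:   opts[:retry_error_exchange] || "#{worker_queue_name}-error",
    requeue_exchange: opts[:retry_requeue_exchange] || "#{worker_queue_name}-retry-requeue",
    routing_key:      opts[:retry_routing_key] || '#',
    retry_timeout_ms: opts[:retry_timeout] || 60000, # TTL on the retry queue
    max_retries:      opts[:retry_max_times] || 5
  }
end

# With no options, every name is derived from the worker queue name.
defaults = maxretry_settings('downloads')

# Any option that is supplied replaces only its own default.
custom = maxretry_settings('downloads', retry_exchange: 'dl-retry', retry_max_times: 3)
```

Because the retry exchange and the retry queue share one name, overriding `retry_exchange` renames both, and the worker queue's `x-dead-letter-exchange` argument has to point at that same name.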

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(channel, queue, opts) ⇒ Maxretry

Returns a new instance of Maxretry.



# File 'lib/sneakers/handlers/maxretry.rb', line 36

def initialize(channel, queue, opts)
  @worker_queue_name = queue.name
  Sneakers.logger.debug do
    "#{log_prefix} creating handler, opts=#{opts}"
  end

  @channel = channel
  @opts = opts

  # Construct names, defaulting where suitable
  retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry"
  error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error"
  requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue"
  retry_routing_key = @opts[:retry_routing_key] || "#"

  # Create the exchanges
  @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name|
    Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" }
    @channel.exchange(name,
                      :type => 'topic',
                      :durable => exchange_durable?)
  end

  # Create the queues and bindings
  Sneakers.logger.debug do
    "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}"
  end
  @retry_queue = @channel.queue(retry_name,
                               :durable => queue_durable?,
                               :arguments => {
                                 :'x-dead-letter-exchange' => requeue_name,
                                 :'x-message-ttl' => @opts[:retry_timeout] || 60000
                               })
  @retry_queue.bind(@retry_exchange, :routing_key => '#')

  Sneakers.logger.debug do
    "#{log_prefix} creating queue=#{error_name}"
  end
  @error_queue = @channel.queue(error_name,
                                :durable => queue_durable?)
  @error_queue.bind(@error_exchange, :routing_key => '#')

  # Finally, bind the worker queue to our requeue exchange
  queue.bind(@requeue_exchange, :routing_key => retry_routing_key)

  @max_retries = @opts[:retry_max_times] || 5

end

Class Method Details

.configure_queue(name, opts) ⇒ Object



# File 'lib/sneakers/handlers/maxretry.rb', line 85

def self.configure_queue(name, opts)
  retry_name = opts.fetch(:retry_exchange, "#{name}-retry")
  opt_args = if opts.dig(:queue_options, :arguments).blank?
              {}
             else
              opts.dig(:queue_options, :arguments).transform_keys(&:to_sym)
             end
  opts[:queue_options][:arguments] = { :'x-dead-letter-exchange' => retry_name }.merge!(opt_args)
  opts[:queue_options]
end
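To make the merge order in `configure_queue` concrete, here is a minimal stand-in written in plain Ruby. The name `configure_queue_sketch` is hypothetical, and the `nil?`/`empty?` check is an assumption standing in for `blank?`, which comes from ActiveSupport rather than core Ruby. Like the listing above, the sketch assumes `opts[:queue_options]` is already a Hash.

```ruby
# Hypothetical stand-in for Maxretry.configure_queue, using only core Ruby.
def configure_queue_sketch(name, opts)
  retry_name = opts.fetch(:retry_exchange, "#{name}-retry")
  user_args = opts.dig(:queue_options, :arguments)
  # nil? || empty? approximates ActiveSupport's blank? for a Hash or nil.
  user_args = user_args.nil? || user_args.empty? ? {} : user_args.transform_keys(&:to_sym)
  # The default dead-letter exchange comes first, so user-supplied arguments win.
  opts[:queue_options][:arguments] = { :'x-dead-letter-exchange' => retry_name }.merge(user_args)
  opts[:queue_options]
end

# No arguments supplied: only the default dead-letter exchange is set.
plain = configure_queue_sketch('downloads', { queue_options: { durable: true } })

# String keys are symbolized and merged alongside the default.
merged = configure_queue_sketch('downloads',
                                { queue_options: { arguments: { 'x-max-length' => 1000 } } })

# A caller-provided dead-letter exchange overrides the derived one.
overridden = configure_queue_sketch('downloads',
                                    { queue_options: { arguments: { 'x-dead-letter-exchange' => 'custom-dlx' } } })
```

The practical consequence is that an explicit `x-dead-letter-exchange` in the worker's queue arguments takes precedence over the name derived from `retry_exchange`, so the two need to be kept consistent by hand.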

Instance Method Details

#acknowledge(hdr, props, msg) ⇒ Object



# File 'lib/sneakers/handlers/maxretry.rb', line 96

def acknowledge(hdr, props, msg)
  @channel.acknowledge(hdr.delivery_tag, false)
end

#error(hdr, props, msg, err) ⇒ Object



# File 'lib/sneakers/handlers/maxretry.rb', line 111

def error(hdr, props, msg, err)
  handle_retry(hdr, props, msg, err)
end

#noop(hdr, props, msg) ⇒ Object



# File 'lib/sneakers/handlers/maxretry.rb', line 115

def noop(hdr, props, msg)

end

#reject(hdr, props, msg, requeue = false) ⇒ Object



# File 'lib/sneakers/handlers/maxretry.rb', line 100

def reject(hdr, props, msg, requeue = false)
  if requeue
    # This was explicitly rejected specifying it be requeued so we do not
    # want it to pass through our retry logic.
    @channel.reject(hdr.delivery_tag, requeue)
  else
    handle_retry(hdr, props, msg, :reject)
  end
end
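Both #error and #reject (when requeue is false) hand the message to handle_retry, whose source is not shown on this page. The following is only a sketch of how the retry-or-error decision described in the overview could work. It assumes failures are counted from RabbitMQ's `x-death` header entries for the worker queue and that a message goes to the error exchange once the attempt number exceeds the maximum. The function names and the exact threshold are assumptions, not the library's confirmed behavior.

```ruby
# Hypothetical sketch: count prior dead-letterings of a message from the
# worker queue, based on the 'x-death' header that RabbitMQ attaches.
def failure_count(headers, worker_queue_name)
  deaths = Array(headers && headers['x-death']).select do |death|
    death['queue'] == worker_queue_name
  end
  if deaths.first && deaths.first['count']
    # Entries that carry a 'count' field are summed.
    deaths.sum { |death| death['count'].to_i }
  else
    # Otherwise assume one entry per failure.
    deaths.size
  end
end

# Hypothetical decision rule: retry while the current attempt number is
# within max_retries, otherwise route to the error exchange.
def next_step(headers, worker_queue_name, max_retries = 5)
  attempt = failure_count(headers, worker_queue_name) + 1
  attempt <= max_retries ? :retry : :error
end

first_failure = next_step(nil, 'downloads')
exhausted = next_step({ 'x-death' => [{ 'queue' => 'downloads', 'count' => 5 }] }, 'downloads')
```

Under this rule a fresh message with no headers is retried, while one that has already been dead-lettered from the worker queue five times (the default maximum in the constructor) is sent to the error exchange.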