Class: SneakersRetry::Handlers::Maxretry2

Inherits:
Object
  • Object
show all
Defined in:
lib/sneakers-retry/handlers/maxretry2.rb

Overview

Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after failure (rejections, errors and timeouts). When the maximum number of retries is reached it will put the message on an error queue. This handler will only retry at the queue level. To accomplish that, the setup is a bit complex.

Input:

worker_exchange (eXchange)
worker_queue (Queue)

We create:

worker_queue-retry - (X) where we setup the worker queue to dead-letter.
worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
                     worker_queue-retry-requeue.
worker_queue-error - (X) where to send max-retry failures
worker_queue-error - (Q) bound to worker_queue-error.
worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
                             requeuing directly to the worker_queue.

This requires that you setup arguments to the worker queue to line up the dead letter queue. See the example for more information.

Many of these can be override with options:

  • retry_exchange - sets retry exchange & queue

  • retry_error_exchange - sets error exchange and queue

  • retry_requeue_exchange - sets the exchange created to re-queue things back to the worker queue.

Instance Method Summary collapse

Constructor Details

#initialize(channel, queue, opts) ⇒ Maxretry2

Returns a new instance of Maxretry2.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 35

def initialize(channel, queue, opts)
  puts "################################"
  @worker_queue_name = queue.name
  Sneakers.logger.debug do
    "#{log_prefix} creating handler, opts=#{opts}"
  end

  @channel = channel
  @opts = opts

  # Construct names, defaulting where suitable
  retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry"
  error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error"
  requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue"
  retry_routing_key = @opts[:retry_routing_key] || "#"

  # Create the exchanges
  @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name|
    Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" }
    @channel.exchange(name,
                      :type => 'topic',
                      :durable => exchange_durable?)
  end

  # Create the queues and bindings
  Sneakers.logger.debug do
    "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}"
  end
  @retry_queue = @channel.queue(retry_name,
                               :durable => queue_durable?,
                               :arguments => {
                                 :'x-dead-letter-exchange' => requeue_name,
                                 :'x-message-ttl' => @opts[:retry_timeout] || 60000
                               })
  @retry_queue.bind(@retry_exchange, :routing_key => '#')

  Sneakers.logger.debug do
    "#{log_prefix} creating queue=#{error_name}"
  end
  @error_queue = @channel.queue(error_name,
                                :durable => queue_durable?)
  @error_queue.bind(@error_exchange, :routing_key => '#')

  # Finally, bind the worker queue to our requeue exchange
  queue.bind(@requeue_exchange, :routing_key => retry_routing_key)

  @max_retries = @opts[:retry_max_times] || 5

end

Instance Method Details

#acknowledge(hdr, props, msg) ⇒ Object



85
86
87
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 85

def acknowledge(hdr, props, msg)
  @channel.acknowledge(hdr.delivery_tag, false)
end

#error(hdr, props, msg, err) ⇒ Object



100
101
102
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 100

def error(hdr, props, msg, err)
  handle_retry(hdr, props, msg, err)
end

#noop(hdr, props, msg) ⇒ Object



108
109
110
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 108

def noop(hdr, props, msg)

end

#reject(hdr, props, msg, requeue = false) ⇒ Object



89
90
91
92
93
94
95
96
97
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 89

def reject(hdr, props, msg, requeue = false)
  if requeue
    # This was explicitly rejected specifying it be requeued so we do not
    # want it to pass through our retry logic.
    @channel.reject(hdr.delivery_tag, requeue)
  else
    handle_retry(hdr, props, msg, :reject)
  end
end

#timeout(hdr, props, msg) ⇒ Object



104
105
106
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 104

def timeout(hdr, props, msg)
  handle_retry(hdr, props, msg, :timeout)
end