Class: FrenzyBunnies::Handlers::Maxretry

Inherits:
Object
  • Object
show all
Defined in:
lib/frenzy_bunnies/handlers/maxretry.rb

Overview

Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after failure (rejections, errors and timeouts). When the maximum number of retries is reached it will put the message on an error queue. This handler will only retry at the queue level. To accomplish that, the setup is a bit complex.

Input:

worker_exchange (eXchange)
worker_queue (Queue)

We create:

worker_queue-retry - (X) where we setup the worker queue to dead-letter.
worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
                     worker_queue-retry-requeue.
worker_queue-error - (X) where to send max-retry failures
worker_queue-error - (Q) bound to worker_queue-error.
worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
                             requeuing directly to the worker_queue.

This requires that you setup arguments to the worker queue to line up the dead letter queue. See the example for more information.

Many of these can be override with options:

  • retry_exchange - sets retry exchange & queue

  • retry_error_exchange - sets error exchange and queue

  • retry_requeue_exchange - sets the exchange created to re-queue things back to the worker queue.

Instance Method Summary collapse

Constructor Details

#initialize(channel, queue, logger, opts) ⇒ Maxretry

Returns a new instance of Maxretry.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 36

def initialize(channel, queue, logger, opts)
  @logger = logger
  @worker_queue_name = queue.name
  @logger.debug do
    "#{log_prefix} creating handler, opts=#{opts}"
  end

  @channel = channel
  @opts = opts

  # Construct names, defaulting where suitable
  retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry"
  error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error"
  requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue"

  # Create the exchanges
  @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name|
    @logger.debug { "#{log_prefix} creating exchange=#{name}" }
    @channel.exchange(name,
                      :type => 'topic',
                      :durable => exchange_durable?)
  end

  # Create the queues and bindings
  @logger.debug do
    "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}"
  end
  @retry_queue = @channel.queue(retry_name,
                               :durable => queue_durable?,
                               :arguments => {
                                 'x-dead-letter-exchange' => requeue_name,
                                 'x-message-ttl' => @opts[:retry_timeout] || 60000
                               })
  @retry_queue.bind(@retry_exchange, :routing_key => '#')

  @logger.debug do
    "#{log_prefix} creating queue=#{error_name}"
  end
  @error_queue = @channel.queue(error_name,
                                :durable => queue_durable?)
  @error_queue.bind(@error_exchange, :routing_key => '#')

  # Finally, bind the worker queue to our requeue exchange
  queue.bind(@requeue_exchange, :routing_key => '#')

  @max_retries = @opts[:retry_max_times] || 5
end

Instance Method Details

#acknowledge(hdr, msg) ⇒ Object



84
85
86
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 84

def acknowledge(hdr, msg)
  @channel.acknowledge(hdr.delivery_tag, false)
end

#error(hdr, msg, err) ⇒ Object



99
100
101
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 99

def error(hdr, msg, err)
  handle_retry(hdr, msg, err)
end

#noop(hdr, props, msg) ⇒ Object



107
108
109
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 107

def noop(hdr, props, msg)

end

#reject(hdr, msg, requeue = false) ⇒ Object



88
89
90
91
92
93
94
95
96
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 88

def reject(hdr, msg, requeue = false)
  if requeue
    # This was explicitly rejected specifying it be requeued so we do not
    # want it to pass through our retry logic.
    @channel.reject(hdr.delivery_tag, requeue)
  else
    handle_retry(hdr, msg, :reject)
  end
end

#timeout(hdr, msg) ⇒ Object



103
104
105
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 103

def timeout(hdr, msg)
  handle_retry(hdr, msg, :timeout)
end