Class: FrenzyBunnies::Handlers::Maxretry
- Inherits:
-
Object
- Object
- FrenzyBunnies::Handlers::Maxretry
- Defined in:
- lib/frenzy_bunnies/handlers/maxretry.rb
Overview
Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after failure (rejections, errors and timeouts). When the maximum number of retries is reached it will put the message on an error queue. This handler will only retry at the queue level. To accomplish that, the setup is a bit complex.
Input:
worker_exchange (eXchange)
worker_queue (Queue)
We create:
worker_queue-retry - (X) where we setup the worker queue to dead-letter.
worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
worker_queue-retry-requeue.
worker_queue-error - (X) where to send max-retry failures
worker_queue-error - (Q) bound to worker_queue-error.
worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
requeuing directly to the worker_queue.
This requires that you setup arguments to the worker queue to line up the dead letter queue. See the example for more information.
Many of these can be override with options:
-
retry_exchange - sets retry exchange & queue
-
retry_error_exchange - sets error exchange and queue
-
retry_requeue_exchange - sets the exchange created to re-queue things back to the worker queue.
Instance Method Summary collapse
- #acknowledge(hdr, msg) ⇒ Object
- #error(hdr, msg, err) ⇒ Object
-
#initialize(channel, queue, logger, opts) ⇒ Maxretry
constructor
A new instance of Maxretry.
- #noop(hdr, props, msg) ⇒ Object
- #reject(hdr, msg, requeue = false) ⇒ Object
- #timeout(hdr, msg) ⇒ Object
Constructor Details
#initialize(channel, queue, logger, opts) ⇒ Maxretry
Returns a new instance of Maxretry.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 36 def initialize(channel, queue, logger, opts) @logger = logger @worker_queue_name = queue.name @logger.debug do "#{log_prefix} creating handler, opts=#{opts}" end @channel = channel @opts = opts # Construct names, defaulting where suitable retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry" error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error" requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue" # Create the exchanges @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name| @logger.debug { "#{log_prefix} creating exchange=#{name}" } @channel.exchange(name, :type => 'topic', :durable => exchange_durable?) end # Create the queues and bindings @logger.debug do "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}" end @retry_queue = @channel.queue(retry_name, :durable => queue_durable?, :arguments => { 'x-dead-letter-exchange' => requeue_name, 'x-message-ttl' => @opts[:retry_timeout] || 60000 }) @retry_queue.bind(@retry_exchange, :routing_key => '#') @logger.debug do "#{log_prefix} creating queue=#{error_name}" end @error_queue = @channel.queue(error_name, :durable => queue_durable?) @error_queue.bind(@error_exchange, :routing_key => '#') # Finally, bind the worker queue to our requeue exchange queue.bind(@requeue_exchange, :routing_key => '#') @max_retries = @opts[:retry_max_times] || 5 end |
Instance Method Details
#acknowledge(hdr, msg) ⇒ Object
84 85 86 |
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 84 def acknowledge(hdr, msg) @channel.acknowledge(hdr.delivery_tag, false) end |
#error(hdr, msg, err) ⇒ Object
99 100 101 |
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 99 def error(hdr, msg, err) handle_retry(hdr, msg, err) end |
#noop(hdr, props, msg) ⇒ Object
107 108 109 |
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 107 def noop(hdr, props, msg) end |
#reject(hdr, msg, requeue = false) ⇒ Object
88 89 90 91 92 93 94 95 96 |
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 88 def reject(hdr, msg, requeue = false) if requeue # This was explicitly rejected specifying it be requeued so we do not # want it to pass through our retry logic. @channel.reject(hdr.delivery_tag, requeue) else handle_retry(hdr, msg, :reject) end end |
#timeout(hdr, msg) ⇒ Object
103 104 105 |
# File 'lib/frenzy_bunnies/handlers/maxretry.rb', line 103 def timeout(hdr, msg) handle_retry(hdr, msg, :timeout) end |