Class: Sidekiq::Middleware::Server::RetryJobs
- Inherits: Object
- Includes: Util
- Defined in: lib/sidekiq/middleware/server/retry_jobs.rb
Overview
Automatically retry jobs that fail in Sidekiq. Sidekiq’s retry support assumes a typical development lifecycle:
1. You push some code changes with a bug in them.
2. The bug causes message processing to fail; Sidekiq’s middleware captures the message and pushes it onto a retry queue.
3. Sidekiq retries messages in the retry queue multiple times, with a delay that grows on each attempt; the message continues to fail.
4. After a few days, a developer deploys a fix, and the message is reprocessed successfully.
5. If step 4 never happens, Sidekiq eventually gives up and throws the message away. If the worker defines a method called ‘retries_exhausted’, it is called before the message is thrown away (the source below logs a deprecation warning for this method and prefers the ‘sidekiq_retries_exhausted’ callback). If ‘retries_exhausted’ raises an exception, the exception is logged and dropped.
A message looks like:
{ 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }
The ‘retry’ option also accepts a number (in place of ‘true’):
{ 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => 5 }
The job will be retried this number of times before giving up. (If simply ‘true’, Sidekiq retries 25 times)
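As a rough sketch of how the ‘retry’ value maps onto an attempt limit, the snippet below mirrors the `retry_attempts_from` method documented further down. Two things here are assumptions rather than Sidekiq code: `max_attempts_for` is a hypothetical helper name, and the check uses `Integer` instead of `Fixnum` so that it runs on current Rubies, where `Fixnum` no longer exists.

```ruby
DEFAULT_MAX_RETRY_ATTEMPTS = 25

# Hypothetical helper (not part of Sidekiq): resolve the attempt limit
# for a message. Returns nil when the message opts out of retries, which
# is the case where the middleware simply re-raises the error.
def max_attempts_for(msg, default = DEFAULT_MAX_RETRY_ATTEMPTS)
  msg_retry = msg['retry']
  return nil unless msg_retry
  # Assumption: Integer stands in for the Fixnum check in the original.
  msg_retry.is_a?(Integer) ? msg_retry : default
end

with_true   = { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }
with_number = { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => 5 }
opted_out   = { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => false }

p max_attempts_for(with_true)    # => 25
p max_attempts_for(with_number)  # => 5
p max_attempts_for(opted_out)    # => nil
```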
We’ll add a bit more data to the message to support retries:
* 'queue' - the queue to use
* 'retry_count' - number of times we've retried so far.
* 'error_message' - the message from the exception
* 'error_class' - the exception class
* 'failed_at' - the first time it failed
* 'retried_at' - the last time it was retried
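To make the bookkeeping concrete, here is a small sketch that applies the same field updates as the `#call` method shown later on this page. `record_failure` is a hypothetical helper written for illustration only; it leaves out the Redis scheduling and the backtrace handling.

```ruby
# Hypothetical helper (not part of Sidekiq): apply the retry bookkeeping
# to a message hash after a failure.
def record_failure(msg, queue, error, now = Time.now.utc)
  msg['queue'] = msg['retry_queue'] || queue
  msg['error_message'] = error.message
  msg['error_class'] = error.class.name
  if msg['retry_count']
    # A later failure: bump the counter and note when it was retried.
    msg['retried_at'] = now
    msg['retry_count'] += 1
  else
    # The first failure: no retries have happened yet.
    msg['failed_at'] = now
    msg['retry_count'] = 0
  end
  msg
end

msg = { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }

record_failure(msg, 'default', RuntimeError.new('boom'))
p msg['retry_count']         # => 0
p msg.key?('retried_at')     # => false

record_failure(msg, 'default', RuntimeError.new('boom again'))
p msg['retry_count']         # => 1
p msg['error_message']       # => "boom again"
```

Note that ‘failed_at’ is written only once, while ‘error_message’, ‘error_class’ and ‘retried_at’ always describe the most recent failure.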
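By default we don’t store the backtrace, as that can add a lot of overhead to the message and everyone is using Airbrake, right? Setting the message’s ‘backtrace’ option to true stores the full backtrace under ‘error_backtrace’; setting it to a number stores a truncated copy.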
The default number of retry attempts is 25. You can pass a value for the number of retry attempts when adding the middleware using the options hash:
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add Sidekiq::Middleware::Server::RetryJobs, {:max_retries => 7}
  end
end
Constant Summary
- DEFAULT_MAX_RETRY_ATTEMPTS = 25
Constants included from Util
Instance Method Summary
- #call(worker, msg, queue) ⇒ Object
- #delay_for(worker, count) ⇒ Object
- #initialize(options = {}) ⇒ RetryJobs (constructor): A new instance of RetryJobs.
- #retries_exhausted(worker, msg) ⇒ Object
- #retry_attempts_from(msg_retry, default) ⇒ Object
- #retry_in(worker, count) ⇒ Object
- #seconds_to_delay(count) ⇒ Object: delayed_job uses the same basic formula.
Methods included from Util
#hostname, #logger, #process_id, #redis, #watchdog
Methods included from ExceptionHandler
Constructor Details
#initialize(options = {}) ⇒ RetryJobs
Returns a new instance of RetryJobs.
# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 57
def initialize(options = {})
  @max_retries = options.fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end
Instance Method Details
#call(worker, msg, queue) ⇒ Object
# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 61
def call(worker, msg, queue)
  yield
rescue Sidekiq::Shutdown
  # ignore, will be pushed back onto queue during hard_shutdown
  raise
rescue Exception => e
  raise e unless msg['retry']
  max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)

  msg['queue'] = if msg['retry_queue']
    msg['retry_queue']
  else
    queue
  end
  msg['error_message'] = e.message
  msg['error_class'] = e.class.name
  count = if msg['retry_count']
    msg['retried_at'] = Time.now.utc
    msg['retry_count'] += 1
  else
    msg['failed_at'] = Time.now.utc
    msg['retry_count'] = 0
  end

  if msg['backtrace'] == true
    msg['error_backtrace'] = e.backtrace
  elsif msg['backtrace'] == false
    # do nothing
  elsif msg['backtrace'].to_i != 0
    msg['error_backtrace'] = e.backtrace[0..msg['backtrace'].to_i]
  end

  if count < max_retry_attempts
    delay = delay_for(worker, count)
    logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
    retry_at = Time.now.to_f + delay
    payload = Sidekiq.dump_json(msg)
    Sidekiq.redis do |conn|
      conn.zadd('retry', retry_at.to_s, payload)
    end
  else
    # Goodbye dear message, you (re)tried your best I'm sure.
    retries_exhausted(worker, msg)
  end

  raise e
end
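The ‘backtrace’ branch of `#call` is easy to misread, so the sketch below isolates it. `stored_backtrace` is a hypothetical helper, not a Sidekiq method, and the frames are made-up strings. One detail worth noticing: the range `0..n` is inclusive, so a numeric option of n keeps n + 1 frames.

```ruby
# Hypothetical helper (not part of Sidekiq): return what would be stored
# under 'error_backtrace' for a given 'backtrace' option, or nil if
# nothing would be stored.
def stored_backtrace(option, backtrace)
  if option == true
    backtrace
  elsif option == false
    nil
  elsif option.to_i != 0
    # Inclusive range: keeps option.to_i + 1 frames.
    backtrace[0..option.to_i]
  end
end

# Made-up frames, for illustration only.
frames = (1..10).map { |i| "app/workers/hard_worker.rb:#{i}:in `perform'" }

p stored_backtrace(true, frames).size   # => 10
p stored_backtrace(3, frames).size      # => 4
p stored_backtrace(false, frames)       # => nil
p stored_backtrace(nil, frames)         # => nil (nil.to_i is 0)
```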
#delay_for(worker, count) ⇒ Object
# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 130
def delay_for(worker, count)
  worker.sidekiq_retry_in_block? && retry_in(worker, count) || seconds_to_delay(count)
end
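The `a && b || c` expression above falls back to the default delay whenever the worker has no custom block, or the block returns nil, or `#retry_in` swallows an error and returns nil. A minimal stand-alone sketch of that behaviour follows. It is an approximation under stated assumptions: `custom` is a hypothetical callable standing in for the worker's `sidekiq_retry_in_block`, and the sketch rescues `StandardError` where the original rescues `Exception`.

```ruby
# Same formula as #seconds_to_delay on this page.
def seconds_to_delay(count)
  (count**4) + 15 + (rand(30) * (count + 1))
end

# Stand-in for #delay_for. `custom` is a hypothetical callable (or nil)
# playing the role of the worker's sidekiq_retry_in_block.
def delay_for(custom, count)
  custom_delay = begin
    custom && custom.call(count)
  rescue StandardError
    nil # mirror #retry_in: fall back to the default on error
  end
  custom_delay || seconds_to_delay(count)
end

linear = ->(count) { 10 * (count + 1) }
p delay_for(linear, 2)                         # => 30

# Each of these falls back to seconds_to_delay(0), an integer in 15..44.
p delay_for(nil, 0)
p delay_for(->(_count) { nil }, 0)
p delay_for(->(_count) { raise 'oops' }, 0)
```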
#retries_exhausted(worker, msg) ⇒ Object
# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 109
def retries_exhausted(worker, msg)
  logger.debug { "Dropping message after hitting the retry maximum: #{msg}" }
  if worker.respond_to?(:retries_exhausted)
    logger.warn { "Defining #{worker.class.name}#retries_exhausted as a method is deprecated, use `sidekiq_retries_exhausted` callback instead http://git.io/Ijju8g" }
    worker.retries_exhausted(*msg['args'])
  elsif worker.sidekiq_retries_exhausted_block?
    worker.sidekiq_retries_exhausted_block.call(msg)
  end

rescue Exception => e
  handle_exception(e, { :context => "Error calling retries_exhausted" })
end
#retry_attempts_from(msg_retry, default) ⇒ Object
# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 122
def retry_attempts_from(msg_retry, default)
  if msg_retry.is_a?(Fixnum)
    msg_retry
  else
    default
  end
end
#retry_in(worker, count) ⇒ Object
# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 139
def retry_in(worker, count)
  begin
    worker.sidekiq_retry_in_block.call(count)
  rescue Exception => e
    logger.error { "Failure scheduling retry using the defined `sidekiq_retry_in` in #{worker.class.name}, falling back to default: #{e.message}"}
    nil
  end
end
#seconds_to_delay(count) ⇒ Object
delayed_job uses the same basic formula
# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 135
def seconds_to_delay(count)
  (count ** 4) + 15 + (rand(30)*(count+1))
end
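To get a feel for the schedule this formula produces, the sketch below computes the smallest and largest possible delay for a given count, using the fact that `rand(30)` returns an integer from 0 to 29. `delay_bounds` is a hypothetical helper introduced here for illustration. With the default limit of 25, `#call` schedules retries for counts 0 through 24.

```ruby
# Hypothetical helper (not part of Sidekiq): [min, max] seconds that
# seconds_to_delay(count) can return, since rand(30) is in 0..29.
def delay_bounds(count)
  base = (count**4) + 15
  [base, base + 29 * (count + 1)]
end

[0, 1, 5, 10, 24].each do |count|
  min, max = delay_bounds(count)
  puts format('retry %2d: %6d..%6d seconds', count, min, max)
end

# Lower bound on the time covered by all 25 default retries.
min_total = (0...25).sum { |count| delay_bounds(count).first }
puts format('25 retries span at least %.1f days', min_total / 86_400.0)
```

The final line reports roughly 20.4 days, which fits the "after a few days, a developer deploys a fix" lifecycle described in the overview.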