Class: Sidekiq::Middleware::Server::RetryJobs

Inherits:
Object
Includes:
Util
Defined in:
lib/sidekiq/middleware/server/retry_jobs.rb

Overview

Automatically retry jobs that fail in Sidekiq. Sidekiq’s retry support assumes a typical development lifecycle:

  1. Push some code changes with a bug in them.

  2. The bug causes message processing to fail; Sidekiq’s middleware captures the message and pushes it onto a retry queue.

  3. Sidekiq retries messages in the retry queue multiple times with a steadily increasing delay (roughly the fourth power of the retry count; see seconds_to_delay), but the message continues to fail.

  4. After a few days, a developer deploys a fix. The message is reprocessed successfully.

  5. If 4 never happens, Sidekiq will eventually give up and throw the message away. If the worker defines a method called ‘retries_exhausted’, it is called before the message is thrown away (this method is deprecated in favor of the ‘sidekiq_retries_exhausted’ callback). If that hook raises an exception, the exception is dropped and logged.

A message with retries enabled looks like:

{ 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }

The ‘retry’ option also accepts a number (in place of ‘true’):

{ 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => 5 }

The job will be retried this number of times before giving up. (If simply ‘true’, Sidekiq uses the middleware’s max_retries option, which defaults to 25.)
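To make the counting concrete, here is a small simulation of the `retry_count` bookkeeping and the `count < max_retry_attempts` check from #call below. `executions_until_exhausted` is a hypothetical helper, not part of Sidekiq, and it assumes the job fails on every run. Because `retry_count` starts at 0 on the first failure, a limit of N means N retries, i.e. N + 1 executions in total.

```ruby
# Hypothetical simulation (not Sidekiq code): how many times does an
# always-failing job run before RetryJobs gives up?
def executions_until_exhausted(max_retry_attempts)
  msg = {}
  executions = 0
  loop do
    executions += 1 # the job runs and raises
    # Same bookkeeping as #call: 0 on the first failure, then +1 per retry.
    count = if msg['retry_count']
              msg['retry_count'] += 1
            else
              msg['retry_count'] = 0
            end
    break unless count < max_retry_attempts # retries exhausted
  end
  executions
end

puts executions_until_exhausted(5)  # prints 6 (1 initial run + 5 retries)
puts executions_until_exhausted(25) # prints 26
```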

We’ll add a bit more data to the message to support retries:

* 'queue' - the queue to retry on (the message's 'retry_queue' if set, otherwise its original queue)
* 'retry_count' - number of times we've retried so far.
* 'error_message' - the message from the exception
* 'error_class' - the exception class
* 'failed_at' - the first time it failed
* 'retried_at' - the last time it was retried
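The fields above are written by #call each time the job fails. The sketch below isolates that bookkeeping in a hypothetical `annotate_failure` helper (not a Sidekiq API); `msg['retry_queue'] || queue` is equivalent to the `if`/`else` in the listed source.

```ruby
# Hypothetical helper (not part of Sidekiq) applying the same bookkeeping
# #call performs on a failed message before deciding whether to retry it.
def annotate_failure(msg, error, queue, now = Time.now.utc)
  msg['queue'] = msg['retry_queue'] || queue
  msg['error_message'] = error.message
  msg['error_class'] = error.class.name
  if msg['retry_count']
    msg['retried_at'] = now   # a retry failed again
    msg['retry_count'] += 1
  else
    msg['failed_at'] = now    # first failure
    msg['retry_count'] = 0
  end
  msg
end

msg = { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }
annotate_failure(msg, RuntimeError.new('boom'), 'default')
p msg['retry_count'] # prints 0; 'failed_at' is now set
annotate_failure(msg, RuntimeError.new('boom again'), 'default')
p msg['retry_count'] # prints 1; 'retried_at' is now set
```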

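By default we don’t store the backtrace, as that can add a lot of overhead to the message (and everyone is using Airbrake, right?). A job can opt in with its 'backtrace' option: true stores the full backtrace in 'error_backtrace', and an integer N stores e.backtrace[0..N], which, because the range is inclusive, is the first N + 1 frames.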
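The 'backtrace' branch of #call can be sketched as a standalone function. `backtrace_to_store` is a hypothetical name used only for illustration; it returns what would end up in 'error_backtrace', or nil when nothing is stored.

```ruby
# Hypothetical helper (not a Sidekiq API) mirroring how #call decides what
# to store in 'error_backtrace' from the message's 'backtrace' option.
def backtrace_to_store(option, backtrace)
  if option == true
    backtrace
  elsif option == false
    nil
  elsif option.to_i != 0
    # Inclusive range, as in the listed source: N keeps N + 1 frames.
    backtrace[0..option.to_i]
  end
  # An absent option (nil.to_i == 0) falls through and stores nothing.
end

frames = ['a.rb:1', 'b.rb:2', 'c.rb:3', 'd.rb:4']
p backtrace_to_store(true, frames).size # prints 4
p backtrace_to_store(false, frames)     # prints nil
p backtrace_to_store(nil, frames)       # prints nil
p backtrace_to_store(2, frames)         # prints ["a.rb:1", "b.rb:2", "c.rb:3"]
```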

The default number of retry attempts is 25. You can pass a value for the number of retry attempts when adding the middleware using the options hash:

Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add Sidekiq::Middleware::Server::RetryJobs, {:max_retries => 7}
  end
end

Constant Summary

DEFAULT_MAX_RETRY_ATTEMPTS = 25

Constants included from Util

Util::EXPIRY

Instance Method Summary

Methods included from Util

#hostname, #logger, #process_id, #redis, #watchdog

Methods included from ExceptionHandler

#handle_exception

Constructor Details

#initialize(options = {}) ⇒ RetryJobs

Returns a new instance of RetryJobs.



# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 57

def initialize(options = {})
  @max_retries = options.fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end

Instance Method Details

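#call(worker, msg, queue) ⇒ Object

Runs the job. If it raises and the message has 'retry' enabled, records the error on the message and either schedules a retry in the 'retry' sorted set or, once the limit is reached, calls retries_exhausted. The exception is always re-raised; Sidekiq::Shutdown is re-raised without any retry bookkeeping.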



# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 61

def call(worker, msg, queue)
  yield
rescue Sidekiq::Shutdown
  # ignore, will be pushed back onto queue during hard_shutdown
  raise
rescue Exception => e
  raise e unless msg['retry']
  max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)

  msg['queue'] = if msg['retry_queue']
    msg['retry_queue']
  else
    queue
  end
  msg['error_message'] = e.message
  msg['error_class'] = e.class.name
  count = if msg['retry_count']
    msg['retried_at'] = Time.now.utc
    msg['retry_count'] += 1
  else
    msg['failed_at'] = Time.now.utc
    msg['retry_count'] = 0
  end

  if msg['backtrace'] == true
    msg['error_backtrace'] = e.backtrace
  elsif msg['backtrace'] == false
    # do nothing
  elsif msg['backtrace'].to_i != 0
    msg['error_backtrace'] = e.backtrace[0..msg['backtrace'].to_i]
  end

  if count < max_retry_attempts
    delay = delay_for(worker, count)
    logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
    retry_at = Time.now.to_f + delay
    payload = Sidekiq.dump_json(msg)
    Sidekiq.redis do |conn|
      conn.zadd('retry', retry_at.to_s, payload)
    end
  else
    # Goodbye dear message, you (re)tried your best I'm sure.
    retries_exhausted(worker, msg)
  end

  raise e
end
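The control flow of #call can be exercised without Redis. This is a self-contained sketch under stated assumptions: `FakeRetrySet` (hypothetical) is an in-memory stand-in for the Redis 'retry' sorted set, a fixed 60-second delay replaces delay_for, timestamps and backtraces are omitted, and the sketch rescues StandardError rather than Exception and skips the Sidekiq::Shutdown special case.

```ruby
require 'json'

# Hypothetical stand-in for the Redis 'retry' sorted set.
class FakeRetrySet
  attr_reader :entries

  def initialize
    @entries = []
  end

  # Rough ZADD stand-in: append and keep entries ordered by score
  # (unlike Redis, identical members are not merged).
  def zadd(score, payload)
    @entries << [score.to_f, payload]
    @entries.sort_by!(&:first)
  end
end

# Sketch of RetryJobs#call's flow; not the real middleware.
def run_with_retries(msg, queue, retry_set, max_retries: 25)
  yield
rescue StandardError => e
  raise e unless msg['retry'] # retries disabled: just propagate
  max_attempts = msg['retry'].is_a?(Integer) ? msg['retry'] : max_retries
  msg['queue'] = msg['retry_queue'] || queue
  msg['error_message'] = e.message
  msg['error_class'] = e.class.name
  count = msg['retry_count'] ? (msg['retry_count'] += 1) : (msg['retry_count'] = 0)
  if count < max_attempts
    retry_set.zadd(Time.now.to_f + 60, JSON.generate(msg)) # fixed delay (assumption)
  end
  raise e # the failure always propagates, as in #call
end

set = FakeRetrySet.new
msg = { 'class' => 'HardWorker', 'args' => [1], 'retry' => true }
begin
  run_with_retries(msg, 'default', set) { raise ArgumentError, 'bad input' }
rescue ArgumentError
  # expected: the error is re-raised after being recorded
end
stored = JSON.parse(set.entries.first[1])
p stored['retry_count'] # prints 0
p stored['error_class'] # prints "ArgumentError"
```

Re-raising after scheduling is what lets other middleware and error reporters still see the failure, while the retry set only holds a serialized copy of the message.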

#delay_for(worker, count) ⇒ Object



# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 130

def delay_for(worker, count)
  worker.sidekiq_retry_in_block? && retry_in(worker, count) || seconds_to_delay(count)
end
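The `a && b || c` expression in #delay_for, combined with #retry_in returning nil on error, means the default formula is used whenever there is no custom block, the block raises, or the block returns nil or false. The sketch below assumes a plain lambda stands in for a worker's sidekiq_retry_in block; `default_delay`, `custom_delay` and `delay_for_sketch` are hypothetical names, and the sketch rescues StandardError where the listed source rescues Exception.

```ruby
# Copy of the default formula from #seconds_to_delay.
def default_delay(count)
  (count ** 4) + 15 + (rand(30) * (count + 1))
end

# Like #retry_in: a raising block yields nil (logging omitted here).
def custom_delay(block, count)
  block.call(count)
rescue StandardError
  nil
end

# Like #delay_for: use the custom delay if present and truthy.
def delay_for_sketch(block, count)
  block && custom_delay(block, count) || default_delay(count)
end

linear     = ->(count) { 10 * (count + 1) }
broken     = ->(_count) { raise 'oops' }
nil_result = ->(_count) { nil }

p delay_for_sketch(linear, 2) # prints 30
delay_for_sketch(broken, 2)     # falls back to the default formula
delay_for_sketch(nil_result, 2) # nil also falls back
delay_for_sketch(nil, 2)        # no block: default formula
```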

#retries_exhausted(worker, msg) ⇒ Object



# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 109

def retries_exhausted(worker, msg)
  logger.debug { "Dropping message after hitting the retry maximum: #{msg}" }
  if worker.respond_to?(:retries_exhausted)
    logger.warn { "Defining #{worker.class.name}#retries_exhausted as a method is deprecated, use `sidekiq_retries_exhausted` callback instead http://git.io/Ijju8g" }
    worker.retries_exhausted(*msg['args'])
  elsif worker.sidekiq_retries_exhausted_block?
    worker.sidekiq_retries_exhausted_block.call(msg)
  end

rescue Exception => e
  handle_exception(e, { :context => "Error calling retries_exhausted" })
end
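Note the different arguments the two hooks receive: the deprecated instance method gets the job's args splatted, while the sidekiq_retries_exhausted block gets the whole message hash. This sketch uses a hypothetical `ExampleWorker` and passes the block explicitly instead of reading it from the worker class; the real method reports errors through handle_exception, which is replaced by `warn` here.

```ruby
# Hypothetical worker using the deprecated method-style hook.
class ExampleWorker
  attr_reader :exhausted_with

  def retries_exhausted(*args)
    @exhausted_with = args
  end
end

# Sketch of #retries_exhausted's dispatch; not the real middleware method.
def notify_exhausted(worker, msg, exhausted_block = nil)
  if worker.respond_to?(:retries_exhausted)
    worker.retries_exhausted(*msg['args'])  # deprecated: splatted args
  elsif exhausted_block
    exhausted_block.call(msg)               # callback: full message
  end
rescue StandardError => e
  # Errors in the hook are reported and swallowed, never re-raised.
  warn "Error calling retries_exhausted: #{e.message}"
end

msg = { 'class' => 'ExampleWorker', 'args' => [1, 2, 'foo'] }
worker = ExampleWorker.new
notify_exhausted(worker, msg)
p worker.exhausted_with # prints [1, 2, "foo"]

seen = nil
notify_exhausted(Object.new, msg, ->(m) { seen = m['class'] })
p seen # prints "ExampleWorker"

notify_exhausted(Object.new, msg, ->(_m) { raise 'boom' }) # warned, not raised
```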

#retry_attempts_from(msg_retry, default) ⇒ Object



# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 122

def retry_attempts_from(msg_retry, default)
  if msg_retry.is_a?(Fixnum)
    msg_retry
  else
    default
  end
end
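Only an Integer 'retry' value overrides the default; true, or a numeric String such as "5", silently falls back to max_retries. The listing checks Fixnum, which was removed in Ruby 3.2, so this sketch of the same logic checks Integer instead.

```ruby
# Sketch of #retry_attempts_from using Integer instead of Fixnum.
def retry_attempts_from(msg_retry, default)
  msg_retry.is_a?(Integer) ? msg_retry : default
end

p retry_attempts_from(5, 25)    # prints 5
p retry_attempts_from(true, 25) # prints 25
p retry_attempts_from('5', 25)  # prints 25 (a String is not an Integer)
```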

#retry_in(worker, count) ⇒ Object



# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 139

def retry_in(worker, count)
  begin
    worker.sidekiq_retry_in_block.call(count)
  rescue Exception => e
    logger.error { "Failure scheduling retry using the defined `sidekiq_retry_in` in #{worker.class.name}, falling back to default: #{e.message}"}
    nil
  end
end

#seconds_to_delay(count) ⇒ Object

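Default backoff, used when the worker has no sidekiq_retry_in block or the block raises or returns nil: count**4 + 15 seconds, plus a random jitter of rand(30) * (count + 1) seconds. delayed_job uses the same basic formula.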



# File 'lib/sidekiq/middleware/server/retry_jobs.rb', line 135

def seconds_to_delay(count)
  (count ** 4) + 15 + (rand(30)*(count+1))
end
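A worked version of the default schedule shows what 25 retries mean in wall-clock time. This sketch splits seconds_to_delay into hypothetical `min_delay` and `max_delay` helpers (the jitter term rand(30) * (count + 1) ranges from 0 to 29 * (count + 1)) and sums them over retry counts 0 through 24, assuming every attempt fails and ignoring job run time and scheduler polling latency.

```ruby
# Delay bounds for a given retry count under the default formula.
def min_delay(count)
  (count ** 4) + 15
end

def max_delay(count)
  min_delay(count) + 29 * (count + 1) # rand(30) maxes out at 29
end

retries = 25 # DEFAULT_MAX_RETRY_ATTEMPTS
min_total = (0...retries).sum { |c| min_delay(c) }
max_total = (0...retries).sum { |c| max_delay(c) }

puts "first retry: #{min_delay(0)}-#{max_delay(0)} s"   # prints first retry: 15-44 s
puts "last retry: #{min_delay(24)}-#{max_delay(24)} s"  # prints last retry: 331791-332516 s
puts format('%.1f to %.1f days', min_total / 86_400.0, max_total / 86_400.0)
# prints 20.4 to 20.5 days
```

Under those assumptions the default 25 retries are spread over roughly three weeks, with the fourth-power term dominating the later delays and the jitter mostly serving to spread out jobs that failed at the same moment.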