Class: Creeper::Middleware::Server::RetryJobs

Inherits:
Object
Includes:
Util
Defined in:
lib/creeper/middleware/server/retry_jobs.rb

Overview

Automatically retry jobs that fail in Creeper. Creeper’s retry support assumes a typical development lifecycle:

  1. push some code changes with a bug in it

  2. the bug causes message processing to fail; Creeper’s middleware captures the message and pushes it onto a retry queue

  3. Creeper retries messages in the retry queue multiple times with an increasing delay ((count ** 4) + 15 seconds), and the message continues to fail

  4. after a few days, a developer deploys a fix, and the message is reprocessed successfully

  5. if step 4 never happens, Creeper eventually gives up and throws the message away
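The give-up point in the last step follows from the retry rule in #call: a retry is scheduled only while the message's retry count is at most MAX_COUNT (25). The sketch below is a plain-Ruby simulation with no Redis involved; `simulate_lifecycle` is a hypothetical helper, and "the fix lands on attempt N" is an assumption used to model step 4.

```ruby
# Mirrors MAX_COUNT from the Constant Summary.
MAX_COUNT = 25

# Hypothetical helper: returns [outcome, attempts] for a job that raises on
# every attempt before `fixed_on_attempt` (1-based), or on every attempt if nil.
def simulate_lifecycle(fixed_on_attempt)
  attempt = 0
  retry_count = nil
  loop do
    attempt += 1
    # The deployed fix lets this attempt succeed.
    return [:succeeded, attempt] if fixed_on_attempt && attempt >= fixed_on_attempt

    # Same bookkeeping as #call: 0 on the first failure, then +1 per failure.
    retry_count = retry_count.nil? ? 0 : retry_count + 1
    # #call only schedules a retry while retry_count <= MAX_COUNT.
    return [:dropped, attempt] if retry_count > MAX_COUNT
  end
end

p simulate_lifecycle(5)    # attempts 1-4 fail, attempt 5 succeeds: prints [:succeeded, 5]
p simulate_lifecycle(nil)  # the fix never lands: prints [:dropped, 27]
```

Under this model a message that never succeeds is attempted 27 times in total: the original run plus 26 retries (retry counts 0 through 25).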

A message looks like:

{ 'class' => 'HardWorker', 'args' => [1, 2, 'foo'] }

When a job raises and its message has a truthy 'retry' key, we add a bit more data to the message to support retries:

* 'queue' - the queue to use
* 'retry_count' - number of times we've retried so far (0 after the first failure)
* 'error_message' - the message from the exception
* 'error_class' - the exception class
* 'failed_at' - the first time it failed
* 'retried_at' - the last time it was retried
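The bookkeeping behind these fields can be sketched on a plain Hash. `annotate_failure` is a hypothetical helper that repeats the field updates #call makes (leaving out the backtrace and the Redis write); the 'default' queue name and the error message are made up for the example.

```ruby
# Hypothetical helper that repeats the field updates #call makes on a
# failed message (without the backtrace or the Redis write).
def annotate_failure(msg, queue, error, now = Time.now.utc)
  msg['queue'] = queue
  msg['error_message'] = error.message
  msg['error_class'] = error.class.name
  if msg['retry_count']
    # Later failures: bump the count and stamp the latest retry.
    msg['retried_at'] = now
    msg['retry_count'] += 1
  else
    # First failure: start counting at 0 and remember when it first failed.
    msg['failed_at'] = now
    msg['retry_count'] = 0
  end
end

msg = { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }
error = RuntimeError.new('database unavailable')

annotate_failure(msg, 'default', error) # original run fails
annotate_failure(msg, 'default', error) # first retry fails too

p msg['retry_count']                                     # prints 1
p msg.values_at('failed_at', 'retried_at').map(&:class)  # prints [Time, Time]
```

'failed_at' is written only once, so it keeps the time of the original failure while 'retried_at' moves forward with each retry.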

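By default we don’t store the backtrace, as that can add a lot of overhead to the message, and everyone is using Airbrake, right? A message can opt in by setting 'backtrace' to true (keep the whole backtrace) or to a positive number of lines to keep; the result is stored in 'error_backtrace'.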
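Trimming for the 'backtrace' option can be sketched as a small pure function. `trimmed_backtrace` is hypothetical, and it assumes a number means "keep this many lines". The exclusive range `0...n` keeps exactly n lines, where an inclusive `0..n` would keep n + 1, and the `respond_to?(:to_i)` guard lets `false` mean "no backtrace" instead of raising NoMethodError (`false` has no `to_i`). The sample frames are made up.

```ruby
# Hypothetical helper: true keeps the whole backtrace, a positive Integer
# (or numeric String) keeps that many lines, anything else keeps none.
def trimmed_backtrace(backtrace, option)
  if option == true
    backtrace
  elsif option.respond_to?(:to_i) && option.to_i > 0
    # Exclusive range: exactly option.to_i frames.
    backtrace[0...option.to_i]
  end
end

# Made-up frames standing in for Exception#backtrace.
backtrace = %w[worker.rb:12 processor.rb:40 manager.rb:8]

p trimmed_backtrace(backtrace, 2)      # prints ["worker.rb:12", "processor.rb:40"]
p trimmed_backtrace(backtrace, false)  # prints nil
```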

Constant Summary

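MAX_COUNT =
25

Highest retry count that still gets a retry; once a message's 'retry_count' exceeds it, the message is dropped.

DELAY =
proc { |count| (count ** 4) + 15 }

Seconds to wait before the retry with the given retry count. delayed_job uses the same basic formula.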
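To see what these two constants mean in practice, the schedule can be tabulated directly. This sketch copies MAX_COUNT and DELAY and, following #call, assumes one retry is scheduled for each retry count from 0 through MAX_COUNT, each DELAY.call(count) seconds after the failure that triggered it.

```ruby
# Copies of the documented constants.
MAX_COUNT = 25
DELAY = proc { |count| (count ** 4) + 15 }

# Delay in seconds before each retry; retry counts run 0..MAX_COUNT because
# #call schedules a retry while count <= MAX_COUNT.
schedule = (0..MAX_COUNT).map { |count| DELAY.call(count) }
total_seconds = schedule.sum

puts "retries scheduled: #{schedule.size}"
puts "first delays: #{schedule.first(4).inspect}"
puts "last delay: #{schedule.last} s"
puts format('total: %d s (about %.1f days)', total_seconds, total_seconds / 86_400.0)
```

Under those assumptions the 26 delays add up to 2,154,035 seconds, roughly 25 days, which is the window for shipping a fix before the message is dropped.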

Constants included from Util

Util::EXPIRY

Instance Method Summary

Methods included from Util

#beanstalk, #constantize, #logger, #process_id, #redis, #watchdog

Methods included from ExceptionHandler

#handle_exception

Instance Method Details

#call(worker, msg, queue, job, conn) ⇒ Object



# File 'lib/creeper/middleware/server/retry_jobs.rb', line 39

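def call(worker, msg, queue, job, conn)
  yield
rescue => e
  # Only messages that opted in with a truthy 'retry' key are retried.
  raise unless msg['retry']

  # Record where and how the job failed.
  msg['queue'] = queue
  msg['error_message'] = e.message
  msg['error_class'] = e.class.name

  # The first failure starts the count at 0; each later failure increments it.
  count = if msg['retry_count']
    msg['retried_at'] = Time.now.utc
    msg['retry_count'] += 1
  else
    msg['failed_at'] = Time.now.utc
    msg['retry_count'] = 0
  end

  # Optional backtrace: true keeps all of it, a positive number keeps exactly
  # that many lines. The respond_to? check avoids a NoMethodError on false.
  backtrace_opt = msg['backtrace']
  if backtrace_opt == true
    msg['error_backtrace'] = e.backtrace
  elsif backtrace_opt.respond_to?(:to_i) && backtrace_opt.to_i > 0
    msg['error_backtrace'] = e.backtrace[0...backtrace_opt.to_i]
  end

  if count <= MAX_COUNT
    delay = DELAY.call(count)
    logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
    # The 'retry' sorted set is scored by the epoch time the retry is due.
    retry_at = Time.now.to_f + delay
    payload = Creeper.dump_json(msg)
    # Named `redis` so the block parameter does not shadow the `conn` argument.
    Creeper.redis do |redis|
      redis.zadd('retry', retry_at.to_s, payload)
    end
  else
    # Goodbye dear message, you (re)tried your best I'm sure.
    logger.debug { "Dropping message after hitting the retry maximum: #{msg}" }
  end
  # Re-raise so the failure still propagates up the middleware chain.
  raise
end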