Class: Resque::Plugins::ResqueCleaner

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/resque_cleaner.rb

Overview

ResqueCleaner class provides useful functionalities to retry or clean failed jobs. Let’s clean up your failed list!

Defined Under Namespace

Modules: FailedJobEx Classes: Limiter

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeResqueCleaner

Initializes instance



23
24
25
26
27
# File 'lib/resque_cleaner.rb', line 23

def initialize
  @failure = Resque::Failure.backend
  @print_message = true
  @limiter = Limiter.new self
end

Instance Attribute Details

#limiterObject (readonly)

ResqueCleaner fetches all elements from Redis and checks them by linear when filtering them. Since there is a performance concern, ResqueCleaner handles only the latest x(default 1000) jobs.

You can change the value through limiter attribute. e.g. cleaner.limiter.maximum = 5000



17
18
19
# File 'lib/resque_cleaner.rb', line 17

def limiter
  @limiter
end

Set false if you don’t show any message.



20
21
22
# File 'lib/resque_cleaner.rb', line 20

def print_message
  @print_message
end

Instance Method Details

#clear(&block) ⇒ Object

Clears every jobs for which block evaluates to true.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/resque_cleaner.rb', line 95

def clear(&block)
  cleared = 0
  @limiter.lock do
    @limiter.jobs.each_with_index do |job,i|
      if !block_given? || block.call(job)
        index = @limiter.start_index + i - cleared
        # fetches again since you can't ensure that it is always true:
        # a == endode(decode(a))
        value = redis.lindex(:failed, index)
        redis.lrem(:failed, 1, value)
        cleared += 1
      end
    end
  end
  cleared
end

#clear_staleObject

Clears all jobs except the last X jobs



140
141
142
143
144
145
# File 'lib/resque_cleaner.rb', line 140

def clear_stale
  return 0 unless @limiter.on?
  c = @limiter.maximum
  redis.ltrim(:failed, -c, -1)
  c
end

#failureObject

Returns failure backend. Only supports redis backend.



35
36
37
# File 'lib/resque_cleaner.rb', line 35

def failure
  @failure
end

#log(msg) ⇒ Object

Outputs message. Overrides this method when you want to change a output stream.



267
268
269
# File 'lib/resque_cleaner.rb', line 267

def log(msg)
  puts msg if print?
end

#print?Boolean

Returns:

  • (Boolean)


271
272
273
# File 'lib/resque_cleaner.rb', line 271

def print?
  @print_message
end

Print stats



79
80
81
82
83
84
85
# File 'lib/resque_cleaner.rb', line 79

def print_stats(stats)
  log too_many_message if @limiter.on?
  stats.keys.sort.each do |k|
    log "%15s: %4d" % [k,stats[k]]
  end
  log "%15s: %4d" % ["total", @limiter.count]
end

#redisObject

Returns redis instance.



30
31
32
# File 'lib/resque_cleaner.rb', line 30

def redis
  Resque.redis
end

#requeue(clear_after_requeue = false, options = {}, &block) ⇒ Object

Retries every jobs for which block evaluates to true.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/resque_cleaner.rb', line 113

def requeue(clear_after_requeue=false, options={}, &block)
  requeued = 0
  queue = options["queue"] || options[:queue]
  @limiter.lock do
    @limiter.jobs.each_with_index do |job,i|
      if !block_given? || block.call(job)
        index = @limiter.start_index + i - requeued

        if clear_after_requeue
          # remove job
          value = redis.lindex(:failed, index)
          redis.lrem(:failed, 1, value)
        else
          # mark retried
          job['retried_at'] = Time.now.strftime("%Y/%m/%d %H:%M:%S")
          redis.lset(:failed, @limiter.start_index+i, Resque.encode(job))
        end

        Job.create(queue||job['queue'], job['payload']['class'], *job['payload']['args'])
        requeued += 1
      end
    end
  end
  requeued
end

#select(&block) ⇒ Object Also known as: failure_jobs

Returns every jobs for which block evaluates to true.



88
89
90
91
# File 'lib/resque_cleaner.rb', line 88

def select(&block)
  jobs = @limiter.jobs
  block_given? ? @limiter.jobs.select(&block) : jobs
end

#stats_by_class(&block) ⇒ Object

Stats by class.



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/resque_cleaner.rb', line 53

def stats_by_class(&block)
  jobs, stats = select(&block), {}
  jobs.each do |job|
    klass = job["payload"]["class"]
    stats[klass] ||= 0
    stats[klass] += 1
  end

  print_stats(stats) if print?
  stats
end

#stats_by_date(&block) ⇒ Object

Stats by date.



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/resque_cleaner.rb', line 40

def stats_by_date(&block)
  jobs, stats = select(&block), {}
  jobs.each do |job|
    date = job["failed_at"][0,10]
    stats[date] ||= 0
    stats[date] += 1
  end

  print_stats(stats) if print?
  stats
end

#stats_by_exception(&block) ⇒ Object

Stats by exception.



66
67
68
69
70
71
72
73
74
75
76
# File 'lib/resque_cleaner.rb', line 66

def stats_by_exception(&block)
  jobs, stats = select(&block), {}
  jobs.each do |job|
    exception = job["exception"]
    stats[exception] ||= 0
    stats[exception] += 1
  end

  print_stats(stats) if print?
  stats
end

#too_many_messageObject



275
276
277
# File 'lib/resque_cleaner.rb', line 275

def too_many_message
  "There are too many failed jobs(count=#{@failure.count}). This only looks at last #{@limiter.maximum} jobs."
end