Class: ArAsyncCounterCache::IncrementCountersWorker

Inherits:
Object
  • Object
show all
Extended by:
Resque::Plugins::LockTimeout
Defined in:
lib/ar_async_counter_cache/increment_counters_worker.rb

Overview

ArAsyncCounterCache will very quickly increment a counter cache in Redis, which will then later be updated by a Resque job. Using require-lock-timeout, we can ensure that only one job per _ is running at a time.

Class Method Summary collapse

Class Method Details

.around_perform_lock1(*args) ⇒ Object

The name of this method ensures that it runs within around_perform_lock.

We’ve leveraged resque-lock-timeout to ensure that only one job is running at a time. Now, this around filter essentially ensures that only one job per parent-column can sit on the queue at once. Since the cache_key entry in redis stores the most up-to-date delta for the parent’s counter cache, we don’t have to actually perform the Klass.update_counters for every increment/decrement. We can batch process!



79
80
81
82
83
84
85
86
87
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 79

def self.around_perform_lock1(*args)
  # Remove all other instances of this job (with the same args) from the
  # queue. Uses LREM (http://code.google.com/p/redis/wiki/LremCommand) which
  # takes the form: "LREM key count value" and if count == 0 removes all
  # instances of value from the list.
  redis_job_value = ::Resque.encode(:class => self.to_s, :args => args)
  ::Resque.redis.lrem("queue:#{@queue}", 0, redis_job_value)
  yield
end

.cache_and_enqueue(parent_class, id, column, direction) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 38

def self.cache_and_enqueue(parent_class, id, column, direction)
  parent_class = parent_class.to_s
  key = cache_key(parent_class, id, column)
  if direction == :increment
    redis.incr(key)
  elsif direction == :decrement
    redis.decr(key)
  else
    raise ArgumentError, "Must call ArAsyncCounterCache::IncrementCountersWorker with :increment or :decrement"
  end
  ::Resque.enqueue(self, parent_class, id, column)
end

.cache_key(*args) ⇒ Object

args: (parent_class, id, column)



66
67
68
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 66

def self.cache_key(*args)
  "ar-async-counter-cache:#{identifier(*args)}"
end

.identifier(*args) ⇒ Object

args: (parent_class, id, column)



56
57
58
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 56

def self.identifier(*args)
  args.join('-')
end

.lock_failed(*args) ⇒ Object

Try again later if lock is in use.



61
62
63
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 61

def self.lock_failed(*args)
  ::Resque.enqueue(self, *args)
end

.perform(parent_class, id, column) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 89

def self.perform(parent_class, id, column)
  key = cache_key(parent_class, id, column)
  if (delta = redis.getset(key, 0).to_i) != 0
    begin
      parent_class = ::Resque.constantize(parent_class)
      parent_class.find(id)
      parent_class.update_counters(id, column => delta)
    rescue Exception => e
      # If anything happens, set back the counter cache.
      if delta > 0
        redis.incrby(key, delta)
      elsif delta < 0
        redis.decrby(key, -delta)
      end
      raise e
    end
  end
end

.redisObject



51
52
53
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 51

def self.redis
  @redis || ::Resque.redis
end