Module: Resque::Plugins::LockTimeout
- Defined in:
- lib/resque/plugins/lock_timeout.rb
Overview
If you want only one instance of your job running at a time, extend it with this module:
require 'resque-lock-timeout'
class UpdateNetworkGraph
extend Resque::Plugins::LockTimeout
@queue = :network_graph
def self.perform(repo_id)
heavy_lifting
end
end
If you wish to limit the duration a lock may be held for, you can set/override ‘lock_timeout`. e.g.
class UpdateNetworkGraph
extend Resque::Plugins::LockTimeout
@queue = :network_graph
# lock may be held for upto an hour.
@lock_timeout = 3600
def self.perform(repo_id)
heavy_lifting
end
end
If you wish that only one instance of the job defined by #identifier may be enqueued or running, you can set/override ‘loner`. e.g.
class PdfExport
extend Resque::Plugins::LockTimeout
@queue = :exports
# only one job can be running/enqueued at a time. For instance a button
# to run a PDF export. If the user clicks several times on it, enqueue
# the job if and only if
# - the same export is not currently running
# - the same export is not currently queued.
# ('same' being defined by `identifier`)
@loner = true
def self.perform(repo_id)
heavy_lifting
end
end
Instance Method Summary collapse
-
#acquire_lock!(*args) ⇒ Boolean, Fixnum
Try to acquire a lock for running the job.
-
#acquire_lock_algorithm!(lock_key, *args) ⇒ Object
Attempts to acquire the lock using a timeout / deadlock algorithm.
-
#acquire_lock_impl!(lock_key_method, failed_hook, *args) ⇒ Boolean, Fixnum
Generic implementation of the locking logic.
-
#acquire_loner_lock!(*args) ⇒ Boolean, Fixnum
Try to acquire a lock to enqueue a loner job.
-
#around_perform_lock(*args) ⇒ Object
Where the magic happens.
-
#before_enqueue_lock(*args) ⇒ Object
abstract
if the job is a ‘loner`, enqueue only if no other same job is already running/enqueued.
-
#identifier(*args) ⇒ String?
abstract
Builds an identifier using the job arguments.
-
#lock_expired_before_release(*args) ⇒ Object
abstract
Hook method; called when the lock expired before we released it.
-
#lock_failed(*args) ⇒ Object
abstract
Hook method; called when a were unable to acquire the lock.
-
#lock_redis ⇒ Redis
Override to fully control the redis object used for storing the locks.
-
#lock_timeout(*args) ⇒ Fixnum
Number of seconds the lock may be held for.
-
#locked?(*args) ⇒ Boolean
Convenience method, not used internally.
-
#loner(*args) ⇒ TrueClass || FalseClass
Whether one instance of the job should be running or enqueued.
-
#loner_enqueue_failed(*args) ⇒ Object
abstract
Hook method; called when a were unable to enqueue loner job.
-
#redis_lock_key(*args) ⇒ String
Override to fully control the lock key used.
-
#redis_loner_lock_key(*args) ⇒ String
Builds lock key used by ‘@loner` option.
-
#refresh_lock!(*args) ⇒ Object
Refresh the lock.
-
#release_lock!(*args) ⇒ Object
Release the lock.
-
#release_loner_lock!(*args) ⇒ Object
Release the enqueue lock for loner jobs.
Instance Method Details
#acquire_lock!(*args) ⇒ Boolean, Fixnum
Try to acquire a lock for running the job.
160 161 162 |
# File 'lib/resque/plugins/lock_timeout.rb', line 160 def acquire_lock!(*args) acquire_lock_impl!(:redis_lock_key, :lock_failed, *args) end |
#acquire_lock_algorithm!(lock_key, *args) ⇒ Object
Attempts to acquire the lock using a timeout / deadlock algorithm.
Locking algorithm: code.google.com/p/redis/wiki/SetnxCommand
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/resque/plugins/lock_timeout.rb', line 203 def acquire_lock_algorithm!(lock_key, *args) now = Time.now.to_i lock_until = now + lock_timeout(*args) acquired = false return [true, lock_until] if lock_redis.setnx(lock_key, lock_until) # Can't acquire the lock, see if it has expired. lock_expiration = lock_redis.get(lock_key) if lock_expiration && lock_expiration.to_i < now # expired, try to acquire. lock_expiration = lock_redis.getset(lock_key, lock_until) if lock_expiration.nil? || lock_expiration.to_i < now acquired = true end else # Try once more... acquired = true if lock_redis.setnx(lock_key, lock_until) end [acquired, lock_until] end |
#acquire_lock_impl!(lock_key_method, failed_hook, *args) ⇒ Boolean, Fixnum
Generic implementation of the locking logic
Returns false; when unable to acquire the lock.
-
Returns true; when lock acquired, without a timeout.
-
Returns timestamp; when lock acquired with a timeout, timestamp is when the lock timeout expires.
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/resque/plugins/lock_timeout.rb', line 181 def acquire_lock_impl!(lock_key_method, failed_hook, *args) acquired = false lock_key = self.send lock_key_method, *args unless lock_timeout(*args) > 0 # Acquire without using a timeout. acquired = true if lock_redis.setnx(lock_key, true) else # Acquire using the timeout algorithm. acquired, lock_until = acquire_lock_algorithm!(lock_key, *args) end self.send(failed_hook, *args) if !acquired lock_until && acquired ? lock_until : acquired end |
#acquire_loner_lock!(*args) ⇒ Boolean, Fixnum
Try to acquire a lock to enqueue a loner job.
166 167 168 |
# File 'lib/resque/plugins/lock_timeout.rb', line 166 def acquire_loner_lock!(*args) acquire_lock_impl!(:redis_loner_lock_key, :loner_enqueue_failed, *args) end |
#around_perform_lock(*args) ⇒ Object
Where the magic happens.
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/resque/plugins/lock_timeout.rb', line 251 def around_perform_lock(*args) lock_until = acquire_lock!(*args) # Release loner lock as job has been dequeued release_loner_lock!(*args) if loner # Abort if another job holds the lock. return unless lock_until begin yield ensure # Release the lock on success and error. Unless a lock_timeout is # used, then we need to be more careful before releasing the lock. unless lock_until === true now = Time.now.to_i if lock_until < now # Eeek! Lock expired before perform finished. Trigger callback. lock_expired_before_release(*args) return # dont relase lock. end end release_lock!(*args) end end |
#before_enqueue_lock(*args) ⇒ Object
if the job is a ‘loner`, enqueue only if no other same job is already running/enqueued
146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/resque/plugins/lock_timeout.rb', line 146 def before_enqueue_lock(*args) if loner if locked?(*args) # Same job is currently running loner_enqueue_failed(*args) false else acquire_loner_lock!(*args) end end end |
#identifier(*args) ⇒ String?
You may override to implement a custom identifier, you should consider doing this if your job arguments are many/long or may not cleanly cleanly to strings.
Builds an identifier using the job arguments. This identifier is used as part of the redis lock key.
61 62 63 |
# File 'lib/resque/plugins/lock_timeout.rb', line 61 def identifier(*args) args.join('-') end |
#lock_expired_before_release(*args) ⇒ Object
Hook method; called when the lock expired before we released it.
138 139 |
# File 'lib/resque/plugins/lock_timeout.rb', line 138 def lock_expired_before_release(*args) end |
#lock_failed(*args) ⇒ Object
Hook method; called when a were unable to acquire the lock.
124 125 |
# File 'lib/resque/plugins/lock_timeout.rb', line 124 def lock_failed(*args) end |
#lock_redis ⇒ Redis
Override to fully control the redis object used for storing the locks.
The default is Resque.redis
71 72 73 |
# File 'lib/resque/plugins/lock_timeout.rb', line 71 def lock_redis Resque.redis end |
#lock_timeout(*args) ⇒ Fixnum
Number of seconds the lock may be held for. A value of 0 or below will lock without a timeout.
101 102 103 |
# File 'lib/resque/plugins/lock_timeout.rb', line 101 def lock_timeout(*args) @lock_timeout ||= 0 end |
#locked?(*args) ⇒ Boolean
Convenience method, not used internally.
116 117 118 |
# File 'lib/resque/plugins/lock_timeout.rb', line 116 def locked?(*args) lock_redis.exists(redis_lock_key(*args)) end |
#loner(*args) ⇒ TrueClass || FalseClass
Whether one instance of the job should be running or enqueued.
109 110 111 |
# File 'lib/resque/plugins/lock_timeout.rb', line 109 def loner(*args) @loner ||= false end |
#loner_enqueue_failed(*args) ⇒ Object
Hook method; called when a were unable to enqueue loner job.
131 132 |
# File 'lib/resque/plugins/lock_timeout.rb', line 131 def loner_enqueue_failed(*args) end |
#redis_lock_key(*args) ⇒ String
Override to fully control the lock key used. It is passed the job arguments.
The default looks like this: ‘lock:<class name>:<identifier>`
82 83 84 |
# File 'lib/resque/plugins/lock_timeout.rb', line 82 def redis_lock_key(*args) ['lock', name, identifier(*args)].compact.join(':') end |
#redis_loner_lock_key(*args) ⇒ String
Builds lock key used by ‘@loner` option. Passed job arguments.
The default looks like this: ‘loner:lock:<class name>:<identifier>`
92 93 94 |
# File 'lib/resque/plugins/lock_timeout.rb', line 92 def redis_loner_lock_key(*args) ['loner', redis_lock_key(*args)].compact.join(':') end |
#refresh_lock!(*args) ⇒ Object
Refresh the lock.
242 243 244 245 246 |
# File 'lib/resque/plugins/lock_timeout.rb', line 242 def refresh_lock!(*args) now = Time.now.to_i lock_until = now + lock_timeout(*args) lock_redis.set(redis_lock_key(*args), lock_until) end |
#release_lock!(*args) ⇒ Object
Release the lock.
228 229 230 |
# File 'lib/resque/plugins/lock_timeout.rb', line 228 def release_lock!(*args) lock_redis.del(redis_lock_key(*args)) end |
#release_loner_lock!(*args) ⇒ Object
Release the enqueue lock for loner jobs
235 236 237 |
# File 'lib/resque/plugins/lock_timeout.rb', line 235 def release_loner_lock!(*args) lock_redis.del(redis_loner_lock_key(*args)) end |