Module: Resque::Plugins::LockTimeout

Defined in:
lib/resque/plugins/lock_timeout.rb

Overview

If you want only one instance of your job running at a time, extend it with this module:

require 'resque-lock-timeout'

class UpdateNetworkGraph
  extend Resque::Plugins::LockTimeout
  @queue = :network_graph

  def self.perform(repo_id)
    heavy_lifting
  end
end

If you wish to limit the duration a lock may be held for, you can set/override `lock_timeout`, e.g.:

class UpdateNetworkGraph
  extend Resque::Plugins::LockTimeout
  @queue = :network_graph

  # lock may be held for up to an hour.
  @lock_timeout = 3600

  def self.perform(repo_id)
    heavy_lifting
  end
end

If you wish that only one instance of the job defined by #identifier may be enqueued or running, you can set/override `loner`, e.g.:

class PdfExport
  extend Resque::Plugins::LockTimeout
  @queue = :exports

  # only one job can be running/enqueued at a time. For instance a button
  # to run a PDF export. If the user clicks several times on it, enqueue
  # the job if and only if
  #   - the same export is not currently running
  #   - the same export is not currently queued.
  # ('same' being defined by `identifier`)
  @loner = true

  def self.perform(repo_id)
    heavy_lifting
  end
end

Instance Method Details

#acquire_lock!(*args) ⇒ Boolean, Fixnum

Try to acquire a lock for running the job.

Returns:

  • (Boolean, Fixnum)


# File 'lib/resque/plugins/lock_timeout.rb', line 160

def acquire_lock!(*args)
  acquire_lock_impl!(:redis_lock_key, :lock_failed, *args)
end

#acquire_lock_algorithm!(lock_key, *args) ⇒ Object

Attempts to acquire the lock using a timeout / deadlock algorithm.

Locking algorithm: code.google.com/p/redis/wiki/SetnxCommand

Parameters:

  • lock_key (String)

    redis lock key

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 203

def acquire_lock_algorithm!(lock_key, *args)
  now = Time.now.to_i
  lock_until = now + lock_timeout(*args)
  acquired = false

  return [true, lock_until] if lock_redis.setnx(lock_key, lock_until)
  # Can't acquire the lock, see if it has expired.
  lock_expiration = lock_redis.get(lock_key)
  if lock_expiration && lock_expiration.to_i < now
    # expired, try to acquire.
    lock_expiration = lock_redis.getset(lock_key, lock_until)
    if lock_expiration.nil? || lock_expiration.to_i < now
      acquired = true
    end
  else
    # Try once more...
    acquired = true if lock_redis.setnx(lock_key, lock_until)
  end

  [acquired, lock_until]
end
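
The takeover of an expired lock is easier to follow with concrete timestamps. The following is a minimal sketch, not the plugin's code: `FakeRedis` and `try_lock` are hypothetical stand-ins that mimic only the `setnx`, `get` and `getset` calls used above, with the clock and the timeout passed in so the outcome is deterministic.

```ruby
# Hypothetical in-memory stand-in for the three Redis commands the
# algorithm relies on. For illustration only; not part of the plugin.
class FakeRedis
  def initialize
    @store = {}
  end

  # SETNX: write only if the key is absent; true when the value was written.
  def setnx(key, value)
    return false if @store.key?(key)
    @store[key] = value.to_s
    true
  end

  def get(key)
    @store[key]
  end

  # GETSET: write the new value and return the previous one.
  def getset(key, value)
    previous = @store[key]
    @store[key] = value.to_s
    previous
  end
end

# Follows the same steps as acquire_lock_algorithm!, but takes `now` and
# `timeout` as arguments instead of reading the clock and lock_timeout.
def try_lock(redis, key, now, timeout)
  lock_until = now + timeout
  return [true, lock_until] if redis.setnx(key, lock_until)

  expiration = redis.get(key)
  if expiration && expiration.to_i < now
    # Stale lock: only the client that reads back the stale value wins.
    previous = redis.getset(key, lock_until)
    return [true, lock_until] if previous.nil? || previous.to_i < now
  else
    # Key vanished or is still valid: one more SETNX attempt.
    return [true, lock_until] if redis.setnx(key, lock_until)
  end

  [false, lock_until]
end

redis  = FakeRedis.new
first  = try_lock(redis, 'lock:Job:1', 1_000, 60) # key is free
second = try_lock(redis, 'lock:Job:1', 1_030, 60) # held until 1_060
third  = try_lock(redis, 'lock:Job:1', 1_061, 60) # stale, taken over
```

Here `first` and `third` succeed while `second` is refused, because at time 1_030 the stored expiry (1_060) is still in the future.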

#acquire_lock_impl!(lock_key_method, failed_hook, *args) ⇒ Boolean, Fixnum

Generic implementation of the locking logic

Return value:

  • false when unable to acquire the lock.

  • true when the lock is acquired without a timeout.

  • A timestamp when the lock is acquired with a timeout; the timestamp is the time at which the lock expires.

Parameters:

  • lock_key_method (Symbol)

    the method returning redis key to lock

  • failed_hook (Symbol)

    the method called if lock failed

  • args (Array)

    job arguments

Returns:

  • (Boolean, Fixnum)


# File 'lib/resque/plugins/lock_timeout.rb', line 181

def acquire_lock_impl!(lock_key_method, failed_hook, *args)
  acquired = false
  lock_key = self.send lock_key_method, *args

  unless lock_timeout(*args) > 0
    # Acquire without using a timeout.
    acquired = true if lock_redis.setnx(lock_key, true)
  else
    # Acquire using the timeout algorithm.
    acquired, lock_until = acquire_lock_algorithm!(lock_key, *args)
  end

  self.send(failed_hook, *args) if !acquired
  lock_until && acquired ? lock_until : acquired
end
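
Since the result can take three shapes, a caller has to branch on it. A minimal sketch of how that might look; `describe_lock_result` is a hypothetical helper, not part of the plugin. Note that `Fixnum` in the signatures on this page was folded into `Integer` in Ruby 2.4, so the sketch matches on `Integer`.

```ruby
# Hypothetical helper: turn the result of acquire_lock! (or
# acquire_loner_lock!) into a human-readable description.
def describe_lock_result(result, now = Time.now.to_i)
  case result
  when false   then 'not acquired'
  when true    then 'acquired, no timeout'
  when Integer then "acquired, expires in #{result - now}s"
  end
end

refused   = describe_lock_result(false)
untimed   = describe_lock_result(true)
with_time = describe_lock_result(1_060, 1_000)
```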

#acquire_loner_lock!(*args) ⇒ Boolean, Fixnum

Try to acquire a lock to enqueue a loner job.

Returns:

  • (Boolean, Fixnum)


# File 'lib/resque/plugins/lock_timeout.rb', line 166

def acquire_loner_lock!(*args)
  acquire_lock_impl!(:redis_loner_lock_key, :loner_enqueue_failed, *args)
end

#around_perform_lock(*args) ⇒ Object

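Where the magic happens: acquires the lock, releases the loner lock (if the job is a loner), runs the job, and afterwards releases the lock unless its timeout has already expired.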

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 251

def around_perform_lock(*args)
  lock_until = acquire_lock!(*args)

  # Release loner lock as job has been dequeued
  release_loner_lock!(*args) if loner

  # Abort if another job holds the lock.
  return unless lock_until

  begin
    yield
  ensure
    # Release the lock on success and error. Unless a lock_timeout is
    # used, then we need to be more careful before releasing the lock.
    unless lock_until === true
      now = Time.now.to_i
      if lock_until < now
        # Eeek! Lock expired before perform finished. Trigger callback.
        lock_expired_before_release(*args)
        return # don't release the lock.
      end
    end
    release_lock!(*args)
  end
end
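
The release decision in the `ensure` block can be reduced to one predicate. A minimal sketch, assuming the hypothetical helper name `release_after_perform?`; it only restates the comparison above and touches neither Redis nor the hooks.

```ruby
# Hypothetical helper mirroring the ensure block: should the lock be
# deleted once perform has finished?
#
# lock_until is `true` for a lock without timeout, or the expiry timestamp.
def release_after_perform?(lock_until, now)
  return true if lock_until == true # no timeout: always release
  lock_until >= now                 # timeout: release only if not yet expired
end

no_timeout = release_after_perform?(true, 1_030)
in_time    = release_after_perform?(1_060, 1_030)
too_late   = release_after_perform?(1_060, 1_061)
```

In the `too_late` case the plugin calls `lock_expired_before_release` instead and leaves the key alone, since another worker may already have taken over the expired lock.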

#before_enqueue_lock(*args) ⇒ Object

This method is abstract.

If the job is a `loner`, enqueue only if no other identical job is already running or enqueued.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 146

def before_enqueue_lock(*args)
  if loner
    if locked?(*args) 
      # Same job is currently running
      loner_enqueue_failed(*args)
      false
    else
      acquire_loner_lock!(*args)
    end
  end
end
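
Resque skips the enqueue when a `before_enqueue` hook returns false, which is how the loner check blocks duplicates. The sketch below walks through that lifecycle for a job without a timeout. `FakeRedis`, `REDIS` and `before_enqueue` are hypothetical stand-ins, and the key names follow the defaults documented for `redis_lock_key` and `redis_loner_lock_key`.

```ruby
# Hypothetical in-memory stand-in for the Redis calls involved.
class FakeRedis
  def initialize; @store = {}; end

  def setnx(key, value)
    return false if @store.key?(key)
    @store[key] = value
    true
  end

  def exists?(key); @store.key?(key); end
  def del(key); @store.delete(key); end
end

REDIS = FakeRedis.new

# Simplified enqueue check for a loner job without a timeout.
def before_enqueue(identifier)
  return false if REDIS.exists?("lock:PdfExport:#{identifier}") # running
  REDIS.setnx("loner:lock:PdfExport:#{identifier}", true)      # queued?
end

first_click  = before_enqueue(42) # nothing queued or running
second_click = before_enqueue(42) # same export is already queued
other_export = before_enqueue(43) # different identifier

# A worker dequeues export 42: it takes the run lock and drops the loner lock.
REDIS.setnx('lock:PdfExport:42', true)
REDIS.del('loner:lock:PdfExport:42')
while_running = before_enqueue(42)

# The job finishes and the run lock is released.
REDIS.del('lock:PdfExport:42')
after_finish = before_enqueue(42)
```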

#identifier(*args) ⇒ String?

This method is abstract.

You may override this to implement a custom identifier. Consider doing so if your job arguments are many or long, or may not convert cleanly to strings.

Builds an identifier using the job arguments. This identifier is used as part of the redis lock key.

Parameters:

  • args (Array)

    job arguments

Returns:

  • (String, nil)

    job identifier



# File 'lib/resque/plugins/lock_timeout.rb', line 61

def identifier(*args)
  args.join('-')
end
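
As an illustration of overriding the identifier, the sketch below locks per repository and ignores a second argument. `LockKeyDefaults` is a hypothetical stand-in that copies only the documented defaults of `identifier` and `redis_lock_key`, so the snippet runs without the gem. A real job would `extend Resque::Plugins::LockTimeout` instead. The `options` argument is also an assumption.

```ruby
# Stand-in carrying the two documented defaults (illustration only).
module LockKeyDefaults
  def identifier(*args)
    args.join('-')
  end

  def redis_lock_key(*args)
    ['lock', name, identifier(*args)].compact.join(':')
  end
end

# Uses the default: every argument ends up in the key.
class DefaultJob
  extend LockKeyDefaults
end

# Custom identifier: one lock per repository, whatever the options are.
class UpdateNetworkGraph
  extend LockKeyDefaults

  def self.identifier(repo_id, _options = {})
    repo_id.to_s
  end
end

default_key = DefaultJob.redis_lock_key(7, { 'force' => true })
custom_key  = UpdateNetworkGraph.redis_lock_key(7, { 'force' => true })
```

With the default, the stringified hash becomes part of the key, so the same repository with different options would be locked separately. The override yields `lock:UpdateNetworkGraph:7` regardless of the options.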

#lock_expired_before_release(*args) ⇒ Object

This method is abstract.

Hook method; called when the lock expired before we released it.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 138

def lock_expired_before_release(*args)
end

#lock_failed(*args) ⇒ Object

This method is abstract.

Hook method; called when we were unable to acquire the lock.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 124

def lock_failed(*args)
end

#lock_redis ⇒ Redis

Override to fully control the redis object used for storing the locks.

The default is Resque.redis

Returns:

  • (Redis)

    redis object



# File 'lib/resque/plugins/lock_timeout.rb', line 71

def lock_redis
  Resque.redis
end

#lock_timeout(*args) ⇒ Fixnum

Number of seconds the lock may be held for. A value of 0 or below will lock without a timeout.

Parameters:

  • args (Array)

    job arguments

Returns:

  • (Fixnum)


# File 'lib/resque/plugins/lock_timeout.rb', line 101

def lock_timeout(*args)
  @lock_timeout ||= 0
end
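
Because `lock_timeout` receives the job arguments, an override can choose a timeout per job instead of using `@lock_timeout`. A minimal sketch, assuming a hypothetical `full_rebuild` flag as the second job argument; the `extend` line is commented out so the snippet runs without the gem.

```ruby
class UpdateNetworkGraph
  # extend Resque::Plugins::LockTimeout   # in a real job
  @queue = :network_graph

  # Full rebuilds may hold the lock for an hour, incremental updates
  # for five minutes. `full_rebuild` is an assumed job argument.
  def self.lock_timeout(_repo_id, full_rebuild = false)
    full_rebuild ? 3600 : 300
  end
end

incremental = UpdateNetworkGraph.lock_timeout(42)
full        = UpdateNetworkGraph.lock_timeout(42, true)
```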

#locked?(*args) ⇒ Boolean

Convenience method; also used by before_enqueue_lock to detect a running job.

Returns:

  • (Boolean)

    true if the job is locked by someone



# File 'lib/resque/plugins/lock_timeout.rb', line 116

def locked?(*args)
  lock_redis.exists(redis_lock_key(*args))
end

#loner(*args) ⇒ TrueClass || FalseClass

Whether only one instance of the job may be running or enqueued at a time.

Parameters:

  • args (Array)

    job arguments

Returns:

  • (TrueClass || FalseClass)


# File 'lib/resque/plugins/lock_timeout.rb', line 109

def loner(*args)
  @loner ||= false
end

#loner_enqueue_failed(*args) ⇒ Object

This method is abstract.

Hook method; called when we were unable to enqueue a loner job.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 131

def loner_enqueue_failed(*args)
end

#redis_lock_key(*args) ⇒ String

Override to fully control the lock key used. It is passed the job arguments.

The default looks like this: `lock:<class name>:<identifier>`

Parameters:

  • args (Array)

    job arguments

Returns:

  • (String)

    redis key



# File 'lib/resque/plugins/lock_timeout.rb', line 82

def redis_lock_key(*args)
  ['lock', name, identifier(*args)].compact.join(':')
end

#redis_loner_lock_key(*args) ⇒ String

Builds the lock key used by the `@loner` option. It is passed the job arguments.

The default looks like this: `loner:lock:<class name>:<identifier>`

Parameters:

  • args (Array)

    job arguments

Returns:

  • (String)

    redis key



# File 'lib/resque/plugins/lock_timeout.rb', line 92

def redis_loner_lock_key(*args)
  ['loner', redis_lock_key(*args)].compact.join(':')
end

#refresh_lock!(*args) ⇒ Object

Refresh the lock.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 242

def refresh_lock!(*args)
  now = Time.now.to_i
  lock_until = now + lock_timeout(*args)
  lock_redis.set(redis_lock_key(*args), lock_until)
end
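
A long-running job can call `refresh_lock!` between units of work to push the expiry forward. The sketch below illustrates this under stated assumptions: `FakeRedis` and the `ImportRows` job are hypothetical, and the class-level methods are stand-ins for what `extend Resque::Plugins::LockTimeout` would provide.

```ruby
# Hypothetical in-memory stand-in for the two Redis calls used here.
class FakeRedis
  def initialize; @store = {}; end

  def set(key, value)
    @store[key] = value.to_s
    'OK'
  end

  def get(key); @store[key]; end
end

class ImportRows
  @lock_timeout = 60

  class << self
    # Stand-ins for methods the plugin would supply.
    def lock_redis; @lock_redis ||= FakeRedis.new; end
    def lock_timeout(*); @lock_timeout; end
    def redis_lock_key(*args); ['lock', name, args.join('-')].join(':'); end

    def refresh_lock!(*args)
      lock_until = Time.now.to_i + lock_timeout(*args)
      lock_redis.set(redis_lock_key(*args), lock_until)
    end

    # Extend the lock after each batch so it does not expire mid-job.
    def perform(file_id, batches)
      batches.each do |batch|
        batch.call
        refresh_lock!(file_id)
      end
    end
  end
end

started_at = Time.now.to_i
ImportRows.perform(9, [-> {}, -> {}])
stored_expiry = ImportRows.lock_redis.get('lock:ImportRows:9').to_i
```

Judging from the `around_perform_lock` source shown earlier, the `lock_until` value compared in its `ensure` block is the one captured when the lock was acquired. A refresh therefore extends the key in Redis but does not appear to change that final comparison.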

#release_lock!(*args) ⇒ Object

Release the lock.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 228

def release_lock!(*args)
  lock_redis.del(redis_lock_key(*args))
end

#release_loner_lock!(*args) ⇒ Object

Release the enqueue lock for loner jobs.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 235

def release_loner_lock!(*args)
  lock_redis.del(redis_loner_lock_key(*args))
end