Module: Resque::Plugins::LockTimeout

Defined in:
lib/resque/plugins/lock_timeout.rb

Overview

If you want only one instance of your job running at a time, extend it with this module:

require 'resque-lock-timeout'

class UpdateNetworkGraph

extend Resque::Plugins::LockTimeout
@queue = :network_graph

def self.perform(repo_id)
  heavy_lifting
end

end

To limit how long a lock may be held, set or override `lock_timeout`, e.g.:

class UpdateNetworkGraph

extend Resque::Plugins::LockTimeout
@queue = :network_graph

# Lock may be held for up to an hour.
@lock_timeout = 3600

def self.perform(repo_id)
  heavy_lifting
end

end

Instance Method Details

#acquire_lock!(*args) ⇒ Boolean, Fixnum

Try to acquire a lock.

  • Returns false when the lock could not be acquired.

  • Returns true when the lock was acquired without a timeout.

  • Returns a timestamp when the lock was acquired with a timeout; the timestamp is the time at which the lock expires.

Returns:

  • (Boolean, Fixnum)


# File 'lib/resque/plugins/lock_timeout.rb', line 106

def acquire_lock!(*args)
  acquired = false
  lock_key = redis_lock_key(*args)

  unless lock_timeout(*args) > 0
    # Acquire without using a timeout.
    acquired = true if lock_redis.setnx(lock_key, true)
  else
    # Acquire using the timeout algorithm.
    acquired, lock_until = acquire_lock_algorithm!(lock_key, *args)
  end

  lock_failed(*args) if !acquired
  lock_until && acquired ? lock_until : acquired
end
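
The three return values listed above could be handled by a caller along these lines. This is a minimal sketch: `describe_lock_result` is a hypothetical helper, not part of the plugin, and it matches on Integer because Fixnum was folded into Integer in newer Ruby versions.

```ruby
# Hypothetical helper (not part of the plugin): turns the value returned by
# acquire_lock! into a human-readable description.
def describe_lock_result(result)
  case result
  when false   then 'lock not acquired'
  when true    then 'lock acquired, no timeout'
  when Integer then "lock acquired, expires at #{Time.at(result).utc}"
  end
end

puts describe_lock_result(false)
puts describe_lock_result(true)
puts describe_lock_result(Time.now.to_i + 3600)
```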

#acquire_lock_algorithm!(lock_key, *args) ⇒ Object

Attempts to acquire the lock using a timeout / deadlock algorithm.

Locking algorithm: code.google.com/p/redis/wiki/SetnxCommand

Parameters:

  • lock_key (String)

    redis lock key

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 128

def acquire_lock_algorithm!(lock_key, *args)
  now = Time.now.to_i
  lock_until = now + lock_timeout(*args)
  acquired = false

  return [true, lock_until] if lock_redis.setnx(lock_key, lock_until)
  # Can't acquire the lock, see if it has expired.
  lock_expiration = lock_redis.get(lock_key)
  if lock_expiration && lock_expiration.to_i < now
    # expired, try to acquire.
    lock_expiration = lock_redis.getset(lock_key, lock_until)
    if lock_expiration.nil? || lock_expiration.to_i < now
      acquired = true
    end
  else
    # Try once more...
    acquired = true if lock_redis.setnx(lock_key, lock_until)
  end

  [acquired, lock_until]
end
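
To see the algorithm's three outcomes without a Redis server, the same steps can be traced against an in-memory stand-in. This is only a sketch under assumptions: `FakeRedis` and `try_lock` are hypothetical names, the stand-in mimics just SETNX, GET and GETSET, and the clock and timeout are passed in explicitly so the run is deterministic.

```ruby
# In-memory stand-in for the three Redis commands the algorithm uses.
# FakeRedis is hypothetical and only mimics SETNX, GET and GETSET.
class FakeRedis
  def initialize
    @data = {}
  end

  # Set only if the key is absent; returns true when the value was written.
  def setnx(key, value)
    return false if @data.key?(key)

    @data[key] = value.to_s
    true
  end

  def get(key)
    @data[key]
  end

  # Write the new value and return the previous one (nil if there was none).
  def getset(key, value)
    old = @data[key]
    @data[key] = value.to_s
    old
  end
end

# Same steps as acquire_lock_algorithm!, with the clock and timeout passed in.
def try_lock(redis, key, now, timeout)
  lock_until = now + timeout
  return [true, lock_until] if redis.setnx(key, lock_until)

  current = redis.get(key)
  if current && current.to_i < now
    # The stored expiry has passed: swap in ours and check what we replaced.
    previous = redis.getset(key, lock_until)
    return [previous.nil? || previous.to_i < now, lock_until]
  end
  # Not expired (or the key vanished in between): try once more.
  [redis.setnx(key, lock_until), lock_until]
end

redis  = FakeRedis.new
first  = try_lock(redis, 'lock:Job:1', 1_000, 60) # free key: acquired
second = try_lock(redis, 'lock:Job:1', 1_030, 60) # still held: refused
third  = try_lock(redis, 'lock:Job:1', 1_061, 60) # expired: taken over
p first, second, third
```

The GETSET step is what guards against two workers both seeing an expired lock: only the one that reads back an expired value from GETSET treats the lock as its own.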

#around_perform_lock(*args) ⇒ Object

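Wraps job execution. The job runs via yield; afterwards the lock is released, whether the job succeeded or raised. If a lock_timeout is set and the lock has already expired by then, lock_expired_before_release is called instead and the lock is left in place.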

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 176

def around_perform_lock(*args)
  begin
    yield
  ensure
    # Release the lock on success and error. Unless a lock_timeout is
    # used, then we need to be more careful before releasing the lock.
    if lock_timeout(*args) > 0
      lock_until = lock_redis.get(redis_lock_key(*args)).to_i
      now = Time.now.to_i
      if lock_until < now
        # Eeek! Lock expired before perform finished. Trigger callback.
        lock_expired_before_release(*args)
        return # don't release the lock.
      end
    end
    release_lock!(*args)
  end
end

#before_enqueue_lock(*args) ⇒ Object

Try to acquire the lock before enqueueing the job. If the lock can’t be acquired, the job won’t be enqueued.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 169

def before_enqueue_lock(*args)
  acquire_lock!(*args)
end

#identifier(*args) ⇒ String?

This method is abstract.

You may override this to implement a custom identifier. Consider doing so if your job arguments are many or long, or may not convert cleanly to strings.

Builds an identifier using the job arguments. This identifier is used as part of the redis lock key.

Parameters:

  • args (Array)

    job arguments

Returns:

  • (String, nil)

    job identifier



# File 'lib/resque/plugins/lock_timeout.rb', line 42

def identifier(*args)
  args.join('-')
end
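
As an illustration of such an override, the sketch below locks on the first job argument only, so that any two jobs for the same repository share one lock regardless of their other arguments. The argument layout is an assumption made for the example, and the `extend` line is left out so the snippet runs without the gem.

```ruby
class UpdateNetworkGraph
  # In a real job this class would also `extend Resque::Plugins::LockTimeout`;
  # omitted here so the sketch runs on its own.

  # Hypothetical override: use only the repository id as the identifier and
  # ignore any remaining job arguments.
  def self.identifier(repo_id, *_rest)
    repo_id.to_s
  end
end

puts UpdateNetworkGraph.identifier(42, 'x' * 1000)
```

Whatever the override returns should come out the same for the arguments given at enqueue time and for the arguments the worker later receives, since both are used to build the lock key.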

#lock_expired_before_release(*args) ⇒ Object

This method is abstract.

Hook method; called when the lock expired before we released it.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 95

def lock_expired_before_release(*args)
end

#lock_failed(*args) ⇒ Object

This method is abstract.

Hook method; called when we are unable to acquire the lock.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 88

def lock_failed(*args)
end
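
Both hooks are empty by default, so a job only notices these events if it overrides them. A minimal sketch of what that might look like follows; the messages are made up for illustration, and the `extend` line is omitted so the snippet runs without the gem.

```ruby
class UpdateNetworkGraph
  # In a real job this class would also `extend Resque::Plugins::LockTimeout`.

  # Called when the lock could not be acquired.
  def self.lock_failed(*args)
    $stderr.puts "lock busy, job skipped: #{args.inspect}"
  end

  # Called when the lock's timeout passed before the job finished.
  def self.lock_expired_before_release(*args)
    $stderr.puts "lock expired while job was running: #{args.inspect}"
  end
end

UpdateNetworkGraph.lock_failed(42)
UpdateNetworkGraph.lock_expired_before_release(42)
```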

#lock_redis ⇒ Redis

Override to fully control the redis object used for storing the locks.

The default is Resque.redis

Returns:

  • (Redis)

    redis object



# File 'lib/resque/plugins/lock_timeout.rb', line 52

def lock_redis
  Resque.redis
end

#lock_timeout(*args) ⇒ Fixnum

Number of seconds the lock may be held for. A value of 0 or below will lock without a timeout.

Parameters:

  • args (Array)

    job arguments

Returns:

  • (Fixnum)


# File 'lib/resque/plugins/lock_timeout.rb', line 73

def lock_timeout(*args)
  @lock_timeout ||= 0
end
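
Because the method receives the job arguments, an override can pick a timeout per job rather than per class. The sketch below assumes a hypothetical job whose second argument is a mode string; the class name, argument layout and durations are all illustrative.

```ruby
class ImportFeed
  # In a real job this class would also `extend Resque::Plugins::LockTimeout`.

  # Hypothetical override: full imports may hold the lock for an hour,
  # anything else for five minutes.
  def self.lock_timeout(_feed_id, mode = 'incremental')
    mode == 'full' ? 3600 : 300
  end
end

puts ImportFeed.lock_timeout(1, 'full')
puts ImportFeed.lock_timeout(1)
```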

#locked?(*args) ⇒ Boolean

Convenience method, not used internally.

Returns:

  • (Boolean)

    true if the job is locked by someone



# File 'lib/resque/plugins/lock_timeout.rb', line 80

def locked?(*args)
  lock_redis.exists(redis_lock_key(*args))
end

#redis_lock_key(*args) ⇒ String

Override to fully control the lock key used. It is passed the job arguments.

The default, as built by the source below, looks like this: `lock:<class name>:<identifier>`

Parameters:

  • args (Array)

    job arguments

Returns:

  • (String)

    redis key



# File 'lib/resque/plugins/lock_timeout.rb', line 64

def redis_lock_key(*args)
  ['lock', name, identifier(*args)].compact.join(':')
end
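
To make the default key format concrete, the same expression can be run outside the plugin. `default_lock_key` is a hypothetical helper written for this sketch; in the plugin the class name comes from `name` and the identifier from `identifier(*args)`.

```ruby
# Hypothetical helper mirroring the body of the default redis_lock_key.
def default_lock_key(class_name, identifier)
  # compact drops a nil identifier, so no trailing ':' is produced.
  ['lock', class_name, identifier].compact.join(':')
end

puts default_lock_key('UpdateNetworkGraph', [42, 'full'].join('-'))
puts default_lock_key('UpdateNetworkGraph', nil)
```

With a nil identifier every job of the class shares a single key, which is one way to allow only one running instance per class regardless of arguments.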

#refresh_lock!(*args) ⇒ Object

Refresh the lock, resetting its expiry to the current time plus lock_timeout.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 160

def refresh_lock!(*args)
  now = Time.now.to_i
  lock_until = now + lock_timeout(*args)
  lock_redis.set(redis_lock_key(*args), lock_until)
end
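
One plausible use is a long-running job that extends its own lock between units of work so the timeout does not pass mid-run. The sketch below is an assumption about usage, not something the source prescribes: `RebuildSearchIndex` and `process_batch` are hypothetical, and `refresh_lock!` is replaced by a counting stub so the snippet runs without the gem or a Redis server.

```ruby
class RebuildSearchIndex
  # With the gem installed this would be `extend Resque::Plugins::LockTimeout`,
  # which provides refresh_lock!. The stub below stands in for it and only
  # counts how often it was called.
  @refreshes = 0

  class << self
    attr_reader :refreshes

    def refresh_lock!(*_args)
      @refreshes += 1
    end

    def perform(index_name, batches)
      batches.times do |batch|
        process_batch(index_name, batch)
        # Pass the job's own arguments so the same lock key is refreshed.
        refresh_lock!(index_name, batches)
      end
    end

    def process_batch(_index_name, _batch)
      # Placeholder for the real work.
    end
  end
end

RebuildSearchIndex.perform('articles', 3)
puts RebuildSearchIndex.refreshes
```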

#release_lock!(*args) ⇒ Object

Release the lock.

Parameters:

  • args (Array)

    job arguments



# File 'lib/resque/plugins/lock_timeout.rb', line 153

def release_lock!(*args)
  lock_redis.del(redis_lock_key(*args))
end