Module: Resque::Plugins::UniqueAtRuntime
- Defined in:
- lib/resque-unique_at_runtime.rb,
lib/resque-unique_at_runtime/version.rb
Constant Summary collapse
- VERSION =
"2.0.2"
Instance Method Summary collapse
- #around_perform_unlock_runtime(*args) ⇒ Object
- #before_perform_lock_runtime(*args) ⇒ Object
- #can_lock_queue?(*args) ⇒ Boolean
-
#on_failure_unlock_runtime(*args) ⇒ Object
There may be scenarios where the around_perform’s ensure unlock duplicates the on_failure unlock, but that’s a small price to pay for uniqueness.
- #queue_locked?(*args) ⇒ Boolean
- #reenqueue(*args) ⇒ Object
- #runtime_lock_timeout ⇒ Object
- #runtime_lock_timeout_at(now) ⇒ Object
- #runtime_requeue_interval ⇒ Object
-
#unique_at_runtime_redis_key(*_) ⇒ Object
Override this method to choose which mutex (Redis key) a Resque worker locks on.
- #unlock_queue(*args) ⇒ Object
Instance Method Details
#around_perform_unlock_runtime(*args) ⇒ Object
71 72 73 74 75 76 77 |
# File 'lib/resque-unique_at_runtime.rb', line 71
# Named as a Resque around_perform hook: runs the wrapped job (the block)
# and always releases the runtime lock afterwards.
#
# @param args [Array] job arguments, forwarded unchanged to #unlock_queue
# @yield performs the job
# @return [Object] whatever the block returns
def around_perform_unlock_runtime(*args)
  yield
ensure
  # Runs whether the block returned normally or raised, so the lock is
  # never left behind by a failing job.
  unlock_queue(*args)
end
#before_perform_lock_runtime(*args) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/resque-unique_at_runtime.rb', line 53
# Named as a Resque before_perform hook: tries to take the runtime lock
# before the job runs.
#
# When the lock is held elsewhere, the job is pushed back onto the queue
# (after a short pause) and the current attempt is aborted.
#
# @param args [Array] job arguments, forwarded to #queue_locked? and #reenqueue
# @return [true] when the lock was obtained and the job may run
# @raise [Resque::Job::DontPerform] when the lock could not be obtained
def before_perform_lock_runtime(*args)
  debug = ENV['RESQUE_DEBUG']
  key = queue_locked?(*args)

  unless key
    # Debug chatter only: gated like every other message in this plugin so
    # normal runs do not write to stdout on each job.
    puts "resque-unique_at_runtime: will perform" if debug
    return true
  end

  puts "resque-unique_at_runtime: failed to lock queue with #{key}" if debug

  # Pause before re-enqueueing so a contended lock does not turn into a
  # busy loop of dequeue/enqueue cycles.
  sleep(runtime_requeue_interval)

  # Can't get the lock, so re-enqueue the task...
  reenqueue(*args)

  # ...and don't perform this attempt.
  raise Resque::Job::DontPerform
end
#can_lock_queue?(*args) ⇒ Boolean
25 26 27 |
# File 'lib/resque-unique_at_runtime.rb', line 25
# Inverse view of #queue_locked?.
#
# @param args [Array] job arguments, forwarded to #queue_locked?
# @return [Boolean] true only when #queue_locked? returned exactly +false+
def can_lock_queue?(*args)
  # Deliberately a strict comparison: any non-false result (including nil)
  # is reported as "cannot lock".
  queue_locked?(*args) == false
end
#on_failure_unlock_runtime(*args) ⇒ Object
There may be scenarios where the around_perform’s ensure unlock
duplicates the on_failure unlock, but that's a small price to pay for
uniqueness.
82 83 84 85 |
# File 'lib/resque-unique_at_runtime.rb', line 82
# Named as a Resque on_failure hook: releases the runtime lock when a job
# fails. May duplicate the unlock already done by the around_perform
# ensure clause; that redundancy is accepted.
#
# @param args [Array] forwarded unchanged to #unlock_queue
# @return [Object] the result of #unlock_queue
def on_failure_unlock_runtime(*args)
  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: on failure unlock"
  end

  unlock_queue(*args)
end
#queue_locked?(*args) ⇒ Boolean
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/resque-unique_at_runtime.rb', line 29
# Attempts to take the runtime lock and reports whether someone else
# already holds it. Note that a +false+ result means this call has just
# written the lock value itself.
#
# The value stored under the key is the epoch second at which the lock
# is considered expired.
#
# @param args [Array] job arguments, forwarded to #unique_at_runtime_redis_key
# @return [false, String] false when the lock was taken by this call;
#   otherwise the Redis key of the lock that is still held
def queue_locked?(*args)
  now = Time.now.to_i
  lock_key = unique_at_runtime_redis_key(*args)
  expires_at = runtime_lock_timeout_at(now)

  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: attempting to lock queue with #{lock_key}"
  end

  # Per http://redis.io/commands/setnx
  if Resque.redis.setnx(lock_key, expires_at)
    # Key did not exist: lock acquired.
    false
  elsif Resque.redis.get(lock_key).to_i > now
    # Existing lock has not expired yet.
    lock_key
  elsif Resque.redis.getset(lock_key, expires_at).to_i <= now
    # Existing lock was stale and the previous value confirms it.
    false
  else
    # The value read back by getset is not expired: treat as still locked.
    lock_key
  end
end
#reenqueue(*args) ⇒ Object
49 50 51 |
# File 'lib/resque-unique_at_runtime.rb', line 49
# Hands the job back to Resque with the same arguments.
#
# @param args [Array] job arguments to enqueue again
# @return [Object] whatever Resque.enqueue returns
def reenqueue(*args)
  # presumably +self+ is the job class that extends this module - confirm
  job = self
  Resque.enqueue(job, *args)
end
#runtime_lock_timeout ⇒ Object
10 11 12 |
# File 'lib/resque-unique_at_runtime.rb', line 10
# Lifetime of the runtime lock, in seconds.
#
# @return [Object] the value of @runtime_lock_timeout when it is set to a
#   truthy value, otherwise 432_000 (5 days)
def runtime_lock_timeout
  @runtime_lock_timeout || 60 * 60 * 24 * 5
end
#runtime_lock_timeout_at(now) ⇒ Object
6 7 8 |
# File 'lib/resque-unique_at_runtime.rb', line 6
# Computes the expiry value for a lock taken at +now+.
#
# @param now [Integer] reference time (callers in this file pass epoch seconds)
# @return [Integer] +now+ plus #runtime_lock_timeout plus one
def runtime_lock_timeout_at(now)
  deadline = now + runtime_lock_timeout
  deadline + 1
end
#runtime_requeue_interval ⇒ Object
14 15 16 |
# File 'lib/resque-unique_at_runtime.rb', line 14
# Pause, in seconds, taken before a job that could not get the lock is
# re-enqueued (this value is passed to +sleep+).
#
# @return [Object] the value of @runtime_requeue_interval when it is set
#   to a truthy value, otherwise 1
def runtime_requeue_interval
  @runtime_requeue_interval || 1
end
#unique_at_runtime_redis_key(*_) ⇒ Object
Override this method to choose which mutex (Redis key) a Resque worker locks on.
20 21 22 23 |
# File 'lib/resque-unique_at_runtime.rb', line 20
# Builds the Redis key used as the runtime mutex. The default key is
# derived from @queue only; the arguments are ignored. Override to lock
# on something else.
#
# @param _ [Array] ignored by this default implementation
# @return [String] "resque-unique_at_runtime:" followed by @queue
def unique_at_runtime_redis_key(*_)
  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: getting key for #{@queue}!"
  end

  "resque-unique_at_runtime:#{@queue}"
end
#unlock_queue(*args) ⇒ Object
43 44 45 46 47 |
# File 'lib/resque-unique_at_runtime.rb', line 43
# Releases the runtime lock by deleting its Redis key.
#
# @param args [Array] job arguments, forwarded to #unique_at_runtime_redis_key
# @return [Object] the result of the Redis DEL call
def unlock_queue(*args)
  lock_key = unique_at_runtime_redis_key(*args)

  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: unlock queue with #{lock_key}"
  end

  Resque.redis.del(lock_key)
end