Module: Resque::Plugins::UniqueAtRuntime
- Defined in:
- lib/resque-unique_at_runtime.rb,
lib/resque-unique_at_runtime/version.rb
Constant Summary collapse
- LOCK_TIMEOUT =
60 * 60 * 24 * 5
- REQUEUE_INTERVAL =
1
- VERSION =
"2.0.4"
Instance Method Summary collapse
- #around_perform_unlock_runtime(*args) ⇒ Object
- #before_perform_lock_runtime(*args) ⇒ Object
- #can_lock_queue?(*args) ⇒ Boolean
- #on_failure_unlock_runtime(*args) ⇒ Object
There may be scenarios where the around_perform’s ensure unlock duplicates the on_failure unlock, but that’s a small price to pay for uniqueness.
- #queue_locked?(*args) ⇒ Boolean
- #reenqueue(*args) ⇒ Object
- #runtime_lock_timeout ⇒ Object
- #runtime_lock_timeout_at(now) ⇒ Object
- #runtime_requeue_interval ⇒ Object
- #unique_at_runtime_redis_key(*_) ⇒ Object
Override this method to uniquely identify which mutex should be used for a resque worker.
- #unlock_queue(*args) ⇒ Object
Instance Method Details
#around_perform_unlock_runtime(*args) ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/resque-unique_at_runtime.rb', line 74
# Wraps the job's perform step and always releases the runtime lock afterwards.
#
# The method-level `ensure` runs unlock_queue whether the yielded block
# returns normally or raises, so the lock is never left behind by this hook.
#
# @param args [Array] job arguments, forwarded unchanged to unlock_queue
# @yield the wrapped perform step
# @return [Object] whatever the yielded block returns
def around_perform_unlock_runtime(*args)
  yield
ensure
  unlock_queue(*args)
end
#before_perform_lock_runtime(*args) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/resque-unique_at_runtime.rb', line 56
# Tries to take the runtime lock before the job performs.
#
# When queue_locked? reports an existing lock (it returns the lock key), this
# sleeps for runtime_requeue_interval, re-enqueues the job with the same
# arguments and aborts the current attempt.
#
# @param args [Array] job arguments, forwarded to queue_locked? and reenqueue
# @return [true] when the lock was obtained and the job may run
# @raise [Resque::Job::DontPerform] when the lock is already held
def before_perform_lock_runtime(*args)
  key = queue_locked?(*args)

  unless key
    # Debug-only output, consistent with every other message in this module.
    puts "will perform" if ENV['RESQUE_DEBUG']
    return true
  end

  puts "resque-unique_at_runtime: failed to lock queue with #{key}" if ENV['RESQUE_DEBUG']

  # Sleep so the CPU can rest
  sleep(runtime_requeue_interval)

  # can't get the lock, so re-enqueue the task
  reenqueue(*args)

  # and don't perform
  raise Resque::Job::DontPerform
end
#can_lock_queue?(*args) ⇒ Boolean
28 29 30 |
# File 'lib/resque-unique_at_runtime.rb', line 28
# Reports whether the runtime lock is free for the given job arguments.
#
# This delegates to queue_locked?, which itself writes the lock key in Redis
# when it is free, so a true result means the lock has just been taken.
#
# @param args [Array] job arguments, forwarded to queue_locked?
# @return [Boolean] true only when queue_locked? returns exactly false
def can_lock_queue?(*args)
  queue_locked?(*args) == false
end
#on_failure_unlock_runtime(*args) ⇒ Object
There may be scenarios where the around_perform’s ensure unlock
duplicates the on_failure unlock, but that's a small price to pay for
uniqueness.
85 86 87 88 |
# File 'lib/resque-unique_at_runtime.rb', line 85
# Releases the runtime lock when the job fails.
#
# @param args [Array] forwarded unchanged to unlock_queue
# @return [Object] the result of unlock_queue
def on_failure_unlock_runtime(*args)
  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: on failure unlock"
  end

  unlock_queue(*args)
end
#queue_locked?(*args) ⇒ Boolean
32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/resque-unique_at_runtime.rb', line 32
# Attempts to take the runtime lock and reports whether it was already held.
#
# The value stored under the lock key is the expiry timestamp computed by
# runtime_lock_timeout_at, so an expired lock can be detected and replaced.
#
# @param args [Array] job arguments, forwarded to unique_at_runtime_redis_key
# @return [false, String] false when this call obtained the lock, otherwise
#   the key of the lock that is still held
def queue_locked?(*args)
  current_time = Time.now.to_i
  lock_key = unique_at_runtime_redis_key(*args)
  expires_at = runtime_lock_timeout_at(current_time)

  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: attempting to lock queue with #{lock_key}"
  end

  # Per http://redis.io/commands/setnx
  if Resque.redis.setnx(lock_key, expires_at)
    # Key did not exist: the lock is now ours.
    false
  elsif Resque.redis.get(lock_key).to_i > current_time
    # Existing lock has not expired yet.
    lock_key
  elsif Resque.redis.getset(lock_key, expires_at).to_i <= current_time
    # Lock had expired and the previous value was still stale: we replaced it.
    false
  else
    # Previous value was no longer stale, so the lock is treated as held.
    lock_key
  end
end
#reenqueue(*args) ⇒ Object
52 53 54 |
# File 'lib/resque-unique_at_runtime.rb', line 52
# Puts the job back on the queue with the same arguments.
#
# @param args [Array] job arguments, passed through to Resque.enqueue
# @return [Object] the result of Resque.enqueue
def reenqueue(*args)
  job_class = self
  Resque.enqueue(job_class, *args)
end
#runtime_lock_timeout ⇒ Object
13 14 15 |
# File 'lib/resque-unique_at_runtime.rb', line 13
# Lifetime of the runtime lock.
#
# Reads @runtime_lock_timeout on the receiver and falls back to LOCK_TIMEOUT
# when it is unset (or falsy).
#
# @return [Object] the configured @runtime_lock_timeout, else LOCK_TIMEOUT
def runtime_lock_timeout
  # NOTE(review): instance_variable_get is kept rather than a bare ivar read,
  # presumably to avoid "not initialized" warnings on older Rubies - confirm.
  instance_variable_get(:@runtime_lock_timeout) || LOCK_TIMEOUT
end
#runtime_lock_timeout_at(now) ⇒ Object
9 10 11 |
# File 'lib/resque-unique_at_runtime.rb', line 9
# Computes the expiry value stored for a lock taken at +now+.
#
# @param now [Integer] the current time (callers in this module pass
#   Time.now.to_i)
# @return [Integer] now plus runtime_lock_timeout plus one
def runtime_lock_timeout_at(now)
  lifetime = runtime_lock_timeout
  now + lifetime + 1
end
#runtime_requeue_interval ⇒ Object
17 18 19 |
# File 'lib/resque-unique_at_runtime.rb', line 17
# Delay passed to sleep before a blocked job is re-enqueued.
#
# Reads @runtime_requeue_interval on the receiver and falls back to
# REQUEUE_INTERVAL when it is unset (or falsy).
#
# @return [Object] the configured @runtime_requeue_interval, else
#   REQUEUE_INTERVAL
def runtime_requeue_interval
  # NOTE(review): instance_variable_get is kept rather than a bare ivar read,
  # presumably to avoid "not initialized" warnings on older Rubies - confirm.
  instance_variable_get(:@runtime_requeue_interval) || REQUEUE_INTERVAL
end
#unique_at_runtime_redis_key(*_) ⇒ Object
Override this method to uniquely identify which mutex should be used for a resque worker.
23 24 25 26 |
# File 'lib/resque-unique_at_runtime.rb', line 23
# Builds the Redis key used as the runtime mutex.
#
# The default key is derived only from @queue; the job arguments are accepted
# and ignored.
#
# @return [String] "resque-unique_at_runtime:" followed by @queue
def unique_at_runtime_redis_key(*_)
  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: getting key for #{@queue}!"
  end

  "resque-unique_at_runtime:#{@queue}"
end
#unlock_queue(*args) ⇒ Object
46 47 48 49 50 |
# File 'lib/resque-unique_at_runtime.rb', line 46
# Releases the runtime lock by deleting its Redis key.
#
# @param args [Array] job arguments, forwarded to unique_at_runtime_redis_key
# @return [Object] the result of Resque.redis.del
def unlock_queue(*args)
  lock_key = unique_at_runtime_redis_key(*args)

  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: unlock queue with #{lock_key}"
  end

  Resque.redis.del(lock_key)
end