Module: Resque::Plugins::UniqueAtRuntime

Defined in:
lib/resque-unique_at_runtime.rb,
lib/resque-unique_at_runtime/version.rb

Constant Summary collapse

LOCK_TIMEOUT =
60 * 60 * 24 * 5
REQUEUE_INTERVAL =
1
VERSION =
"2.0.4"

Instance Method Summary collapse

Instance Method Details

#around_perform_unlock_runtime(*args) ⇒ Object



74
75
76
77
78
79
80
# File 'lib/resque-unique_at_runtime.rb', line 74

# Wraps the job body and always releases the runtime lock afterwards.
#
# Yields to the given block, then calls unlock_queue with the same
# arguments whether the block returned normally or raised.
#
# @param args [Array] arguments forwarded to unlock_queue
# @return [Object] whatever the yielded block returns
def around_perform_unlock_runtime(*args)
  yield
ensure
  # Runs on both the success and the exception path.
  unlock_queue(*args)
end

#before_perform_lock_runtime(*args) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/resque-unique_at_runtime.rb', line 56

# Tries to take the runtime lock before the job is performed.
#
# When queue_locked? reports that the lock is held elsewhere (it returns the
# lock key), this sleeps for runtime_requeue_interval seconds, puts the job
# back on the queue via reenqueue, and aborts this run by raising
# Resque::Job::DontPerform.
#
# All diagnostic output is printed only when ENV['RESQUE_DEBUG'] is set, so
# a normal run does not write to stdout.
#
# @param args [Array] job arguments, forwarded to queue_locked? and reenqueue
# @return [true] when the lock was acquired and the job may run
# @raise [Resque::Job::DontPerform] when the lock is held elsewhere
def before_perform_lock_runtime(*args)
  key = queue_locked?(*args)

  unless key
    puts "will perform" if ENV['RESQUE_DEBUG']
    return true
  end

  puts "resque-unique_at_runtime: failed to lock queue with #{key}" if ENV['RESQUE_DEBUG']

  # Sleep so the CPU can rest instead of spinning on an immediate retry.
  sleep(runtime_requeue_interval)

  # Can't get the lock, so re-enqueue the task...
  reenqueue(*args)

  # ...and don't perform it now.
  raise Resque::Job::DontPerform
end

#can_lock_queue?(*args) ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/resque-unique_at_runtime.rb', line 28

# Reports whether the runtime lock could be taken.
#
# This is not a passive check: it delegates to queue_locked?, which tries to
# acquire the lock in Redis, so a true result means the lock is now held.
#
# @param args [Array] arguments forwarded to queue_locked?
# @return [Boolean] true only when queue_locked? returned exactly false
def can_lock_queue?(*args)
  queue_locked?(*args) == false
end

#on_failure_unlock_runtime(*args) ⇒ Object

There may be scenarios where the around_perform’s ensure unlock duplicates
the on_failure unlock, but that's a small price to pay for uniqueness.


85
86
87
88
# File 'lib/resque-unique_at_runtime.rb', line 85

# Releases the runtime lock after a job has failed.
#
# Resque calls on_failure hooks with the raised exception first, followed by
# the job arguments. The exception is dropped before unlocking so that
# unlock_queue builds the key from the same arguments that were used to take
# the lock. Calls that pass only job arguments are forwarded unchanged.
#
# @param args [Array] optionally a leading Exception, then the job arguments
# @return [Object] the result of unlock_queue
def on_failure_unlock_runtime(*args)
  puts "resque-unique_at_runtime: on failure unlock" if ENV['RESQUE_DEBUG']
  job_args = args.first.is_a?(Exception) ? args.drop(1) : args
  unlock_queue(*job_args)
end

#queue_locked?(*args) ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/resque-unique_at_runtime.rb', line 32

# Attempts to take the runtime lock and reports whether someone else holds it.
#
# The lock value stored in Redis is the epoch second at which the lock is
# considered expired. The sequence follows the locking pattern described at
# http://redis.io/commands/setnx:
#   1. SETNX succeeds            -> lock acquired
#   2. stored expiry is in future -> lock held elsewhere
#   3. GETSET returns a stale expiry -> stale lock taken over
#   4. otherwise                  -> another caller refreshed it first
#
# @param args [Array] arguments forwarded to unique_at_runtime_redis_key
# @return [false, String] false when the lock was acquired by this call,
#   otherwise the lock key
def queue_locked?(*args)
  current_time = Time.now.to_i
  lock_key = unique_at_runtime_redis_key(*args)
  expires_at = runtime_lock_timeout_at(current_time)

  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: attempting to lock queue with #{lock_key}"
  end

  if Resque.redis.setnx(lock_key, expires_at)
    false
  elsif Resque.redis.get(lock_key).to_i > current_time
    lock_key
  elsif Resque.redis.getset(lock_key, expires_at).to_i <= current_time
    false
  else
    lock_key
  end
end

#reenqueue(*args) ⇒ Object



52
53
54
# File 'lib/resque-unique_at_runtime.rb', line 52

# Puts the job back on the queue with the same arguments.
#
# @param args [Array] job arguments to enqueue again
# @return [Object] the result of Resque.enqueue
def reenqueue(*args)
  # self is passed as the job class; presumably this module is extended into
  # the job class, so self is that class (verify against the including code).
  job_class = self
  Resque.enqueue(job_class, *args)
end

#runtime_lock_timeout ⇒ Object



13
14
15
# File 'lib/resque-unique_at_runtime.rb', line 13

# Lock lifetime used when computing the lock's expiry.
#
# Reads the @runtime_lock_timeout instance variable on self and falls back
# to LOCK_TIMEOUT when it is unset (or otherwise falsy).
#
# @return [Object] the configured timeout, or LOCK_TIMEOUT
def runtime_lock_timeout
  configured = instance_variable_get(:@runtime_lock_timeout)
  configured || LOCK_TIMEOUT
end

#runtime_lock_timeout_at(now) ⇒ Object



9
10
11
# File 'lib/resque-unique_at_runtime.rb', line 9

# Computes the expiry value to store for a lock taken at +now+.
#
# @param now [Integer] the current time (callers in this file pass epoch seconds)
# @return [Integer] now plus runtime_lock_timeout plus one
def runtime_lock_timeout_at(now)
  lifetime = runtime_lock_timeout
  now + lifetime + 1
end

#runtime_requeue_interval ⇒ Object



17
18
19
# File 'lib/resque-unique_at_runtime.rb', line 17

# Delay value used before a job that could not get the lock is re-enqueued.
#
# Reads the @runtime_requeue_interval instance variable on self and falls
# back to REQUEUE_INTERVAL when it is unset (or otherwise falsy).
#
# @return [Object] the configured interval, or REQUEUE_INTERVAL
def runtime_requeue_interval
  configured = instance_variable_get(:@runtime_requeue_interval)
  configured || REQUEUE_INTERVAL
end

#unique_at_runtime_redis_key(*_) ⇒ Object

Overwrite this method to uniquely identify which mutex should be used for a resque worker.



23
24
25
26
# File 'lib/resque-unique_at_runtime.rb', line 23

# Builds the Redis key that identifies the mutex for this worker.
#
# The default key depends only on @queue; all arguments are ignored.
# Override this method to choose a different mutex.
#
# @return [String] "resque-unique_at_runtime:" followed by @queue
def unique_at_runtime_redis_key(*_)
  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: getting key for #{@queue}!"
  end

  "resque-unique_at_runtime:#{@queue}"
end

#unlock_queue(*args) ⇒ Object



46
47
48
49
50
# File 'lib/resque-unique_at_runtime.rb', line 46

# Releases the runtime lock by deleting its key from Redis.
#
# @param args [Array] arguments forwarded to unique_at_runtime_redis_key
# @return [Object] the result of Resque.redis.del
def unlock_queue(*args)
  lock_key = unique_at_runtime_redis_key(*args)

  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: unlock queue with #{lock_key}"
  end

  Resque.redis.del(lock_key)
end