Module: Resque::Plugins::UniqueAtRuntime

Defined in:
lib/resque-unique_at_runtime.rb,
lib/resque-unique_at_runtime/version.rb

Constant Summary collapse

# Version string of Resque::Plugins::UniqueAtRuntime.
VERSION =
"2.0.2"

Instance Method Summary collapse

Instance Method Details

#around_perform_unlock_runtime(*args) ⇒ Object



71
72
73
74
75
76
77
# File 'lib/resque-unique_at_runtime.rb', line 71

# Around-perform hook: yields to the wrapped work, then releases the runtime
# lock via #unlock_queue.
#
# @param args [Array] job arguments, forwarded to #unlock_queue
# @yield the wrapped perform step
# @return [Object] the value returned by the yielded block
def around_perform_unlock_runtime(*args)
  yield
ensure
  # Runs on both the normal and the exception path.
  unlock_queue(*args)
end

#before_perform_lock_runtime(*args) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/resque-unique_at_runtime.rb', line 53

# Before-perform hook that tries to take the runtime lock for this job.
#
# When #queue_locked? reports the lock as held, this waits for
# #runtime_requeue_interval seconds, puts the job back on the queue and aborts
# the current run. Otherwise it returns true so the job can proceed.
#
# @param args [Array] job arguments, forwarded to #queue_locked? and #reenqueue
# @return [true] when the lock is not held by someone else
# @raise [Resque::Job::DontPerform] when the lock is already held
def before_perform_lock_runtime(*args)
  if (key = queue_locked?(*args))
    puts "resque-unique_at_runtime: failed to lock queue with #{key}" if ENV['RESQUE_DEBUG']

    # Pause before re-enqueueing so a busy lock does not become a tight
    # dequeue/re-enqueue loop that burns CPU.
    sleep(runtime_requeue_interval)

    # Can't get the lock, so re-enqueue the task ...
    reenqueue(*args)

    # ... and don't perform this attempt.
    raise Resque::Job::DontPerform
  else
    # Diagnostic output only; gated like every other message in this module so
    # normal runs do not write to stdout for each job.
    puts "resque-unique_at_runtime: will perform" if ENV['RESQUE_DEBUG']
    true
  end
end

#can_lock_queue?(*args) ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/resque-unique_at_runtime.rb', line 25

# Reports whether the queue lock could be taken.
#
# True only when #queue_locked? returns exactly false; any other result
# (including the lock key it returns when the lock is held) yields false.
# NOTE: #queue_locked? writes the lock key itself when the key is free, so
# calling this predicate has that same side effect.
#
# @param args [Array] job arguments, forwarded to #queue_locked?
# @return [Boolean]
def can_lock_queue?(*args)
  queue_locked?(*args) == false
end

#on_failure_unlock_runtime(*args) ⇒ Object

There may be scenarios where the around_perform's ensure unlock duplicates the on_failure unlock, but that's a small price to pay for uniqueness.


82
83
84
85
# File 'lib/resque-unique_at_runtime.rb', line 82

# on_failure hook: releases the runtime lock after a job has failed.
#
# Resque invokes on_failure hooks with the raised exception followed by the
# job arguments. A leading exception is dropped here so that #unlock_queue
# (and through it #unique_at_runtime_redis_key) receives the same arguments
# as in the other hooks. Calls that pass only job arguments behave as before.
#
# @param args [Array] optionally an Exception, then the job arguments
# @return [Object] the result of #unlock_queue
def on_failure_unlock_runtime(*args)
  puts "resque-unique_at_runtime: on failure unlock" if ENV['RESQUE_DEBUG']
  job_args = args.first.is_a?(Exception) ? args.drop(1) : args
  unlock_queue(*job_args)
end

#queue_locked?(*args) ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/resque-unique_at_runtime.rb', line 29

# Attempts to take the lock stored under #unique_at_runtime_redis_key and
# reports whether somebody else already holds it.
#
# The value stored under the key is the epoch second at which the lock is
# considered expired (see #runtime_lock_timeout_at).
#
# @param args [Array] job arguments, forwarded to #unique_at_runtime_redis_key
# @return [false, String] false when this call now holds the lock; otherwise
#   the lock key (truthy), meaning the lock is held elsewhere
def queue_locked?(*args)
  current = Time.now.to_i
  lock_key = unique_at_runtime_redis_key(*args)
  expires_at = runtime_lock_timeout_at(current)

  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: attempting to lock queue with #{lock_key}"
  end

  # Locking scheme per http://redis.io/commands/setnx
  if Resque.redis.setnx(lock_key, expires_at)
    # Key did not exist: the lock is ours.
    false
  elsif Resque.redis.get(lock_key).to_i > current
    # Existing lock has not expired yet.
    lock_key
  elsif Resque.redis.getset(lock_key, expires_at).to_i <= current
    # The stored lock was expired and the previous value we swapped out was
    # expired too, so our new timestamp is the one in effect.
    false
  else
    # The swapped-out value was not expired: someone else got there first.
    lock_key
  end
end

#reenqueue(*args) ⇒ Object



49
50
51
# File 'lib/resque-unique_at_runtime.rb', line 49

# Puts this job back on the queue with the same arguments.
#
# @param args [Array] job arguments to enqueue again
# @return [Object] the result of Resque.enqueue
def reenqueue(*args)
  job_class = self
  Resque.enqueue(job_class, *args)
end

#runtime_lock_timeout ⇒ Object



10
11
12
# File 'lib/resque-unique_at_runtime.rb', line 10

# Lifetime of the runtime lock, in seconds.
#
# Reads @runtime_lock_timeout on the receiver and falls back to five days
# when it is nil or false.
#
# @return [Object] the configured value, or 432000 by default
def runtime_lock_timeout
  configured = instance_variable_get(:@runtime_lock_timeout)
  configured || 60 * 60 * 24 * 5
end

#runtime_lock_timeout_at(now) ⇒ Object



6
7
8
# File 'lib/resque-unique_at_runtime.rb', line 6

# Computes the timestamp at which a lock taken at +now+ expires.
#
# @param now [Numeric] the reference time (callers in this file pass epoch seconds)
# @return [Numeric] now + #runtime_lock_timeout + 1
def runtime_lock_timeout_at(now)
  deadline = now + runtime_lock_timeout
  deadline + 1
end

#runtime_requeue_interval ⇒ Object



14
15
16
# File 'lib/resque-unique_at_runtime.rb', line 14

# Delay passed to sleep before a job that could not take the lock is
# re-enqueued.
#
# Reads @runtime_requeue_interval on the receiver and falls back to 1 when it
# is nil or false.
#
# @return [Object] the configured value, or 1 by default
def runtime_requeue_interval
  configured = instance_variable_get(:@runtime_requeue_interval)
  configured || 1
end

#unique_at_runtime_redis_key(*_) ⇒ Object

Overwrite this method to uniquely identify which mutex should be used for a resque worker.



20
21
22
23
# File 'lib/resque-unique_at_runtime.rb', line 20

# Builds the Redis key used as the lock for this job.
#
# The default ignores the job arguments and derives the key from @queue.
# Overwrite this method to uniquely identify which mutex should be used for a
# resque worker.
#
# @return [String] "resque-unique_at_runtime:" followed by @queue
def unique_at_runtime_redis_key(*_)
  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: getting key for #{@queue}!"
  end

  "resque-unique_at_runtime:#{@queue}"
end

#unlock_queue(*args) ⇒ Object



43
44
45
46
47
# File 'lib/resque-unique_at_runtime.rb', line 43

# Releases the runtime lock by deleting its Redis key.
#
# @param args [Array] job arguments, forwarded to #unique_at_runtime_redis_key
# @return [Object] the result of Resque.redis.del
def unlock_queue(*args)
  lock_key = unique_at_runtime_redis_key(*args)

  if ENV['RESQUE_DEBUG']
    puts "resque-unique_at_runtime: unlock queue with #{lock_key}"
  end

  Resque.redis.del(lock_key)
end