Class: Redis::Semaphore
- Inherits:
-
Object
- Object
- Redis::Semaphore
- Defined in:
- lib/redis/semaphore.rb
Constant Summary collapse
- API_VERSION =
"1"
Instance Method Summary collapse
- #all_tokens ⇒ Object
- #available_count ⇒ Object
- #delete! ⇒ Object
- #exists? ⇒ Boolean
- #exists_or_create! ⇒ Object
- #generate_unique_token ⇒ Object
-
#initialize(name, opts = {}) ⇒ Semaphore
constructor
stale_client_timeout is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate its lock.
- #lock(timeout = 0) ⇒ Object (also: #wait)
- #locked?(token = nil) ⇒ Boolean
- #release_stale_locks! ⇒ Object
- #signal(token = 1) ⇒ Object
- #unlock ⇒ Object
Constructor Details
#initialize(name, opts = {}) ⇒ Semaphore
stale_client_timeout is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate its lock. The default is nil, in which case we don't check for stale clients. Examples:
Redis::Semaphore.new(:my_semaphore, :stale_client_timeout => 30, :redis => myRedis)
Redis::Semaphore.new(:my_semaphore, :redis => myRedis)
Redis::Semaphore.new(:my_semaphore, :resources => 1, :redis => myRedis)
Redis::Semaphore.new(:my_semaphore, :connection => "", :port => "")
Redis::Semaphore.new(:my_semaphore, :path => "bla")
16 17 18 19 20 21 22 23 |
# File 'lib/redis/semaphore.rb', line 16
#
# Sets up a semaphore handle.
#
# Semaphore-specific options are removed from a private copy of +opts+; the
# remaining entries are handed to Redis.new when no :redis client is given.
#
# @param name [Object] identifier stored as @name
# @param opts [Hash] options; recognised keys:
#   :resources            - stored as @resource_count (defaults to 1)
#   :stale_client_timeout - stored as @stale_client_timeout (nil when absent)
#   :use_local_time       - stored as @use_local_time
#   :redis                - an existing client to use instead of Redis.new
def initialize(name, opts = {})
  # Work on a copy so the caller's hash is not stripped of its keys.
  opts = opts.dup

  @name = name
  @resource_count = opts.delete(:resources) || 1
  @stale_client_timeout = opts.delete(:stale_client_timeout)
  # Must be extracted before Redis.new(opts); otherwise :use_local_time would
  # be passed to the Redis client as if it were a connection option.
  @use_local_time = opts.delete(:use_local_time)
  @redis = opts.delete(:redis) || Redis.new(opts)
  @tokens = []
end
Instance Method Details
#all_tokens ⇒ Object
101 102 103 104 105 106 |
# File 'lib/redis/semaphore.rb', line 101
#
# Collects every token known to this semaphore: the entries of the list at
# available_key plus the field names of the hash at grabbed_key. Both reads
# are issued inside a single MULTI block.
#
# @return [Array] the flattened result of the MULTI block
def all_tokens
  replies = @redis.multi do
    @redis.lrange(available_key, 0, -1)
    @redis.hkeys(grabbed_key)
  end

  # NOTE(review): presumably one reply array per command; flatten merges them.
  replies.flatten
end
#available_count ⇒ Object
37 38 39 |
# File 'lib/redis/semaphore.rb', line 37
#
# Reports how many entries are in the Redis list stored at available_key.
#
# @return [Object] whatever LLEN replies with for that list
def available_count
  free_list = available_key
  @redis.llen(free_list)
end
#delete! ⇒ Object
41 42 43 44 45 |
# File 'lib/redis/semaphore.rb', line 41
#
# Removes the three Redis keys this semaphore uses: the available list, the
# grabbed hash and the existence marker, in that order.
#
# @return [Object] the reply of the final DEL (the one for exists_key)
def delete!
  [available_key, grabbed_key].each { |key| @redis.del(key) }
  # Kept as the last expression so the return value is this DEL's reply.
  @redis.del(exists_key)
end
#exists? ⇒ Boolean
97 98 99 |
# File 'lib/redis/semaphore.rb', line 97
#
# Tells whether the existence marker key (exists_key) is present in Redis.
#
# Depending on the redis-rb version, Redis#exists replies with either a
# boolean or an Integer count of matching keys. Because 0 is truthy in Ruby,
# an Integer reply is converted to a boolean here; any other reply is
# returned unchanged.
#
# @return [Boolean] true when the marker key exists
def exists?
  reply = @redis.exists(exists_key)
  reply.is_a?(Integer) ? reply > 0 : reply
end
#exists_or_create! ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/redis/semaphore.rb', line 25
#
# Ensures the semaphore is initialised in Redis. GETSET writes API_VERSION
# into the existence marker and hands back whatever was stored before:
#
# * nothing stored  -> the semaphore is new, so create! is called
# * same version    -> nothing to do
# * other version   -> an error is raised
#
# @return [Object] the result of create!, or true when already present
# @raise [RuntimeError] when the stored version differs from API_VERSION
def exists_or_create!
  previous_version = @redis.getset(exists_key, API_VERSION)

  return create! if previous_version.nil?
  return true if previous_version == API_VERSION

  raise "Semaphore exists but running as wrong version (version #{previous_version} vs #{API_VERSION})."
end
#generate_unique_token ⇒ Object
108 109 110 111 112 113 114 115 |
# File 'lib/redis/semaphore.rb', line 108
#
# Produces a random token string that is not among the tokens currently
# reported by all_tokens.
#
# The method used to end on a `while` loop, whose value is nil, so callers
# never received the token; it is now returned explicitly.
#
# @return [String] Random.rand.to_s value not present in all_tokens
def generate_unique_token
  taken = all_tokens

  loop do
    candidate = Random.rand.to_s
    return candidate unless taken.include?(candidate)
  end
end
#lock(timeout = 0) ⇒ Object Also known as: wait
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/redis/semaphore.rb', line 47
#
# Acquires one token from the semaphore.
#
# Makes sure the semaphore exists, optionally releases stale locks, then pops
# a token from the available list with BLPOP. The token is remembered in
# @tokens and recorded in the grabbed hash together with the current time.
#
# When a block is given it is yielded the token, and the token is signalled
# back afterwards even if the block raises.
#
# @param timeout [Numeric] passed straight to BLPOP
#   (NOTE(review): per the Redis docs 0 means "block indefinitely" - confirm)
# @return [false] when BLPOP returned nil (no token obtained)
# @return [Object] the block's value when a block is given, else the token
def lock(timeout = 0)
  exists_or_create!
  release_stale_locks! if check_staleness?

  popped = @redis.blpop(available_key, timeout)
  return false if popped.nil?

  # BLPOP replies with a [list_name, value] pair; only the value is needed.
  _list_name, acquired = popped
  @tokens << acquired
  @redis.hset(grabbed_key, acquired, current_time.to_f)

  return acquired unless block_given?

  begin
    yield acquired
  ensure
    signal(acquired)
  end
end
#locked?(token = nil) ⇒ Boolean
76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/redis/semaphore.rb', line 76
#
# Checks whether a token is currently recorded in the grabbed hash.
#
# @param token [Object, nil] a specific token to look up; when omitted, every
#   token held in @tokens is checked
# @return [Boolean] for a given token, the HEXISTS reply; otherwise true if
#   any token in @tokens is still present in the grabbed hash
def locked?(token = nil)
  return @redis.hexists(grabbed_key, token) if token

  # Distinct block variable: the previous version reused the name `token`,
  # shadowing the method parameter.
  @tokens.any? { |held| locked?(held) }
end
#release_stale_locks! ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/redis/semaphore.rb', line 117
#
# Signals back every grabbed token whose lock is older than
# @stale_client_timeout. The scan runs inside
# simple_mutex(:release_locks, 10).
#
# Each entry of the grabbed hash maps a token to the time it was locked; a
# token is released when that time plus the timeout lies before the current
# time.
#
# @return [Object] whatever simple_mutex returns
def release_stale_locks!
  simple_mutex(:release_locks, 10) do
    grabbed = @redis.hgetall(grabbed_key)

    grabbed.each do |token, locked_at|
      expires_at = locked_at.to_f + @stale_client_timeout
      # current_time is re-read for every entry, as before.
      signal(token) if expires_at < current_time.to_f
    end
  end
end
#signal(token = 1) ⇒ Object
88 89 90 91 92 93 94 95 |
# File 'lib/redis/semaphore.rb', line 88
#
# Returns a token to the semaphore: inside one MULTI block the token is
# removed from the grabbed hash and pushed onto the available list.
#
# @param token [Object] the token to release (defaults to 1); when nil or
#   false is passed, generate_unique_token supplies one instead
# @return [Object] the result of the MULTI block
def signal(token = 1)
  token = generate_unique_token unless token

  @redis.multi do
    @redis.hdel(grabbed_key, token)
    @redis.lpush(available_key, token)
  end
end
#unlock ⇒ Object
71 72 73 74 |
# File 'lib/redis/semaphore.rb', line 71
#
# Releases the most recently acquired token held by this instance.
#
# @return [false] when none of this instance's tokens is currently locked
# @return [Object] otherwise element [1] of signal's result
#   (NOTE(review): presumably the LPUSH reply, since signal issues HDEL then
#   LPUSH inside MULTI - confirm)
def unlock
  if locked?
    replies = signal(@tokens.pop)
    replies[1]
  else
    false
  end
end