Class: Redis::Semaphore
- Inherits:
-
Object
- Object
- Redis::Semaphore
- Defined in:
- lib/redis/semaphore.rb
Constant Summary collapse
- EXISTS_TOKEN =
"1"
- API_VERSION =
"1"
Instance Method Summary collapse
- #all_tokens ⇒ Object
- #available_count ⇒ Object
- #delete! ⇒ Object
- #exists? ⇒ Boolean
- #exists_or_create! ⇒ Object
- #generate_unique_token ⇒ Object
-
#initialize(name, opts = {}) ⇒ Semaphore
constructor
stale_client_timeout is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate it’s lock.
- #lock(timeout = nil) ⇒ Object (also: #wait)
- #locked?(token = nil) ⇒ Boolean
- #release_stale_locks! ⇒ Object
- #signal(token = 1) ⇒ Object
- #unlock ⇒ Object
Constructor Details
#initialize(name, opts = {}) ⇒ Semaphore
stale_client_timeout is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate it’s lock. Default is nil for which we don’t check for stale clients Redis::Semaphore.new(:my_semaphore, :stale_client_timeout => 30, :redis => myRedis) Redis::Semaphore.new(:my_semaphore, :redis => myRedis) Redis::Semaphore.new(:my_semaphore, :resources => 1, :redis => myRedis) Redis::Semaphore.new(:my_semaphore, :host => “”, :port => “”) Redis::Semaphore.new(:my_semaphore, :path => “bla”)
17 18 19 20 21 22 23 24 25 |
# File 'lib/redis/semaphore.rb', line 17 def initialize(name, opts = {}) @name = name @expiration = opts.delete(:expiration) @resource_count = opts.delete(:resources) || 1 @stale_client_timeout = opts.delete(:stale_client_timeout) @redis = opts.delete(:redis) || Redis.new(opts) @use_local_time = opts.delete(:use_local_time) @tokens = [] end |
Instance Method Details
#all_tokens ⇒ Object
121 122 123 124 125 126 |
# File 'lib/redis/semaphore.rb', line 121 def all_tokens @redis.multi do @redis.lrange(available_key, 0, -1) @redis.hkeys(grabbed_key) end.flatten end |
#available_count ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/redis/semaphore.rb', line 44 def available_count if exists? @redis.llen(available_key) else @resource_count end end |
#delete! ⇒ Object
52 53 54 55 56 57 |
# File 'lib/redis/semaphore.rb', line 52 def delete! @redis.del(available_key) @redis.del(grabbed_key) @redis.del(exists_key) @redis.del(version_key) end |
#exists? ⇒ Boolean
117 118 119 |
# File 'lib/redis/semaphore.rb', line 117 def exists? @redis.exists(exists_key) end |
#exists_or_create! ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/redis/semaphore.rb', line 27 def exists_or_create! token = @redis.getset(exists_key, EXISTS_TOKEN) if token.nil? create! else # Previous versions of redis-semaphore did not set `version_key`. # Make sure it's set now, so we can use it in future versions. if token == API_VERSION && @redis.get(version_key).nil? @redis.set(version_key, API_VERSION) end true end end |
#generate_unique_token ⇒ Object
128 129 130 131 132 133 134 135 |
# File 'lib/redis/semaphore.rb', line 128 def generate_unique_token tokens = all_tokens token = Random.rand.to_s while(tokens.include? token) token = Random.rand.to_s end end |
#lock(timeout = nil) ⇒ Object Also known as: wait
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/redis/semaphore.rb', line 59 def lock(timeout = nil) exists_or_create! release_stale_locks! if check_staleness? if timeout.nil? || timeout > 0 # passing timeout 0 to blpop causes it to block token_pair = @redis.blpop(available_key, timeout || 0) else token_pair = @redis.lpop(available_key) end return false if token_pair.nil? current_token = token_pair[1] @tokens.push(current_token) @redis.hset(grabbed_key, current_token, current_time.to_f) return_value = current_token if block_given? begin return_value = yield current_token ensure signal(current_token) end end return_value end |
#locked?(token = nil) ⇒ Boolean
94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/redis/semaphore.rb', line 94 def locked?(token = nil) if token @redis.hexists(grabbed_key, token) else @tokens.each do |token| return true if locked?(token) end false end end |
#release_stale_locks! ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/redis/semaphore.rb', line 137 def release_stale_locks! simple_mutex(:release_locks, 10) do @redis.hgetall(grabbed_key).each do |token, locked_at| timed_out_at = locked_at.to_f + @stale_client_timeout if timed_out_at < current_time.to_f signal(token) end end end end |
#signal(token = 1) ⇒ Object
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/redis/semaphore.rb', line 106 def signal(token = 1) token ||= generate_unique_token @redis.multi do @redis.hdel grabbed_key, token @redis.lpush available_key, token set_expiration_if_necessary end end |
#unlock ⇒ Object
89 90 91 92 |
# File 'lib/redis/semaphore.rb', line 89 def unlock return false unless locked? signal(@tokens.pop)[1] end |