Class: RedisLocks::Semaphore
- Inherits: Object
  - Object
    - RedisLocks::Semaphore
- Defined in: lib/redis_locks/semaphore.rb
Constant Summary
- NAMESPACE =
"semaphore-lua"
- SETUP_SCRIPT =
Removes stale locks, then ensures that all resources which aren’t locked are marked as available.
<<-LUA
  local avail_key = KEYS[1]
  local grabbed_key = KEYS[2]
  local expected_resources = tonumber(ARGV[1])
  local stale_before = tonumber(ARGV[2])

  redis.call('zremrangebyscore', grabbed_key, -1, stale_before)

  local found_resources = redis.call('llen', avail_key) + redis.call('zcard', grabbed_key)
  if found_resources < expected_resources then
    for i=1,(expected_resources - found_resources) do
      redis.call('rpush', avail_key, 1)
    end
  end
LUA
- SETUP_DIGEST =
Digest::SHA1.hexdigest(SETUP_SCRIPT)
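To make the setup logic easier to follow, here is a minimal plain-Ruby sketch of what SETUP_SCRIPT appears to do. It is an illustration only: the real script runs inside Redis, and `setup_model`, the array standing in for the available list, and the hash standing in for the grabbed sorted set are all hypothetical stand-ins, not part of redis_locks.

```ruby
# In-memory model of SETUP_SCRIPT (illustration only, no Redis involved).
# `avail` stands in for the Redis list of available resources.
# `grabbed` stands in for the sorted set, as a Hash of token => lock timestamp.
def setup_model(avail, grabbed, expected_resources, stale_before)
  # ZREMRANGEBYSCORE grabbed -1 stale_before:
  # drop every lock whose timestamp falls in [-1, stale_before].
  grabbed.delete_if { |_token, score| score >= -1 && score <= stale_before }

  # LLEN avail + ZCARD grabbed: how many resources are accounted for.
  found = avail.length + grabbed.length

  # RPUSH one placeholder entry per missing resource.
  (expected_resources - found).times { avail.push(1) } if found < expected_resources

  [avail, grabbed]
end

avail = []
grabbed = { "old-token" => 100.0, "fresh-token" => 900.0 }

# Three resources expected; anything locked at or before t=500 counts as stale.
setup_model(avail, grabbed, 3, 500.0)
```

In this run the stale lock is dropped, one live lock remains, and two placeholder entries are pushed so that available plus grabbed again adds up to three.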
Instance Method Summary
- #delete! ⇒ Object
Forcefully clear the lock.
- #initialize(key, resources: 1, stale_client_timeout: 86400, redis: RedisLocks.redis) ⇒ Semaphore (constructor)
`resources` is the number of clients allowed to lock the semaphore concurrently.
- #lock(timeout: nil, &block) ⇒ Object
Acquire a resource from the semaphore, if available.
- #lock!(timeout: nil, &block) ⇒ Object
- #unlock(token = @tokens.pop) ⇒ Object (also: #signal)
Release a resource back to the semaphore.
- #wait(timeout: 0, &block) ⇒ Object
- #wait!(timeout: 0, &block) ⇒ Object
Constructor Details
#initialize(key, resources: 1, stale_client_timeout: 86400, redis: RedisLocks.redis) ⇒ Semaphore
`resources` is the number of clients allowed to lock the semaphore concurrently.
`stale_client_timeout` is how long, in seconds, a lock may be held before we assume that something has gone terribly wrong with its client and invalidate the lock. The default of 86400 is one day.
# File 'lib/redis_locks/semaphore.rb', line 65

def initialize(key, resources: 1, stale_client_timeout: 86400, redis: RedisLocks.redis)
  @key = key
  @resource_count = resources.to_i
  @stale_client_timeout = stale_client_timeout.to_f
  @redis = Connections.ensure_pool(redis)
  @tokens = []

  raise ArgumentError.new("Lock key is required") if @key.nil? || @key.empty?
  raise ArgumentError.new("resources must be > 0") unless @resource_count > 0
  raise ArgumentError.new("stale_client_timeout must be > 0") unless @stale_client_timeout > 0
end
Instance Method Details
#delete! ⇒ Object
Forcefully clear the lock. Be careful!
# File 'lib/redis_locks/semaphore.rb', line 78

def delete!
  @redis.with do |conn|
    conn.del(available_key)
    conn.del(grabbed_key)
  end

  @tokens = []
end
#lock(timeout: nil, &block) ⇒ Object
Acquire a resource from the semaphore, if available. Returns false if no resources are available.
`timeout` is how long to block, in seconds, waiting for a resource to become available. The default is nil, meaning don’t block. A timeout of zero means block forever. (This is a bit weird, but corresponds to how blpop uses timeout values.)
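The timeout rules can be summarized with a small sketch. `pop_strategy` is a hypothetical helper written for this illustration, not part of redis_locks; it only mirrors the branching visible in the method source, where a truthy timeout is passed to blpop as `timeout.to_i` and a nil timeout falls back to lpop.

```ruby
# Hypothetical helper: reports which Redis command `lock` would issue
# for a given timeout, and how long it would wait.
def pop_strategy(timeout)
  if timeout
    # The method source converts the timeout with to_i before calling blpop,
    # so fractional values are truncated toward zero.
    seconds = timeout.to_i
    seconds.zero? ? [:blpop, :block_forever] : [:blpop, seconds]
  else
    # nil means "don't block": a plain LPOP that returns immediately.
    [:lpop, :no_wait]
  end
end

pop_strategy(nil)  # non-blocking attempt
pop_strategy(5)    # block for up to five seconds
pop_strategy(0)    # block until a resource appears
pop_strategy(0.5)  # truncates to 0, so this would also block forever
```

One consequence worth noting, assuming blpop receives the truncated integer as the source suggests: a sub-second timeout such as 0.5 does not mean "wait briefly", it means "wait indefinitely".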
If passed a block and a resource is available, runs the block and then unlocks.
If called without a block and a resource is available, returns a token. The caller is then responsible for unlocking the token.
This isn’t atomic: if the process dies, we could remove something from the available queue without adding it to the grabbed set. That’s OK; the semaphore will recover just as if this were a stale client that left its token in the grabbed set forever.
# File 'lib/redis_locks/semaphore.rb', line 104

def lock(timeout: nil, &block)
  ensure_exists_and_release_stale_locks!

  success = @redis.with do |conn|
    if timeout
      !conn.blpop(available_key, timeout.to_i).nil?
    else
      !conn.lpop(available_key).nil?
    end
  end

  return false unless success

  token = SecureRandom.hex(16)
  @tokens.push(token)
  @redis.with do |conn|
    conn.zadd(grabbed_key, epoch_f(conn), token)
  end

  return_or_yield(token, &block)
end
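The difference between the token form and the block form may be easier to see in a small in-memory model. `MiniSemaphore` is a hypothetical stand-in written for this sketch; it never talks to Redis and only imitates the documented contract. `return_or_yield` is not shown in this page, so two details here are assumptions: that the block form returns the block's value, and that the resource is released even if the block raises.

```ruby
require "securerandom"

# In-memory stand-in for illustration only (not part of redis_locks).
class MiniSemaphore
  def initialize(resources)
    @available = Array.new(resources, 1) # placeholder entries, like the Redis list
    @grabbed = {}                        # token => acquisition time, like the sorted set
  end

  # Non-blocking acquire. Returns false when no resource is free,
  # a token when called without a block, or (assumption) the block's
  # value when called with one, releasing the resource afterwards.
  def lock
    return false if @available.shift.nil?

    token = SecureRandom.hex(16)
    @grabbed[token] = Time.now.to_f
    return token unless block_given?

    begin
      yield
    ensure
      unlock(token)
    end
  end

  # Returns true only if the token was actually held.
  def unlock(token)
    removed = !@grabbed.delete(token).nil?
    @available.unshift(1) if removed
    removed
  end
end

sem = MiniSemaphore.new(1)

token = sem.lock            # token form: the caller must unlock
second = sem.lock           # false, the only resource is taken
sem.unlock(token)

result = sem.lock { :done } # block form: released automatically
after_block = sem.lock      # succeeds again because the block form unlocked
```

With the token form, forgetting to call unlock keeps the resource held until the stale-client timeout expires, which is why the block form is usually the safer default.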
#lock!(timeout: nil, &block) ⇒ Object
# File 'lib/redis_locks/semaphore.rb', line 130

def lock!(timeout: nil, &block)
  token = lock(timeout: timeout)
  raise SemaphoreUnavailable.new(@key, @resource_count) unless token
  return_or_yield(token, &block)
end
#unlock(token = @tokens.pop) ⇒ Object Also known as: signal
Release a resource back to the semaphore. Should normally be called with an explicit token.
This isn’t atomic: if the process dies, we could remove something from the grabbed set without adding it to the available queue. That’s OK; the semaphore will recover the missing resource the next time stale locks are released and the available resources are recounted.
# File 'lib/redis_locks/semaphore.rb', line 147

def unlock(token = @tokens.pop)
  return unless token

  removed = false

  @redis.with do |conn|
    removed = conn.zrem grabbed_key, token
    if removed
      conn.lpush available_key, 1
    end
  end

  removed
end
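The non-atomic window in unlock can be traced by hand. The following plain-Ruby walk-through is a sketch under assumptions: the array and hash are hypothetical stand-ins for the Redis list and sorted set, and the recount at the end imitates what SETUP_SCRIPT is described as doing, which `lock` presumably triggers through `ensure_exists_and_release_stale_locks!`.

```ruby
# Plain-Ruby walk-through of a crash between ZREM and LPUSH (no Redis involved).
expected_resources = 1
available = []                      # the single resource is currently held
grabbed = { "token-a" => 1000.0 }   # token => lock timestamp

# unlock, step 1: ZREM succeeds and the token leaves the grabbed set.
grabbed.delete("token-a")

# The process dies here, so step 2 (LPUSH onto the available list) never runs.
# At this point the resource is in neither structure.
lost = available.empty? && grabbed.empty?

# A later lock attempt recounts what exists and tops up the shortfall.
found = available.length + grabbed.length
(expected_resources - found).times { available.push(1) } if found < expected_resources
```

Because the recount compares the total against the expected resource count, the lost entry is restored without waiting for any timeout in this model.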
#wait(timeout: 0, &block) ⇒ Object
# File 'lib/redis_locks/semaphore.rb', line 126

def wait(timeout: 0, &block)
  lock(timeout: timeout, &block)
end
#wait!(timeout: 0, &block) ⇒ Object
# File 'lib/redis_locks/semaphore.rb', line 136

def wait!(timeout: 0, &block)
  lock!(timeout: timeout, &block)
end