Class: RedisLocks::Semaphore

Inherits:
Object
Defined in:
lib/redis_locks/semaphore.rb

Constant Summary

NAMESPACE =
"semaphore-lua"
SETUP_SCRIPT =

Removes stale locks, then ensures that all resources which aren’t locked are marked as available.

<<-LUA
  local avail_key = KEYS[1]
  local grabbed_key = KEYS[2]

  local expected_resources = tonumber(ARGV[1])
  local stale_before = tonumber(ARGV[2])

  redis.call('zremrangebyscore', grabbed_key, -1, stale_before)

  local found_resources = redis.call('llen', avail_key) + redis.call('zcard', grabbed_key)
  if found_resources < expected_resources then
    for i=1,(expected_resources - found_resources) do
      redis.call('rpush', avail_key, 1)
    end
  end
LUA
SETUP_DIGEST =
Digest::SHA1.hexdigest(SETUP_SCRIPT)
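
To make the script's bookkeeping concrete, here is a minimal in-memory sketch in plain Ruby. The `top_up` helper and its arguments are hypothetical stand-ins (an Array for the available list, a Hash of token to timestamp for the grabbed sorted set). It mirrors the Lua logic above but never talks to Redis.

```ruby
# Hypothetical in-memory model of SETUP_SCRIPT; not part of RedisLocks.
# available: Array standing in for the Redis list at avail_key
# grabbed:   Hash of token => lock timestamp, standing in for the sorted set
def top_up(available, grabbed, expected_resources, stale_before)
  # Mirror ZREMRANGEBYSCORE grabbed_key -1 stale_before (both bounds inclusive).
  fresh = grabbed.reject { |_token, locked_at| locked_at.between?(-1, stale_before) }

  # Count what is still accounted for, then push back whatever is missing.
  found = available.size + fresh.size
  missing = [expected_resources - found, 0].max

  [available + Array.new(missing, 1), fresh]
end

# One free resource, one stale lock ("a") and one live lock ("b"), 3 expected.
available, grabbed = top_up([1], { "a" => 100.0, "b" => 900.0 }, 3, 500.0)
```

In this run the stale token "a" is dropped and one resource is pushed back, so the available and grabbed counts once again add up to the expected three.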

Instance Method Summary

Constructor Details

#initialize(key, resources: 1, stale_client_timeout: 86400, redis: RedisLocks.redis) ⇒ Semaphore

`resources` is the number of clients allowed to lock the semaphore concurrently.

`stale_client_timeout` is how long, in seconds, a client may hold a lock before we assume that something has gone terribly wrong with it and invalidate its lock. The default of 86400 is one day.

Raises:

  • (ArgumentError)


# File 'lib/redis_locks/semaphore.rb', line 65

def initialize(key, resources: 1, stale_client_timeout: 86400, redis: RedisLocks.redis)
  @key = key
  @resource_count = resources.to_i
  @stale_client_timeout = stale_client_timeout.to_f
  @redis = Connections.ensure_pool(redis)
  @tokens = []

  raise ArgumentError.new("Lock key is required") if @key.nil? || @key.empty?
  raise ArgumentError.new("resources must be > 0") unless @resource_count > 0
  raise ArgumentError.new("stale_client_timeout must be > 0") unless @stale_client_timeout > 0
end

Instance Method Details

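#delete! ⇒ Object

Forcefully clears the semaphore by deleting both of its Redis keys and discarding the tokens held by this instance. Be careful: this also drops locks currently held by other clients.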



# File 'lib/redis_locks/semaphore.rb', line 78

def delete!
  @redis.with do |conn|
    conn.del(available_key)
    conn.del(grabbed_key)
  end

  @tokens = []
end

#lock(timeout: nil, &block) ⇒ Object

Acquire a resource from the semaphore, if available. Returns false if no resources are available.

`timeout` is how long to wait, blocking, until a resource is available. The default is nil, meaning don’t block. A timeout of zero means block forever. (This is a bit weird, but corresponds to how blpop uses timeout values.)
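
The three timeout cases can be summarized in a small sketch. The `blocking_mode` helper is hypothetical and exists only for illustration. It mirrors the `if timeout` check and the `timeout.to_i` conversion in the method source, which suggests that a fractional timeout under one second truncates to 0 and is therefore treated as "block forever".

```ruby
# Hypothetical helper that classifies a timeout the way #lock appears to.
def blocking_mode(timeout)
  # nil is the only value that skips blocking (0 is truthy in Ruby).
  return :non_blocking if timeout.nil?

  # The method source converts with to_i before calling blpop.
  timeout.to_i.zero? ? :block_forever : :block_up_to_timeout
end

modes = [nil, 0, 0.5, 5].map { |t| blocking_mode(t) }
```

If sub-second waits matter, it may be safer to round the value up before passing it in. That is a suggestion rather than documented behavior.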

If called with a block and a resource is available, runs the block and then unlocks.

If called without a block and a resource is available, returns a token. The caller is then responsible for unlocking the token.

This isn’t atomic: if the process dies, we could remove something from the available queue without adding it to the grabbed set. That’s OK; the semaphore will recover just as if this were a stale client that left its token in the grabbed set forever.



# File 'lib/redis_locks/semaphore.rb', line 104

def lock(timeout: nil, &block)
  ensure_exists_and_release_stale_locks!

  success = @redis.with do |conn|
    if timeout
      !conn.blpop(available_key, timeout.to_i).nil?
    else
      !conn.lpop(available_key).nil?
    end
  end

  return false unless success

  token = SecureRandom.hex(16)
  @tokens.push(token)
  @redis.with do |conn|
    conn.zadd(grabbed_key, epoch_f(conn), token)
  end

  return_or_yield(token, &block)
end
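
The two calling styles (token versus block) can be tried out with a minimal in-memory sketch. `ToySemaphore` is a hypothetical stand-in, not the RedisLocks implementation. It is non-blocking only, and because `return_or_yield` is not shown on this page, the `ensure`-based release and returning the block's value are assumptions.

```ruby
require "securerandom"

# Hypothetical in-memory stand-in that models only the lock flow above.
class ToySemaphore
  attr_reader :available, :grabbed

  def initialize(resources)
    @available = Array.new(resources, 1) # free resources
    @grabbed = {}                        # token => time the lock was taken
  end

  # Non-blocking acquire: false when nothing is free, otherwise a token
  # (no block given) or the block's result (block given).
  def lock
    return false if @available.shift.nil?

    token = SecureRandom.hex(16)
    @grabbed[token] = Time.now.to_f
    return token unless block_given?

    begin
      yield token
    ensure
      unlock(token) # assumed: release even if the block raises
    end
  end

  # Hand the resource back only if the token is still held.
  def unlock(token)
    return false if @grabbed.delete(token).nil?

    @available.unshift(1)
    true
  end
end

sem = ToySemaphore.new(1)
token = sem.lock            # token style: caller must unlock
second = sem.lock           # the only resource is already taken
sem.unlock(token)
result = sem.lock { :done } # block style: unlocks when the block finishes
```

With the token style the caller owns the release. The block style ties the release to the block's lifetime, which is usually the less error-prone choice.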

#lock!(timeout: nil, &block) ⇒ Object

Same as #lock, but raises SemaphoreUnavailable instead of returning false when no resource is available.



# File 'lib/redis_locks/semaphore.rb', line 130

def lock!(timeout: nil, &block)
  token = lock(timeout: timeout)
  raise SemaphoreUnavailable.new(@key, @resource_count) unless token
  return_or_yield(token, &block)
end
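
The difference between the returning and the raising variant is sketched below in plain Ruby. `ToyUnavailable`, `try_lock`, and `try_lock!` are hypothetical names used only for illustration. The real method raises `SemaphoreUnavailable`, whose definition is not shown on this page.

```ruby
# Hypothetical stand-in for the error raised by lock!.
class ToyUnavailable < StandardError; end

# Stand-in for #lock: returns a token, or false when nothing is free.
def try_lock(free_slots)
  free_slots.shift ? "token" : false
end

# Same shape as lock!: turn the false return into an exception.
def try_lock!(free_slots)
  try_lock(free_slots) || raise(ToyUnavailable, "no resources available")
end

slots = [1]
first = try_lock!(slots) # succeeds and takes the only slot

# A caller that prefers exceptions can rescue instead of checking for false.
outcome =
  begin
    try_lock!(slots)
  rescue ToyUnavailable => e
    "skipped: #{e.message}"
  end
```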

#unlock(token = @tokens.pop) ⇒ Object Also known as: signal

Release a resource back to the semaphore. Should normally be called with an explicit token.

This isn’t atomic: if the process dies, we could remove something from the grabbed set without adding it to the available queue. That’s OK; the semaphore will recover just as if this were a stale client that left its token in the grabbed set forever.



# File 'lib/redis_locks/semaphore.rb', line 147

def unlock(token = @tokens.pop)
  return unless token

  removed = false

  @redis.with do |conn|
    removed = conn.zrem grabbed_key, token
    if removed
      conn.lpush available_key, 1
    end
  end

  removed
end
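
One consequence of checking the result of `zrem` is that unlocking the same token twice should not hand back two resources. The sketch below models that in memory. `release` is a hypothetical helper, with an Array and a Hash standing in for the Redis list and sorted set.

```ruby
# Hypothetical in-memory model of #unlock; not the RedisLocks implementation.
# available: Array standing in for the available list
# grabbed:   Hash of token => timestamp, standing in for the grabbed sorted set
def release(available, grabbed, token)
  return nil unless token

  # Like ZREM, report whether the token was actually present.
  removed = !grabbed.delete(token).nil?
  # Only hand a resource back if this token really held one.
  available.unshift(1) if removed
  removed
end

available = []
grabbed = { "abc123" => 1_700_000_000.0 }

first = release(available, grabbed, "abc123")  # token was held
second = release(available, grabbed, "abc123") # already released
```

The second call finds nothing to remove, so the available list still holds a single resource afterwards.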

#wait(timeout: 0, &block) ⇒ Object

Same as #lock, but with a default timeout of 0, so it blocks until a resource is available.



# File 'lib/redis_locks/semaphore.rb', line 126

def wait(timeout: 0, &block)
  lock(timeout: timeout, &block)
end

#wait!(timeout: 0, &block) ⇒ Object

Same as #lock!, but with a default timeout of 0, so it blocks until a resource is available.



# File 'lib/redis_locks/semaphore.rb', line 136

def wait!(timeout: 0, &block)
  lock!(timeout: timeout, &block)
end