Class: Redis::Semaphore

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/semaphore.rb

Constant Summary collapse

EXISTS_TOKEN =
"1"
API_VERSION =
"1"

Instance Method Summary collapse

Constructor Details

#initialize(name, opts = {}) ⇒ Semaphore

stale_client_timeout is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate its lock. The default is nil, in which case we don't check for stale clients.

Examples:

  Redis::Semaphore.new(:my_semaphore, :stale_client_timeout => 30, :redis => myRedis)
  Redis::Semaphore.new(:my_semaphore, :redis => myRedis)
  Redis::Semaphore.new(:my_semaphore, :resources => 1, :redis => myRedis)
  Redis::Semaphore.new(:my_semaphore, :host => "", :port => "")
  Redis::Semaphore.new(:my_semaphore, :path => "bla")



17
18
19
20
21
22
23
24
25
# File 'lib/redis/semaphore.rb', line 17

# Builds a semaphore handle and stores its configuration.
#
# Semaphore-specific keys are removed from a copy of +opts+; whatever is left
# is handed to +Redis.new+ when no +:redis+ client is supplied.
#
# @param name [Object] stored as the semaphore's name
# @param opts [Hash] configuration; the caller's hash is left untouched
# @option opts [Object] :expiration stored in +@expiration+
# @option opts [Integer] :resources (1) stored in +@resource_count+
# @option opts [Numeric, nil] :stale_client_timeout stored in +@stale_client_timeout+
# @option opts [Object] :use_local_time stored in +@use_local_time+
# @option opts [Object] :redis an existing client to use instead of building one
def initialize(name, opts = {})
  # Work on a copy so the caller's hash does not lose its keys.
  opts = opts.dup

  @name = name
  @expiration = opts.delete(:expiration)
  @resource_count = opts.delete(:resources) || 1
  @stale_client_timeout = opts.delete(:stale_client_timeout)
  # Strip this before the remaining options reach Redis.new; it is a
  # semaphore setting, not a connection option.
  @use_local_time = opts.delete(:use_local_time)
  @redis = opts.delete(:redis) || Redis.new(opts)
  @tokens = []
end

Instance Method Details

#all_tokens ⇒ Object



121
122
123
124
125
126
# File 'lib/redis/semaphore.rb', line 121

# Lists every token of this semaphore, available and grabbed alike.
#
# Both reads are queued in one MULTI block. The commands are issued on the
# object yielded by +multi+ rather than on +@redis+ itself: newer redis-rb
# releases require this, and older ones yield the client, so it works on both.
#
# @return [Array] entries of the available list followed by the fields of
#   the grabbed hash
def all_tokens
  @redis.multi do |transaction|
    transaction.lrange(available_key, 0, -1)
    transaction.hkeys(grabbed_key)
  end.flatten
end

#available_count ⇒ Object



44
45
46
47
48
49
50
# File 'lib/redis/semaphore.rb', line 44

# Number of resources that can currently be acquired.
#
# Before the semaphore exists in Redis this is the configured resource
# count; afterwards it is the length of the available list.
def available_count
  return @resource_count unless exists?

  @redis.llen(available_key)
end

#delete! ⇒ Object



52
53
54
55
56
57
# File 'lib/redis/semaphore.rb', line 52

# Removes the semaphore's four Redis keys (available, grabbed, exists and
# version), one DEL per key and in that order.
#
# @return [Object] the reply of the last DEL (the version key)
def delete!
  key_readers = %i[available_key grabbed_key exists_key version_key]
  replies = key_readers.map { |reader| @redis.del(send(reader)) }
  replies.last
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/redis/semaphore.rb', line 117

# Reports whether the semaphore's exists key is present in Redis.
#
# Depending on the redis-rb version, +exists+ replies with either a Boolean
# or an Integer count. An Integer 0 is truthy in Ruby, so the reply is
# normalised here; the client's own +exists?+ predicate is used when it has one.
#
# @return [Boolean]
def exists?
  return @redis.exists?(exists_key) if @redis.respond_to?(:exists?)

  reply = @redis.exists(exists_key)
  reply.is_a?(Integer) ? reply > 0 : reply
end

#exists_or_create! ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/redis/semaphore.rb', line 27

# Marks the semaphore as existing and creates it if it was not there yet.
#
# GETSET writes the exists marker and hands back whatever was stored before,
# so a nil reply means this caller is the first one and must run +create!+.
#
# @return [Object] the result of +create!+ on first use, otherwise +true+
def exists_or_create!
  previous_marker = @redis.getset(exists_key, EXISTS_TOKEN)
  return create! if previous_marker.nil?

  # Semaphores written by earlier redis-semaphore releases have no version
  # key; backfill it so later versions can rely on it being present.
  version_missing = previous_marker == API_VERSION && @redis.get(version_key).nil?
  @redis.set(version_key, API_VERSION) if version_missing

  true
end

#generate_unique_token ⇒ Object



128
129
130
131
132
133
134
135
# File 'lib/redis/semaphore.rb', line 128

# Produces a random token that is not among the semaphore's current tokens.
#
# Candidates are the string form of +Random.rand+; a new one is drawn for as
# long as the candidate collides with an entry from +all_tokens+.
#
# @return [String] the generated token
def generate_unique_token
  taken = all_tokens
  candidate = Random.rand.to_s
  candidate = Random.rand.to_s while taken.include?(candidate)

  # A trailing `while` evaluates to nil, so the token has to be returned
  # explicitly.
  candidate
end

#lock(timeout = nil) ⇒ Object Also known as: wait



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/redis/semaphore.rb', line 59

# Acquires one resource of the semaphore.
#
# With a nil or positive +timeout+ the call blocks on BLPOP (nil waits
# indefinitely); any other value makes a single non-blocking LPOP attempt.
# The acquired token is remembered in +@tokens+ and recorded in the grabbed
# hash together with the current time.
#
# @param timeout [Numeric, nil] seconds to wait for a resource
# @yield [token] optional; the resource is signalled back once the block
#   finishes, even if it raises
# @return [false] when no resource could be acquired
# @return [Object] the block's result when a block is given, otherwise the
#   acquired token
def lock(timeout = nil)
  exists_or_create!
  release_stale_locks! if check_staleness?

  if timeout.nil? || timeout > 0
    # passing timeout 0 to blpop causes it to block
    # BLPOP replies with a [key, value] pair, or nil once the timeout expires.
    _key, current_token = @redis.blpop(available_key, timeout || 0)
  else
    # LPOP replies with the bare value (or nil), not a pair, so it must not
    # be indexed like the BLPOP reply.
    current_token = @redis.lpop(available_key)
  end

  return false if current_token.nil?

  @tokens.push(current_token)
  @redis.hset(grabbed_key, current_token, current_time.to_f)
  return current_token unless block_given?

  begin
    yield current_token
  ensure
    signal(current_token)
  end
end

#locked?(token = nil) ⇒ Boolean

Returns:

  • (Boolean)


94
95
96
97
98
99
100
101
102
103
104
# File 'lib/redis/semaphore.rb', line 94

# Tells whether a resource is currently held.
#
# @param token [Object, nil] a specific token to look up in the grabbed
#   hash; when omitted, every token acquired through this instance is checked
# @return [Boolean] for a given token, the HEXISTS reply; otherwise whether
#   any of this instance's tokens is still grabbed
def locked?(token = nil)
  return @redis.hexists(grabbed_key, token) if token

  @tokens.any? { |held_token| locked?(held_token) }
end

#release_stale_locks!Object



137
138
139
140
141
142
143
144
145
146
147
# File 'lib/redis/semaphore.rb', line 137

# Signals back every grabbed token whose holder exceeded the stale-client
# timeout.
#
# The sweep runs inside +simple_mutex(:release_locks, 10)+. A token counts as
# stale once its recorded lock time plus +@stale_client_timeout+ lies before
# the current time.
def release_stale_locks!
  simple_mutex(:release_locks, 10) do
    @redis.hgetall(grabbed_key).each do |token, locked_at|
      expires_at = locked_at.to_f + @stale_client_timeout
      signal(token) if expires_at < current_time.to_f
    end
  end
end

#signal(token = 1) ⇒ Object



106
107
108
109
110
111
112
113
114
115
# File 'lib/redis/semaphore.rb', line 106

# Hands a token back to the semaphore.
#
# Inside one MULTI block the token is removed from the grabbed hash, pushed
# onto the available list, and +set_expiration_if_necessary+ is run.
#
# @param token [Object] token to release; because the default is 1, a fresh
#   token is generated only when nil or false is passed explicitly
# @return [Object] whatever the MULTI block replies with
def signal(token = 1)
  token = generate_unique_token unless token

  @redis.multi do
    @redis.hdel(grabbed_key, token)
    @redis.lpush(available_key, token)

    set_expiration_if_necessary
  end
end

#unlock ⇒ Object



89
90
91
92
# File 'lib/redis/semaphore.rb', line 89

# Releases the most recently acquired token of this instance.
#
# @return [false] when none of this instance's tokens is locked
# @return [Object] otherwise the second element of +signal+'s reply
def unlock
  if locked?
    replies = signal(@tokens.pop)
    replies[1]
  else
    false
  end
end