Class: Redis::Semaphore

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/semaphore.rb

Constant Summary collapse

API_VERSION =
"1"

Instance Method Summary collapse

Constructor Details

#initialize(name, opts = {}) ⇒ Semaphore

stale_client_timeout is the threshold of time before we assume that something has gone terribly wrong with a client and we invalidate its lock. The default is nil, in which case we don't check for stale clients.

Examples:

  Redis::Semaphore.new(:my_semaphore, :stale_client_timeout => 30, :redis => myRedis)
  Redis::Semaphore.new(:my_semaphore, :redis => myRedis)
  Redis::Semaphore.new(:my_semaphore, :resources => 1, :redis => myRedis)
  Redis::Semaphore.new(:my_semaphore, :connection => "", :port => "")
  Redis::Semaphore.new(:my_semaphore, :path => "bla")



16
17
18
19
20
21
22
23
# File 'lib/redis/semaphore.rb', line 16

# Builds a semaphore handle named +name+.
#
# Keys read from +opts+ (all optional):
# * +:resources+ - stored as the resource count; defaults to 1.
# * +:stale_client_timeout+ - stored as given; nil when absent.
# * +:use_local_time+ - stored as given; nil when absent.
# * +:redis+ - an existing client to use. When absent, a client is built with
#   +Redis.new+ from whatever options remain.
#
# The caller's hash is copied first so it is never mutated, and every
# semaphore-specific key is removed before the remainder reaches +Redis.new+.
def initialize(name, opts = {})
  options = opts.dup

  @name = name
  @resource_count = options.delete(:resources) || 1
  @stale_client_timeout = options.delete(:stale_client_timeout)
  # Extract this before Redis.new runs so it is not passed on as a client option.
  @use_local_time = options.delete(:use_local_time)
  @redis = options.delete(:redis) || Redis.new(options)
  @tokens = []
end

Instance Method Details

#all_tokens ⇒ Object



101
102
103
104
105
106
# File 'lib/redis/semaphore.rb', line 101

# Returns one flat array combining the full contents of the available list
# (LRANGE 0..-1) and the field names of the grabbed hash (HKEYS), both read
# inside a single MULTI.
#
# Commands are issued on the object yielded by +multi+ rather than on @redis
# itself; newer redis-rb releases only queue commands sent to the yielded
# object.
def all_tokens
  @redis.multi do |transaction|
    transaction.lrange(available_key, 0, -1)
    transaction.hkeys(grabbed_key)
  end.flatten
end

#available_count ⇒ Object



37
38
39
# File 'lib/redis/semaphore.rb', line 37

# Length of the available-token list, as reported by LLEN on available_key.
def available_count
  list_key = available_key
  @redis.llen(list_key)
end

#delete! ⇒ Object



41
42
43
44
45
# File 'lib/redis/semaphore.rb', line 41

# Removes the semaphore's three Redis keys, one DEL each, in this order:
# available list, grabbed hash, existence marker.
#
# Returns the reply of the final DEL (the one for exists_key).
def delete!
  last_reply = nil

  # Each key helper is resolved just before its own DEL is sent.
  [:available_key, :grabbed_key, :exists_key].each do |key_helper|
    last_reply = @redis.del(send(key_helper))
  end

  last_reply
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/redis/semaphore.rb', line 97

# Reports whether the semaphore's existence marker (exists_key) is present.
#
# Always returns true or false. Clients that offer a boolean +exists?+ are
# asked through it; otherwise +exists+ is used and an Integer reply is turned
# into a boolean, because 0 is truthy in Ruby and would otherwise read as
# "exists".
#
# @return [Boolean]
def exists?
  return @redis.exists?(exists_key) if @redis.respond_to?(:exists?)

  reply = @redis.exists(exists_key)
  reply.is_a?(Integer) ? reply > 0 : !!reply
end

#exists_or_create! ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/redis/semaphore.rb', line 25

# Stamps exists_key with API_VERSION via GETSET and acts on the value that was
# stored there before:
# * nil            - delegates to create! and returns its result.
# * API_VERSION    - returns true.
# * anything else  - raises a RuntimeError naming both versions.
#
# NOTE(review): GETSET writes API_VERSION before the old value is inspected,
# so the marker is overwritten even on the mismatch path.
def exists_or_create!
  previous_version = @redis.getset(exists_key, API_VERSION)

  return create! if previous_version.nil?
  return true if previous_version == API_VERSION

  raise "Semaphore exists but running as wrong version (version #{previous_version} vs #{API_VERSION})."
end

#generate_unique_token ⇒ Object



108
109
110
111
112
113
114
115
# File 'lib/redis/semaphore.rb', line 108

# Produces a token string that is not among the tokens returned by all_tokens.
#
# Candidates are the string form of Random.rand; a candidate is redrawn for as
# long as it is already present. The winning token is returned explicitly,
# because a method that ends on the loop would return nil.
#
# @return [String]
def generate_unique_token
  taken = all_tokens
  candidate = Random.rand.to_s
  candidate = Random.rand.to_s while taken.include?(candidate)
  candidate
end

#lock(timeout = 0) ⇒ Object Also known as: wait



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/redis/semaphore.rb', line 47

# Acquires one token from the available list.
#
# Calls exists_or_create! first, and release_stale_locks! when
# check_staleness? says so. It then does a BLPOP on available_key with
# +timeout+ (passed through unchanged).
#
# Returns false when BLPOP yields nothing. Otherwise the token is remembered
# in @tokens and recorded in the grabbed hash with the current time as a float.
#
# Without a block the token itself is returned. With a block, the token is
# yielded, the block's value is returned, and signal(token) always runs
# afterwards, even if the block raises.
def lock(timeout = 0)
  exists_or_create!
  release_stale_locks! if check_staleness?

  popped = @redis.blpop(available_key, timeout)
  return false if popped.nil?

  acquired = popped[1]
  @tokens << acquired
  @redis.hset(grabbed_key, acquired, current_time.to_f)

  return acquired unless block_given?

  begin
    yield acquired
  ensure
    signal(acquired)
  end
end

#locked?(token = nil) ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
79
80
81
82
83
84
85
86
# File 'lib/redis/semaphore.rb', line 76

# With a +token+: returns the HEXISTS reply for that token in the grabbed hash.
#
# Without one: returns true if any token remembered in @tokens is still
# present in the grabbed hash, false otherwise.
#
# @return [Boolean]
def locked?(token = nil)
  return @redis.hexists(grabbed_key, token) if token

  @tokens.any? { |held| locked?(held) }
end

#release_stale_locks!Object



117
118
119
120
121
122
123
124
125
126
127
# File 'lib/redis/semaphore.rb', line 117

# Inside simple_mutex(:release_locks, 10), walks every entry of the grabbed
# hash and signals each token whose grab time plus @stale_client_timeout lies
# strictly before the current time.
#
# The current time is re-read for every entry.
def release_stale_locks!
  simple_mutex(:release_locks, 10) do
    @redis.hgetall(grabbed_key).each do |held_token, grabbed_at|
      expires_at = grabbed_at.to_f + @stale_client_timeout
      signal(held_token) if expires_at < current_time.to_f
    end
  end
end

#signal(token = 1) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/redis/semaphore.rb', line 88

# Gives +token+ back to the semaphore.
#
# The token is dropped from this instance's @tokens, so the instance stops
# treating it as held once it is back on the available list. Inside a single
# MULTI it is then removed from the grabbed hash (HDEL) and pushed onto the
# available list (LPUSH).
#
# A nil +token+ is replaced by generate_unique_token; the default is 1.
#
# Commands are issued on the object yielded by +multi+ rather than on @redis
# itself; newer redis-rb releases only queue commands sent to the yielded
# object.
#
# Returns whatever +multi+ returns.
def signal(token = 1)
  token ||= generate_unique_token
  @tokens.delete(token)

  @redis.multi do |transaction|
    transaction.hdel grabbed_key, token
    transaction.lpush available_key, token
  end
end

#unlock ⇒ Object



71
72
73
74
# File 'lib/redis/semaphore.rb', line 71

# Releases the most recently remembered token.
#
# Returns false when locked? reports nothing held. Otherwise pops the last
# entry of @tokens, signals it, and returns element [1] of signal's result.
def unlock
  if locked?
    released = @tokens.pop
    signal(released)[1]
  else
    false
  end
end