Class: VCAP::Services::Base::AsyncJob::Lock
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Error
#failure, #internal_fail, #parse_msg, #success, #timeout_fail
Constructor Details
#initialize(name, opts = {}) ⇒ Lock
Options for lock: name - The UUID of the lock. timeout - The time to wait to acquire the lock, default 20 seconds. expiration - The lock expires after the given number of seconds if not refreshed, default 10 seconds. logger - The logger. ttl - The maximum time that a thread can hold the lock, default 600 seconds. The lock raises a JOB_TIMEOUT error once the ttl is exceeded.
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/base/job/lock.rb', line 21
# Builds a lock stored under the redis key +name+.
#
# name - key of the lock in redis.
# opts - optional settings (each falls back to a default when absent or nil):
#        :timeout    - seconds to wait while acquiring the lock (default 20).
#        :expiration - seconds after which an unrefreshed lock expires (default 10).
#        :ttl        - maximum seconds the locked block may run (default 600).
#        :logger     - logger to use (default: the result of make_logger).
#
# Raises RuntimeError when Config.redis_config returns no configuration.
def initialize(name, opts={})
  @name = name
  @timeout = opts[:timeout] || 20
  @expiration = opts[:expiration] || 10
  @ttl = opts[:ttl] || 600
  @logger = opts[:logger] || make_logger
  config = Config.redis_config
  raise "Can't find configuration of redis." unless config
  @redis = ::Redis.new(config)
  # Maps a refresh thread's object_id to true once that thread was asked to stop.
  @released_thread = {}
end
|
Instance Attribute Details
#expiration ⇒ Object
Returns the value of attribute expiration.
12
13
14
|
# File 'lib/base/job/lock.rb', line 12
# Returns the value held in @expiration (set in the constructor from
# opts[:expiration], defaulting to 10).
def expiration
@expiration
end
|
Returns the value of attribute name.
12
13
14
|
# File 'lib/base/job/lock.rb', line 12
# Returns the value held in @name, i.e. the redis key this lock is stored under.
def name
@name
end
|
Returns the value of attribute timeout.
12
13
14
|
# File 'lib/base/job/lock.rb', line 12
# Returns the value held in @timeout (set in the constructor from
# opts[:timeout], defaulting to 20).
def timeout
@timeout
end
|
Instance Method Details
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/base/job/lock.rb', line 137
# Removes the lock key from redis, but only while this instance still owns it.
#
# The key is WATCHed before it is read so that the ownership check and the
# DEL happen atomically: if anybody rewrites the key in between, the
# MULTI/EXEC is aborted and nothing is deleted.
#
# The lock counts as owned by somebody else when the stored expiration is
# later than the last expiration this instance wrote (@lock_expiration), or
# when this instance never acquired the lock at all (@lock_expiration is nil).
# In that case the key is left untouched.
#
# Always returns true.
def delete
  @logger.debug("Deleting lock: #{@name}")
  @redis.watch(@name)
  existing_lock = @redis.get(@name)
  if @lock_expiration.nil? || existing_lock.to_f > @lock_expiration
    @logger.debug("Lock #{@name} is acquired by others.")
    # No transaction will follow, so drop the WATCH explicitly to keep it
    # from aborting a later MULTI on this connection.
    @redis.unwatch
    return true
  end
  res = @redis.multi do
    @redis.del(@name)
  end
  if res
    @logger.debug("Lock #{@name} is deleted.")
  else
    # EXEC was aborted: the key changed after the WATCH.
    @logger.debug("Lock #{@name} is acquired by others.")
  end
  true
end
|
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/base/job/lock.rb', line 39
# Acquires the lock, runs the given block while holding it, then releases it.
#
# Acquisition: SETNX is retried once per second. When the key exists but its
# stored expiration timestamp is already in the past, a takeover is attempted
# through watch_and_update. The key is WATCHed *before* it is read, so that a
# contender that rewrites the key between the read and the takeover aborts
# the takeover instead of being silently overwritten.
#
# While the block runs, the thread returned by setup_refresh_thread is
# expected to keep the lock alive; it is stopped and the key deleted in the
# ensure clause, whether the block succeeds or raises.
#
# Returns the value of the block (nil when no block is given).
#
# Raises ServiceError (JOB_QUEUE_TIMEOUT) when the lock cannot be acquired
# within @timeout seconds, and ServiceError (JOB_TIMEOUT) when the block runs
# longer than @ttl seconds. Any Timeout::Error escaping the block is reported
# as JOB_TIMEOUT as well.
def lock
  @logger.debug("Acquiring lock: #{@name}")
  started = Time.now.to_f
  expiration = started + @expiration + 1
  until @redis.setnx(@name, expiration)
    @redis.watch(@name)
    existing_lock = @redis.get(@name)
    if existing_lock.to_f < Time.now.to_f
      @logger.debug("Lock #{@name} is expired, trying to acquire it.")
      # watch_and_update runs MULTI/EXEC, which also clears the WATCH.
      break if watch_and_update(@redis, expiration)
    else
      # No transaction follows on this path; clear the WATCH ourselves.
      @redis.unwatch
    end
    raise ServiceError.new(ServiceError::JOB_QUEUE_TIMEOUT, @timeout) if Time.now.to_f - started > @timeout
    sleep(1)
    expiration = Time.now.to_f + @expiration + 1
  end
  @lock_expiration = expiration
  refresh_thread = setup_refresh_thread
  @logger.debug("Lock #{@name} is acquired, will expire at #{@lock_expiration}")
  begin
    Timeout::timeout(@ttl) do
      yield if block_given?
    end
  rescue Timeout::Error
    raise ServiceError.new(ServiceError::JOB_TIMEOUT, @ttl)
  ensure
    release_thread(refresh_thread)
    delete
  end
end
|
#make_logger ⇒ Object
33
34
35
36
37
|
# File 'lib/base/job/lock.rb', line 33
# Builds the fallback logger used when the caller supplies none: it writes to
# STDOUT with its level set to Logger::ERROR.
def make_logger
  Logger.new(STDOUT).tap { |fallback| fallback.level = Logger::ERROR }
end
|
#release_thread(t) ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/base/job/lock.rb', line 86
# Asks thread +t+ to stop and waits for it to finish.
#
# The thread is flagged in @released_thread (keyed by its object_id), then
# polled once per second, at most five times. Returns nil as soon as the
# thread's status is falsy (it has terminated); otherwise the thread is
# terminated with Thread#exit after the last wait.
def release_thread(t)
  @released_thread[t.object_id] = true
  5.times do
    return unless t.status
    sleep 1
  end
  t.exit
end
|
#released? ⇒ Boolean
99
100
101
|
# File 'lib/base/job/lock.rb', line 99
# Tells whether the calling thread has been flagged in @released_thread.
# Returns the stored flag for the current thread's object_id, or the hash's
# default (nil for a plain hash) when the thread was never flagged.
def released?
  current_id = Thread.current.object_id
  @released_thread[current_id]
end
|
#setup_refresh_thread ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/base/job/lock.rb', line 103
# Starts and returns a background thread that keeps the lock alive.
#
# The thread opens its own redis connection and, until released? becomes
# true for it, repeatedly:
#   1. WATCHes and reads the lock key;
#   2. stops if the stored expiration is later than @lock_expiration
#      (somebody else rewrote the lock);
#   3. writes a new expiration (now + @expiration + 1) via watch_and_update
#      and stops if that transaction was aborted;
#   4. records the new value in @lock_expiration and sleeps for about half
#      of @expiration (at least one second), waking every second to check
#      released?.
#
# StandardErrors raised while renewing are logged, not propagated. The
# connection is closed when the thread ends; errors from closing are logged
# at debug level and ignored.
def setup_refresh_thread
  t = Thread.new do
    redis = ::Redis.new(Config.redis_config)
    sleep_interval = [1.0, @expiration/2].max.to_i
    begin
      while not released? do
        @logger.debug("Renewing lock #{@name}")
        redis.watch(@name)
        existing_lock = redis.get(@name)
        break if existing_lock.to_f > @lock_expiration
        expiration = Time.now.to_f + @expiration + 1
        break unless watch_and_update(redis, expiration)
        @lock_expiration = expiration
        # Sleep in one-second slices so a release request is noticed quickly.
        sleep_interval.times do
          sleep 1
          break if released?
        end
      end
    rescue => e
      @logger.error("Can't renew lock #{@name}, #{e}")
    ensure
      begin
        @logger.debug("Lock renew thread for #{@name} exited.")
        redis.quit
      rescue => e
        @logger.debug("Ignore error when quit: #{e}")
      end
    end
  end
  t
end
|
#watch_and_update(redis, expiration) ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/base/job/lock.rb', line 73
# Writes +expiration+ as the new value of the lock key inside a WATCH-guarded
# MULTI transaction on the given +redis+ connection.
#
# redis      - connection to run the transaction on.
# expiration - value to store under @name.
#
# Returns whatever redis.multi returns: truthy when the SET was executed,
# falsy when it was not. The outcome is logged at debug level.
def watch_and_update(redis, expiration)
  redis.watch(@name)
  res = redis.multi { redis.set(@name, expiration) }
  message = if res
    "Lock #{@name} is renewed and acquired."
  else
    "Lock #{@name} was updated by others."
  end
  @logger.debug(message)
  res
end
|