Module: ScoutSemaphore
- Defined in:
- lib/scout/semaphore.rb
Defined Under Namespace
Classes: SemaphoreInterrupted
Constant Summary
collapse
- SEM_MUTEX =
Mutex.new
- RETRIABLE_ERRNOS =
[
Errno::ENOENT,
Errno::EIDRM,
Errno::EAGAIN,
Errno::EMFILE,
Errno::ENFILE,
Errno::EINTR
].map { |c| c.new.errno }
- FATAL_ERRNOS =
[
Errno::EINVAL,
Errno::EACCES
].map { |c| c.new.errno }
Class Method Summary
collapse
-
.create_semaphore(name, value, **opts) ⇒ Object
Safe wrappers that raise SystemCallError on final failure.
-
.delete_semaphore(name, **opts) ⇒ Object
-
.ensure_semaphore_name(file) ⇒ Object
-
.exists?(name) ⇒ Boolean
-
.fork_each_on_semaphore(elems, size, file = nil) ⇒ Object
-
.post_semaphore(name, **opts) ⇒ Object
-
.synchronize(sem) ⇒ Object
-
.thread_each_on_semaphore(elems, size) ⇒ Object
-
.wait_semaphore(name, **opts) ⇒ Object
-
.with_retry(max_attempts: 6, base_delay: 0.01, max_delay: 1.0, jitter: 0.5, retriable: RETRIABLE_ERRNOS) ⇒ Object
Generic retry wrapper with exponential backoff + jitter.
-
.with_semaphore(size, file = nil) ⇒ Object
Class Method Details
.create_semaphore(name, value, **opts) ⇒ Object
Safe wrappers that raise SystemCallError on final failure
163
164
165
166
167
168
169
170
|
# File 'lib/scout/semaphore.rb', line 163
# Creates the named semaphore through the native binding, retrying
# transient failures via with_retry.
#
# name  - semaphore name handed to create_semaphore_c.
# value - second argument for create_semaphore_c (presumably the initial
#         count -- TODO confirm against the C extension).
# opts  - keyword options forwarded unchanged to with_retry.
#
# Returns the non-negative value produced by create_semaphore_c.
# Raises SystemCallError carrying the reported errno when the native call
# ends with a negative result, or a SystemCallError without errno when the
# call reported success but exists? cannot find the semaphore afterwards.
def self.create_semaphore(name, value, **opts)
  ret = with_retry(**opts) { ScoutSemaphore.create_semaphore_c(name, value) }
  # Report the native failure first: checking exists? beforehand would
  # replace the real errno with a generic "missing" error.
  raise SystemCallError.new("create_semaphore(#{name}) failed", -ret) if ret < 0
  raise SystemCallError.new("Semaphore missing (#{name})") unless self.exists?(name)
  ret
end
|
.delete_semaphore(name, **opts) ⇒ Object
172
173
174
175
176
177
178
|
# File 'lib/scout/semaphore.rb', line 172
# Removes the named semaphore through delete_semaphore_c, wrapped in
# with_retry.
#
# name - semaphore name handed to delete_semaphore_c.
# opts - keyword options forwarded unchanged to with_retry.
#
# Returns the non-negative result of the native call.
# Raises SystemCallError built from the negated result when it is negative.
def self.delete_semaphore(name, **opts)
  status = with_retry(**opts) { ScoutSemaphore.delete_semaphore_c(name) }
  return status unless status < 0

  raise SystemCallError.new("delete_semaphore(#{name}) failed", -status)
end
|
.ensure_semaphore_name(file) ⇒ Object
107
108
109
110
111
112
113
114
|
# File 'lib/scout/semaphore.rb', line 107
# Turns an arbitrary value into a semaphore-style name: its string form
# loses the slashes at the start of each line, every remaining "/" becomes
# "_", and a single "/" is put in front.
#
# file - any object; converted with to_s (the argument is not modified).
#
# Returns a new String beginning with "/".
def self.ensure_semaphore_name(file)
  without_leading_slashes = file.to_s.gsub(%r{^/+}, '')
  '/' + without_leading_slashes.tr('/', '_')
end
|
.exists?(name) ⇒ Boolean
116
117
118
119
|
# File 'lib/scout/semaphore.rb', line 116
# Checks for the semaphore by asking Open.exists? about
# "/dev/shm/sem.<name without its first character>".
#
# name - semaphore name; its first character (the leading "/" produced by
#        ensure_semaphore_name) is dropped before building the path.
#
# Returns whatever Open.exists? returns for that path.
def self.exists?(name)
  shm_entry = 'sem.' + name[1..-1]
  Open.exists?(File.join('/dev/shm', shm_entry))
end
|
.fork_each_on_semaphore(elems, size, file = nil) ⇒ Object
260
261
262
263
264
265
266
267
268
269
270
271
272
|
# File 'lib/scout/semaphore.rb', line 260
# Hands every element of +elems+ to the given block through TSV.traverse,
# passing +size+ as the :cpus option.
#
# elems - collection to traverse; when it responds to :annotate, each
#         element is passed to elems.annotate before being yielded.
# size  - value given to TSV.traverse as :cpus.
# file  - accepted but not used by this method.
#
# An Interrupt raised by the block is logged as a warning and not
# propagated. Always returns nil.
def self.fork_each_on_semaphore(elems, size, file = nil)
  TSV.traverse(elems,
               cpus: size,
               bar: "Fork each on semaphore: #{ Misc.fingerprint elems }",
               into: Set.new) do |item|
    elems.annotate(item) if elems.respond_to?(:annotate)
    begin
      yield item
    rescue Interrupt
      Log.warn "Process #{Process.pid} was aborted"
    end
    # The traversal result is discarded, so hand back nil for each element.
    nil
  end
  nil
end
|
.post_semaphore(name, **opts) ⇒ Object
194
195
196
197
198
199
200
201
|
# File 'lib/scout/semaphore.rb', line 194
# Posts to the named semaphore through post_semaphore_c, wrapped in
# with_retry.
#
# name - semaphore name; must already be visible to exists?.
# opts - keyword options forwarded unchanged to with_retry.
#
# Returns the non-negative result of the native call.
# Raises SystemCallError when exists? reports the semaphore as missing
# (before any native call), or when the native result is negative, in
# which case the error is built from the negated result.
def self.post_semaphore(name, **opts)
  raise SystemCallError.new("Semaphore missing (#{name})") unless self.exists?(name)

  status = with_retry(**opts) { ScoutSemaphore.post_semaphore_c(name) }
  raise SystemCallError.new("post_semaphore(#{name}) failed", -status) if status < 0

  status
end
|
.synchronize(sem) ⇒ Object
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
# File 'lib/scout/semaphore.rb', line 203
# Runs the given block between a wait and a post on the semaphore named by
# +sem+.
#
# sem - value normalized with ensure_semaphore_name before use.
#
# Any error raised while waiting propagates unchanged and the block is not
# run. Once the wait succeeded, the semaphore is posted even when the block
# raises; an error raised by that post propagates to the caller.
#
# Returns the value of the block.
def self.synchronize(sem)
  sem_name = ensure_semaphore_name(sem)

  ScoutSemaphore.wait_semaphore(sem_name)
  begin
    yield
  ensure
    ScoutSemaphore.post_semaphore(sem_name)
  end
end
|
.thread_each_on_semaphore(elems, size) ⇒ Object
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
|
# File 'lib/scout/semaphore.rb', line 274
# Runs the given block once per element of +elems+, each call in its own
# thread, while letting at most +size+ calls execute at the same time.
#
# elems - collection responding to each; one thread is started per element.
# size  - maximum number of block invocations allowed to run concurrently.
#
# Any exception reaching the calling thread (including one re-raised by
# joining a failed worker) is logged through Log.exception, every thread
# started so far is killed, and the exception is NOT re-raised.
#
# Returns the Array of Thread objects that were started.
def self.thread_each_on_semaphore(elems, size)
  mutex = Mutex.new
  slot_freed = ConditionVariable.new
  running = 0
  threads = []
  begin
    # Track each thread as soon as it exists so the rescue below can kill
    # them all even when spawning fails part-way through.
    elems.each do |elem|
      threads << Thread.new(elem) do |item|
        mutex.synchronize do
          # Block until a slot is free. Finishing workers signal the
          # condition variable, so a waiter resumes right away instead of
          # sleeping out a fixed second; the 1s timeout only keeps the old
          # polling behaviour as a safety net.
          slot_freed.wait(mutex, 1) until running < size
          running += 1
        end
        begin
          yield item
        rescue Interrupt
          Log.error "Thread was aborted while processing: #{Misc.fingerprint item}"
          raise
        ensure
          mutex.synchronize do
            running -= 1
            slot_freed.signal
          end
        end
      end
    end
    threads.each(&:join)
  rescue Exception
    # Deliberately broad: whatever went wrong, report it and make sure no
    # worker thread is left running.
    Log.exception $!
    Log.info "Ensuring threads are dead: #{threads.length}"
    threads.each(&:kill)
  end
end
|
.wait_semaphore(name, **opts) ⇒ Object
180
181
182
183
184
185
186
187
188
189
190
191
192
|
# File 'lib/scout/semaphore.rb', line 180
# Waits on the named semaphore through wait_semaphore_c, wrapped in
# with_retry.
#
# name - semaphore name; must already be visible to exists?.
# opts - keyword options forwarded unchanged to with_retry.
#
# Returns the non-negative result of the native call.
# Raises SystemCallError when exists? reports the semaphore as missing
# (before any native call), SemaphoreInterrupted when the final negative
# result corresponds to EINTR, and SystemCallError built from the negated
# result for any other negative result.
def self.wait_semaphore(name, **opts)
  raise SystemCallError.new("Semaphore missing (#{name})") unless self.exists?(name)

  status = with_retry(**opts) { ScoutSemaphore.wait_semaphore_c(name) }
  return status unless status < 0

  errno = -status
  raise SemaphoreInterrupted if errno == Errno::EINTR::Errno

  raise SystemCallError.new("wait_semaphore(#{name}) failed", errno)
end
|
.with_retry(max_attempts: 6, base_delay: 0.01, max_delay: 1.0, jitter: 0.5, retriable: RETRIABLE_ERRNOS) ⇒ Object
Generic retry wrapper with exponential backoff + jitter
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
# File 'lib/scout/semaphore.rb', line 137
# Calls the block until it returns a non-negative value or retrying is no
# longer allowed. A negative return value is read as a negated errno.
#
# max_attempts - total number of block invocations allowed.
# base_delay   - delay in seconds before the first retry; doubled for each
#                further attempt.
# max_delay    - cap applied to the doubled delay before jitter is added.
# jitter       - fraction of the capped delay scaled by rand and added on
#                top of it.
# retriable    - errno values worth retrying.
#
# Retrying stops at once when the errno is listed in FATAL_ERRNOS, is not
# in +retriable+, or +max_attempts+ has been reached. Each retry is
# announced with Log.warn before sleeping.
#
# Returns the last value produced by the block; never raises by itself.
def self.with_retry(max_attempts: 6, base_delay: 0.01, max_delay: 1.0, jitter: 0.5, retriable: RETRIABLE_ERRNOS)
  1.step do |attempt|
    status = yield
    return status if status >= 0

    errno = -status
    give_up = FATAL_ERRNOS.include?(errno) || attempt >= max_attempts || !retriable.include?(errno)
    return status if give_up

    capped = [base_delay * (2 ** (attempt - 1)), max_delay].min
    delay = capped + rand * jitter * capped
    Log.warn "Semaphore operation failed (errno=#{errno}), retrying in #{'%.3f' % delay}s (attempt #{attempt}/#{max_attempts})"
    sleep(delay)
  end
end
|
.with_semaphore(size, file = nil) ⇒ Object
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
|
# File 'lib/scout/semaphore.rb', line 232
# Creates a semaphore, yields its name to the block and removes the
# semaphore afterwards.
#
# size - value handed to ScoutSemaphore.create_semaphore.
# file - optional name source; normalized with ensure_semaphore_name.
#        When nil, a "/scout-" name with an 11-character suffix taken from
#        Misc.digest of a random number is used instead.
#
# A SystemCallError raised while creating is logged as an error and
# re-raised. Removal is attempted whether or not creation or the block
# succeeded; a SystemCallError raised while removing is only logged as a
# warning.
#
# Returns the value of the block.
def self.with_semaphore(size, file = nil)
  sem_name =
    if file.nil?
      "/scout-" + Misc.digest(rand(100000000000).to_s)[0..10]
    else
      ensure_semaphore_name(file)
    end

  begin
    Log.debug "Creating semaphore (#{ size }): #{sem_name}"
    begin
      ScoutSemaphore.create_semaphore(sem_name, size)
    rescue SystemCallError => create_error
      Log.error "Failed to create semaphore #{sem_name}: #{create_error.message}"
      raise
    end

    yield sem_name
  ensure
    Log.debug "Removing semaphore #{ sem_name }"
    begin
      ScoutSemaphore.delete_semaphore(sem_name)
    rescue SystemCallError => delete_error
      Log.warn "delete_semaphore(#{sem_name}) failed: #{delete_error.message}"
    end
  end
end
|