Module: ScoutSemaphore
- Defined in:
- lib/scout/semaphore.rb
Defined Under Namespace
Classes: SemaphoreInterrupted
Constant Summary
collapse
- SEM_MUTEX =
Mutex.new
- RETRIABLE_ERRNOS =
[
Errno::ENOENT,
Errno::EIDRM,
Errno::EAGAIN,
Errno::EMFILE,
Errno::ENFILE,
Errno::EINTR
].map { |c| c.new.errno }
- FATAL_ERRNOS =
[
Errno::EINVAL,
Errno::EACCES
].map { |c| c.new.errno }
Class Method Summary
collapse
-
.ensure_semaphore_name(file) ⇒ Object
-
.fork_each_on_semaphore(elems, size, file = nil) ⇒ Object
-
.safe_create_semaphore(name, value, **opts) ⇒ Object
Safe wrappers that raise SystemCallError on final failure.
-
.safe_delete_semaphore(name, **opts) ⇒ Object
-
.safe_post_semaphore(name, **opts) ⇒ Object
-
.safe_wait_semaphore(name, **opts) ⇒ Object
-
.synchronize(sem) ⇒ Object
-
.thread_each_on_semaphore(elems, size) ⇒ Object
-
.with_retry(max_attempts: 6, base_delay: 0.01, max_delay: 1.0, jitter: 0.5, retriable: RETRIABLE_ERRNOS) ⇒ Object
Generic retry wrapper with exponential backoff + jitter.
-
.with_semaphore(size, file = nil) ⇒ Object
Class Method Details
.ensure_semaphore_name(file) ⇒ Object
107
108
109
110
111
112
113
114
|
# File 'lib/scout/semaphore.rb', line 107
# Normalizes +file+ into a semaphore name: a single leading "/" followed by
# the remainder of the name with every other "/" replaced by "_".
#
# Leading slashes are stripped using a start-of-string anchor (\A) rather than
# a start-of-line anchor (^), so a slash that merely follows an embedded
# newline is handled like any other interior slash and becomes "_".
#
# @param file [#to_s] path-like value to convert
# @return [String] a newly built string; +file+ itself is not modified
def self.ensure_semaphore_name(file)
  '/' + file.to_s.sub(%r{\A/+}, '').tr('/', '_')
end
|
.fork_each_on_semaphore(elems, size, file = nil) ⇒ Object
252
253
254
255
256
257
258
259
260
261
262
263
264
|
# File 'lib/scout/semaphore.rb', line 252
# Hands every element of +elems+ to the given block through TSV.traverse,
# passing +size+ as the :cpus option and collecting into a fresh Set.
#
# An Interrupt raised by the block is logged as a warning and not propagated.
# The block's value is discarded (the traversal block always yields nil).
#
# @param elems collection to traverse; each element is passed to
#   +elems.annotate+ first when +elems+ responds to it
# @param size value forwarded to TSV.traverse as :cpus
# @param file accepted but not used by this method
# @return [nil]
def self.fork_each_on_semaphore(elems, size, file = nil)
  bar_label = "Fork each on semaphore: #{ Misc.fingerprint elems }"

  TSV.traverse(elems, :cpus => size, :bar => bar_label, :into => Set.new) do |item|
    elems.annotate(item) if elems.respond_to?(:annotate)

    begin
      yield item
    rescue Interrupt
      Log.warn "Process #{Process.pid} was aborted"
    end

    nil
  end

  nil
end
|
.safe_create_semaphore(name, value, **opts) ⇒ Object
Safe wrappers that raise SystemCallError on final failure
158
159
160
161
162
163
164
|
# File 'lib/scout/semaphore.rb', line 158
# Creates the semaphore +name+ with initial +value+ via
# ScoutSemaphore.create_semaphore, running the call through with_retry.
#
# @param name semaphore name passed to create_semaphore
# @param value initial value passed to create_semaphore
# @param opts keyword options forwarded unchanged to with_retry
# @return the non-negative status returned by the final attempt
# @raise [SystemCallError] when the final status is negative; the errno is
#   the negated status
def self.safe_create_semaphore(name, value, **opts)
  status = with_retry(**opts) { ScoutSemaphore.create_semaphore(name, value) }
  return status unless status < 0

  raise SystemCallError.new("create_semaphore(#{name}) failed", -status)
end
|
.safe_delete_semaphore(name, **opts) ⇒ Object
166
167
168
169
170
171
172
|
# File 'lib/scout/semaphore.rb', line 166
# Deletes the semaphore +name+ via ScoutSemaphore.delete_semaphore, running
# the call through with_retry.
#
# @param name semaphore name passed to delete_semaphore
# @param opts keyword options forwarded unchanged to with_retry
# @return the non-negative status returned by the final attempt
# @raise [SystemCallError] when the final status is negative; the errno is
#   the negated status
def self.safe_delete_semaphore(name, **opts)
  status = with_retry(**opts) { ScoutSemaphore.delete_semaphore(name) }
  return status unless status < 0

  raise SystemCallError.new("delete_semaphore(#{name}) failed", -status)
end
|
.safe_post_semaphore(name, **opts) ⇒ Object
187
188
189
190
191
192
193
|
# File 'lib/scout/semaphore.rb', line 187
# Posts to the semaphore +name+ via ScoutSemaphore.post_semaphore, running
# the call through with_retry.
#
# @param name semaphore name passed to post_semaphore
# @param opts keyword options forwarded unchanged to with_retry
# @return the non-negative status returned by the final attempt
# @raise [SystemCallError] when the final status is negative; the errno is
#   the negated status
def self.safe_post_semaphore(name, **opts)
  status = with_retry(**opts) { ScoutSemaphore.post_semaphore(name) }
  return status unless status < 0

  raise SystemCallError.new("post_semaphore(#{name}) failed", -status)
end
|
.safe_wait_semaphore(name, **opts) ⇒ Object
174
175
176
177
178
179
180
181
182
183
184
185
|
# File 'lib/scout/semaphore.rb', line 174
# Waits on the semaphore +name+ via ScoutSemaphore.wait_semaphore, running
# the call through with_retry.
#
# @param name semaphore name passed to wait_semaphore
# @param opts keyword options forwarded unchanged to with_retry
# @return the non-negative status returned by the final attempt
# @raise [SemaphoreInterrupted] when the final status is -EINTR
# @raise [SystemCallError] for any other negative final status; the errno is
#   the negated status
def self.safe_wait_semaphore(name, **opts)
  status = with_retry(**opts) { ScoutSemaphore.wait_semaphore(name) }
  return status unless status < 0

  errno = -status
  raise SemaphoreInterrupted if errno == Errno::EINTR::Errno

  raise SystemCallError.new("wait_semaphore(#{name}) failed", errno)
end
|
.synchronize(sem) ⇒ Object
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
|
# File 'lib/scout/semaphore.rb', line 195
# Runs the given block while holding the semaphore +sem+.
#
# The name is first normalized with ensure_semaphore_name. The semaphore is
# then waited on; if waiting raises (SemaphoreInterrupted, SystemCallError,
# or anything else) the error propagates and the block is never run. Once the
# wait succeeds, the semaphore is posted again when the block finishes,
# whether it returned or raised.
#
# @param sem semaphore name (normalized before use)
# @return the value of the block
# @raise [SemaphoreInterrupted, SystemCallError] from waiting on the semaphore
# @raise [SystemCallError] from posting the semaphore; being raised inside
#   +ensure+, it takes precedence over an exception raised by the block
def self.synchronize(sem)
  sem = ensure_semaphore_name(sem)

  # No rescue needed here: a failed wait must simply propagate to the caller.
  ScoutSemaphore.safe_wait_semaphore(sem)

  begin
    yield
  ensure
    ScoutSemaphore.safe_post_semaphore(sem)
  end
end
|
.thread_each_on_semaphore(elems, size) ⇒ Object
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
|
# File 'lib/scout/semaphore.rb', line 266
# Runs the given block once per element of +elems+, each call in its own
# thread, allowing at most +size+ block executions at the same time.
#
# All threads are started up front; each one waits for a free slot before
# calling the block. Waiting threads block on a condition variable that is
# signalled whenever a slot is released, so a freed slot is picked up right
# away (the 1-second timeout on the wait is only a safety net).
#
# Any exception raised while starting or joining the threads -- including one
# re-raised by Thread#join from a worker -- is logged, every thread started
# so far is killed, and the exception is NOT propagated to the caller.
#
# @param elems [#each] elements to process
# @param size [Integer] maximum number of concurrently running blocks
# @return [Array<Thread>] the threads that were started
def self.thread_each_on_semaphore(elems, size)
  mutex = Mutex.new
  slot_released = ConditionVariable.new
  running = 0
  threads = []

  begin
    # Threads are appended one by one so that, should starting one of them
    # fail, the rescue below can still kill those already running.
    elems.each do |elem|
      threads << Thread.new(elem) do |item|
        mutex.synchronize do
          slot_released.wait(mutex, 1) until running < size
          running += 1
        end

        begin
          yield item
        rescue Interrupt
          Log.error "Thread was aborted while processing: #{Misc.fingerprint item}"
          raise
        ensure
          mutex.synchronize do
            running -= 1
            slot_released.signal
          end
        end
      end
    end

    threads.each(&:join)
  rescue Exception
    # Deliberately broad: whatever went wrong, no worker may be left running.
    Log.exception $!
    Log.info "Ensuring threads are dead: #{threads.length}"
    threads.each(&:kill)
  end
end
|
.with_retry(max_attempts: 6, base_delay: 0.01, max_delay: 1.0, jitter: 0.5, retriable: RETRIABLE_ERRNOS) ⇒ Object
Generic retry wrapper with exponential backoff + jitter
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/scout/semaphore.rb', line 132
# Calls the block repeatedly until it succeeds or retrying is ruled out.
#
# The block must return a number: a value >= 0 means success and is returned
# at once; a negative value is read as a negated errno. A failure is returned
# to the caller as-is (nothing is raised here) when the errno is listed in
# FATAL_ERRNOS, when +max_attempts+ calls have been made, or when the errno
# is not in +retriable+. Otherwise a warning is logged and the method sleeps
# before the next call.
#
# The sleep before retry k is base_delay * 2**(k - 1), capped at +max_delay+,
# plus a random extra of up to +jitter+ times that capped value.
#
# @param max_attempts [Integer] upper bound on the number of block calls
# @param base_delay [Numeric] seconds to wait before the first retry
# @param max_delay [Numeric] cap applied to the delay before jitter is added
# @param jitter [Numeric] maximum random extra, as a fraction of the delay
# @param retriable [Array<Integer>] errno values that may be retried
# @return the value returned by the last call of the block
def self.with_retry(max_attempts: 6, base_delay: 0.01, max_delay: 1.0, jitter: 0.5, retriable: RETRIABLE_ERRNOS)
  tries = 0

  # Plain `while` on purpose: Kernel#loop would rescue a StopIteration
  # raised by the block.
  while true
    tries += 1
    status = yield
    return status if status >= 0

    errno = -status
    give_up = FATAL_ERRNOS.include?(errno) || tries >= max_attempts || !retriable.include?(errno)
    return status if give_up

    backoff = base_delay * (2 ** (tries - 1))
    pause = [backoff, max_delay].min
    pause += rand * jitter * pause
    Log.warn "Semaphore operation failed (errno=#{errno}), retrying in #{'%.3f' % pause}s (attempt #{tries}/#{max_attempts})"
    sleep(pause)
  end
end
|
.with_semaphore(size, file = nil) ⇒ Object
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
# File 'lib/scout/semaphore.rb', line 224
# Creates a semaphore of the given +size+, yields its name to the block, and
# removes the semaphore again once the block is done (whether it returned or
# raised).
#
# When +file+ is nil a name of the form "/scout-<digest prefix>" is generated
# from a random number; otherwise +file+ is normalized with
# ensure_semaphore_name.
#
# Removal is attempted only after a successful creation: if creating the
# semaphore fails, the error is logged and re-raised without trying to delete
# a semaphore this call never created. A SystemCallError during removal is
# logged as a warning and otherwise ignored.
#
# @param size initial value passed to safe_create_semaphore
# @param file [#to_s, nil] optional semaphore name
# @yieldparam file [String] the semaphore name in use
# @return the value of the block
# @raise [SystemCallError] when the semaphore cannot be created
def self.with_semaphore(size, file = nil)
  file = if file.nil?
           "/scout-" + Misc.digest(rand(100000000000).to_s)[0..10]
         else
           ensure_semaphore_name(file)
         end

  Log.debug "Creating semaphore (#{ size }): #{file}"
  begin
    ScoutSemaphore.safe_create_semaphore(file, size)
  rescue SystemCallError => e
    Log.error "Failed to create semaphore #{file}: #{e.message}"
    raise
  end

  # From here on the semaphore exists, so it must be removed on every exit path.
  begin
    yield file
  ensure
    Log.debug "Removing semaphore #{ file }"
    begin
      ScoutSemaphore.safe_delete_semaphore(file)
    rescue SystemCallError => e
      Log.warn "delete_semaphore(#{file}) failed: #{e.message}"
    end
  end
end
|