Module: ScoutSemaphore

Defined in:
lib/scout/semaphore.rb

Defined Under Namespace

Classes: SemaphoreInterrupted

Constant Summary collapse

# Module-level Mutex instance.
# NOTE(review): none of the methods shown in this section reference it — confirm where it is used.
SEM_MUTEX =
Mutex.new
RETRIABLE_ERRNOS =

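Numeric errno values, grouped by how with_retry treats them: the codes listed here are retried, the codes in FATAL_ERRNOS are not.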

[
  Errno::ENOENT,
  Errno::EIDRM,
  Errno::EAGAIN,
  Errno::EMFILE,
  Errno::ENFILE,
  Errno::EINTR
].map { |c| c.new.errno }
# Integer errno values that with_retry never retries: it returns the failing
# result as soon as it sees one of them.
FATAL_ERRNOS =
[Errno::EINVAL, Errno::EACCES].map { |klass| klass::Errno }

Class Method Summary collapse

Class Method Details

.create_semaphore(name, value, **opts) ⇒ Object

Safe wrappers that raise SystemCallError on final failure

Raises:

  • (SystemCallError)


163
164
165
166
167
168
169
170
# File 'lib/scout/semaphore.rb', line 163

# Creates the named semaphore +name+ with initial value +value+.
#
# The native create_semaphore_c call is run through with_retry; +opts+ are
# forwarded to with_retry unchanged.
#
# @param name the semaphore name (interpolated into error messages and
#   passed to exists?)
# @param value the initial value handed to create_semaphore_c
# @param opts keyword options for with_retry
# @return the non-negative result of create_semaphore_c
# @raise [SystemCallError] carrying the native errno when the create call
#   fails, or a generic "Semaphore missing" error when the call reported
#   success but exists? cannot find the semaphore afterwards
def self.create_semaphore(name, value, **opts)
  ret = with_retry(**opts) { ScoutSemaphore.create_semaphore_c(name, value) }

  # Report the native failure first: checking existence before the return
  # code would replace the real errno with a generic "missing" error.
  raise SystemCallError.new("create_semaphore(#{name}) failed", -ret) if ret < 0

  # The call claimed success; make sure the semaphore is actually visible.
  raise SystemCallError.new("Semaphore missing (#{name})") unless self.exists?(name)

  ret
end

.delete_semaphore(name, **opts) ⇒ Object



172
173
174
175
176
177
178
# File 'lib/scout/semaphore.rb', line 172

# Removes the named semaphore +name+ by running delete_semaphore_c through
# with_retry (+opts+ are forwarded to with_retry).
#
# @return the non-negative result of delete_semaphore_c
# @raise [SystemCallError] built from the negated result when it is negative
def self.delete_semaphore(name, **opts)
  status = with_retry(**opts) { ScoutSemaphore.delete_semaphore_c(name) }
  return status unless status < 0

  raise SystemCallError.new("delete_semaphore(#{name}) failed", -status)
end

.ensure_semaphore_name(file) ⇒ Object



107
108
109
110
111
112
113
114
# File 'lib/scout/semaphore.rb', line 107

# Normalizes +file+ into a name usable for a POSIX named semaphore: exactly
# one leading '/', with every other '/' replaced by '_'.
#
# The argument is converted with to_s and never mutated; a new String is
# returned.
#
# @param file any object responding to to_s
# @return [String] the normalized name
def self.ensure_semaphore_name(file)
  # \A anchors at the start of the whole string. The line anchor (^) would
  # also match after embedded newlines and delete those slashes instead of
  # turning them into underscores.
  stem = file.to_s.sub(%r{\A/+}, '')
  '/' + stem.tr('/', '_')
end

.exists?(name) ⇒ Boolean

Returns:

  • (Boolean)


116
117
118
119
# File 'lib/scout/semaphore.rb', line 116

# Reports whether a semaphore called +name+ is present, by asking Open.exists?
# about the file /dev/shm/sem.<name minus its first character>.
#
# @param name [String] semaphore name; its first character is dropped
# @return whatever Open.exists? returns for that path
def self.exists?(name)
  shm_entry = 'sem.' + name[1..-1]
  Open.exists?(File.join('/dev/shm', shm_entry))
end

.fork_each_on_semaphore(elems, size, file = nil) ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/scout/semaphore.rb', line 260

# Runs the given block once per element of +elems+ through TSV.traverse,
# passing +size+ as the :cpus option.
#
# Each element is first annotated via elems.annotate when +elems+ responds to
# it. An Interrupt raised by the block is logged as a warning and not
# propagated. The +file+ argument is accepted but not used.
#
# @return [nil]
def self.fork_each_on_semaphore(elems, size, file = nil)
  progress_label = "Fork each on semaphore: #{ Misc.fingerprint elems }"

  TSV.traverse(elems, cpus: size, bar: progress_label, into: Set.new) do |elem|
    elems.annotate(elem) if elems.respond_to?(:annotate)

    begin
      yield elem
    rescue Interrupt
      Log.warn "Process #{Process.pid} was aborted"
    end

    # Always hand nil back to the traversal, whatever the block returned.
    nil
  end

  nil
end

.post_semaphore(name, **opts) ⇒ Object

Raises:

  • (SystemCallError)


194
195
196
197
198
199
200
201
# File 'lib/scout/semaphore.rb', line 194

# Posts to the named semaphore +name+ by running post_semaphore_c through
# with_retry (+opts+ are forwarded to with_retry).
#
# @return the non-negative result of post_semaphore_c
# @raise [SystemCallError] when exists? reports the semaphore missing (the
#   native call is then not attempted), or when the native result is negative
def self.post_semaphore(name, **opts)
  unless self.exists?(name)
    raise SystemCallError.new("Semaphore missing (#{name})")
  end

  status = with_retry(**opts) { ScoutSemaphore.post_semaphore_c(name) }
  return status unless status < 0

  raise SystemCallError.new("post_semaphore(#{name}) failed", -status)
end

.synchronize(sem) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/scout/semaphore.rb', line 203

# Runs the given block while holding the named semaphore +sem+.
#
# The name is normalized with ensure_semaphore_name, the semaphore is
# acquired with wait_semaphore, and post_semaphore is always called once the
# block finishes, whether it returned or raised.
#
# Errors are never swallowed: whatever wait_semaphore raises propagates
# before the block runs, and a SystemCallError from post_semaphore propagates
# out of the ensure clause, replacing any exception the block raised.
#
# @return the value of the block
def self.synchronize(sem)
  # Callers should already pass a normalized name; normalize again to be safe.
  sem = ensure_semaphore_name(sem)

  # Failures here (interruption or a system error) go straight to the caller.
  ScoutSemaphore.wait_semaphore(sem)

  begin
    yield
  ensure
    # Release errors are deliberately raised rather than merely logged.
    ScoutSemaphore.post_semaphore(sem)
  end
end

.thread_each_on_semaphore(elems, size) ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/scout/semaphore.rb', line 274

# Runs the given block once per element of +elems+, each in its own Thread,
# while letting at most +size+ blocks execute at the same time.
#
# One thread per element is started up front. Each thread waits on a
# condition variable until the running count is below +size+, so a freed slot
# is handed to a waiter immediately instead of after a polling sleep. If
# +size+ is not positive no thread is ever admitted and the call never
# returns.
#
# Any exception raised while spawning or joining (including one re-raised
# from a worker by Thread#join) is logged, all threads started so far are
# killed, and the exception is not propagated to the caller.
#
# @param elems an enumerable of work items
# @param size the maximum number of blocks running concurrently
# @return [Array<Thread>] the threads that were created
def self.thread_each_on_semaphore(elems, size)
  mutex = Mutex.new
  slot_freed = ConditionVariable.new
  running = 0
  threads = []

  begin
    # Append as we go so that a failure half-way through spawning still
    # leaves every started thread reachable for the cleanup below.
    elems.each do |elem|
      threads << Thread.new(elem) do |item|
        mutex.synchronize do
          # Loop guards against spurious wake-ups and lost races for the slot.
          slot_freed.wait(mutex) until running < size
          running += 1
        end

        begin
          yield item
        rescue Interrupt
          Log.error "Thread was aborted while processing: #{Misc.fingerprint item}"
          raise
        ensure
          mutex.synchronize do
            running -= 1
            # Exactly one slot became free, so waking one waiter is enough.
            slot_freed.signal
          end
        end
      end
    end

    threads.each(&:join)
  rescue Exception
    # Deliberately broad: on any failure, make sure no worker is left running.
    Log.exception $!
    Log.info "Ensuring threads are dead: #{threads.length}"
    threads.each(&:kill)
  end
end

.wait_semaphore(name, **opts) ⇒ Object

Raises:

  • (SystemCallError)


180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/scout/semaphore.rb', line 180

# Waits on the named semaphore +name+ by running wait_semaphore_c through
# with_retry (+opts+ are forwarded to with_retry).
#
# @return the non-negative result of wait_semaphore_c
# @raise [SystemCallError] when exists? reports the semaphore missing, or
#   when the native result is a negative errno other than EINTR
# @raise [SemaphoreInterrupted] when the final native result is -EINTR
def self.wait_semaphore(name, **opts)
  unless self.exists?(name)
    raise SystemCallError.new("Semaphore missing (#{name})")
  end

  status = with_retry(**opts) { ScoutSemaphore.wait_semaphore_c(name) }
  return status unless status < 0

  errno = -status
  # An interrupted wait gets its own exception class so callers can tell it
  # apart from real failures.
  raise SemaphoreInterrupted if errno == Errno::EINTR::Errno

  raise SystemCallError.new("wait_semaphore(#{name}) failed", errno)
end

.with_retry(max_attempts: 6, base_delay: 0.01, max_delay: 1.0, jitter: 0.5, retriable: RETRIABLE_ERRNOS) ⇒ Object

Generic retry wrapper with exponential backoff + jitter



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/scout/semaphore.rb', line 137

# Calls the block repeatedly until it succeeds or retrying is pointless.
#
# The block must return a number: non-negative means success, negative is a
# negated errno. A failing result is returned as-is (nothing is raised here)
# when the errno is in FATAL_ERRNOS, when +max_attempts+ calls have been
# made, or when the errno is not in +retriable+. Otherwise a warning is
# logged and the call sleeps before trying again.
#
# @param max_attempts maximum number of block invocations
# @param base_delay delay before the first retry, in seconds; doubled on
#   every further attempt
# @param max_delay upper bound for the delay before jitter is added
# @param jitter fraction of the delay added on top as a random extra,
#   in the range [0, jitter * delay)
# @param retriable errno values that are worth retrying
# @return the last value returned by the block
def self.with_retry(max_attempts: 6, base_delay: 0.01, max_delay: 1.0, jitter: 0.5, retriable: RETRIABLE_ERRNOS)
  1.step do |attempt|
    status = yield
    return status if status >= 0

    errno = -status
    worth_retrying = !FATAL_ERRNOS.include?(errno) && attempt < max_attempts && retriable.include?(errno)
    return status unless worth_retrying

    # Exponential backoff capped at max_delay, plus a random jitter on top.
    delay = [base_delay * (2 ** (attempt - 1)), max_delay].min
    delay += rand * jitter * delay

    Log.warn "Semaphore operation failed (errno=#{errno}), retrying in #{'%.3f' % delay}s (attempt #{attempt}/#{max_attempts})"
    sleep(delay)
  end
end

.with_semaphore(size, file = nil) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/scout/semaphore.rb', line 232

# Creates a semaphore with initial value +size+, yields its name to the
# block, and removes the semaphore afterwards.
#
# When +file+ is nil a random "/scout-<digest>" name is generated; otherwise
# +file+ is normalized with ensure_semaphore_name.
#
# A SystemCallError from creation is logged and re-raised. Removal is
# attempted in every case, including after a failed creation, and a
# SystemCallError from removal is only logged as a warning.
#
# @return the value of the block
def self.with_semaphore(size, file = nil)
  file = if file.nil?
           "/scout-" + Misc.digest(rand(100000000000).to_s)[0..10]
         else
           # Make sure a caller-supplied name is a valid POSIX name.
           ensure_semaphore_name(file)
         end

  begin
    Log.debug "Creating semaphore (#{ size }): #{file}"
    begin
      ScoutSemaphore.create_semaphore(file, size)
    rescue SystemCallError => create_error
      Log.error "Failed to create semaphore #{file}: #{create_error.message}"
      raise
    end

    yield file
  ensure
    Log.debug "Removing semaphore #{ file }"
    begin
      ScoutSemaphore.delete_semaphore(file)
    rescue SystemCallError => delete_error
      # Cleanup is best-effort: report the failure but do not raise it.
      Log.warn "delete_semaphore(#{file}) failed: #{delete_error.message}"
    end
  end
end