Class: Async::Semaphore

Inherits:
Object
  • Object
show all
Defined in:
lib/async/semaphore.rb

Overview

A semaphore is used to control access to a common resource in a concurrent system. A useful way to think of a semaphore as used in the real-world systems is as a record of how many units of a particular resource are available, coupled with operations to adjust that record safely (i.e. to avoid race conditions) as units are required or become free, and, if necessary, wait until a unit of the resource becomes available.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(limit = 1) ⇒ Semaphore

Returns a new instance of Semaphore.



24
25
26
27
28
# File 'lib/async/semaphore.rb', line 24

def initialize(limit = 1)
  @count = 0
  @limit = limit
  @waiting = []
end

Instance Attribute Details

#countObject (readonly)

The current number of tasks that have acquired the semaphore.



31
32
33
# File 'lib/async/semaphore.rb', line 31

def count
  @count
end

#limitObject (readonly)

The maximum number of tasks that can acquire the semaphore.



34
35
36
# File 'lib/async/semaphore.rb', line 34

def limit
  @limit
end

#waitingObject (readonly)

The tasks waiting on this semaphore.



37
38
39
# File 'lib/async/semaphore.rb', line 37

def waiting
  @waiting
end

Instance Method Details

#acquire { ... } ⇒ Object

Acquire the semaphore, block if we are at the limit. If no block is provided, you must call release manually.

Yields:

  • when the semaphore can be acquired

Returns:

  • the result of the block if invoked



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/async/semaphore.rb', line 68

def acquire
  wait
  
  @count += 1
  
  return unless block_given?
  
  begin
    return yield
  ensure
    self.release
  end
end

#async(*args, parent: Task.current, **options) ⇒ Object

Run an async task. Will wait until the semaphore is ready until spawning and running the task.



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/async/semaphore.rb', line 50

def async(*args, parent: Task.current, **options)
  wait
  
  parent.async(**options) do |task|
    @count += 1
    
    begin
      yield task, *args
    ensure
      self.release
    end
  end
end

#blocking?Boolean

Whether trying to acquire this semaphore would block.

Returns:

  • (Boolean)


45
46
47
# File 'lib/async/semaphore.rb', line 45

def blocking?
  @count >= @limit
end

#empty?Boolean

Is the semaphore currently acquired?

Returns:

  • (Boolean)


40
41
42
# File 'lib/async/semaphore.rb', line 40

def empty?
  @count.zero?
end

#releaseObject

Release the semaphore. Must match up with a corresponding call to acquire.



83
84
85
86
87
88
89
90
91
# File 'lib/async/semaphore.rb', line 83

def release
  @count -= 1
  
  available = @waiting.pop(@limit - @count)
  
  available.each do |fiber|
    fiber.resume if fiber.alive?
  end
end