Class: Async::Barrier
- Inherits:
-
Object
- Object
- Async::Barrier
- Defined in:
- lib/async/barrier.rb
Overview
A synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with Semaphore.
Instance Attribute Summary collapse
-
#tasks ⇒ Object
readonly
All tasks which have been invoked into the barrier.
Instance Method Summary collapse
-
#async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object
Execute a child task and add it to the barrier.
-
#empty? ⇒ Boolean
Whether there are any tasks being held by the barrier.
-
#initialize(parent: nil) ⇒ Barrier
constructor
Initialize the barrier.
-
#size ⇒ Object
The number of tasks currently held by the barrier.
-
#stop ⇒ Object
Stop all tasks held by the barrier.
-
#wait ⇒ Object
Wait for all tasks.
Constructor Details
#initialize(parent: nil) ⇒ Barrier
Initialize the barrier.
15 16 17 18 19 |
# File 'lib/async/barrier.rb', line 15 def initialize(parent: nil) @tasks = [] @parent = parent end |
Instance Attribute Details
#tasks ⇒ Object (readonly)
All tasks which have been invoked into the barrier.
22 23 24 |
# File 'lib/async/barrier.rb', line 22 def tasks @tasks end |
Instance Method Details
#async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object
Execute a child task and add it to the barrier.
31 32 33 34 35 36 37 |
# File 'lib/async/barrier.rb', line 31 def async(*arguments, parent: (@parent or Task.current), **, &block) task = parent.async(*arguments, **, &block) @tasks << task return task end |
#empty? ⇒ Boolean
Whether there are any tasks being held by the barrier.
41 42 43 |
# File 'lib/async/barrier.rb', line 41 def empty? @tasks.empty? end |
#size ⇒ Object
The number of tasks currently held by the barrier.
25 26 27 |
# File 'lib/async/barrier.rb', line 25 def size @tasks.size end |
#stop ⇒ Object
Stop all tasks held by the barrier.
66 67 68 69 70 |
# File 'lib/async/barrier.rb', line 66 def stop # We have to be careful to avoid enumerating tasks while adding/removing to it: tasks = @tasks.dup tasks.each(&:stop) end |
#wait ⇒ Object
Wait for all tasks.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/async/barrier.rb', line 47 def wait # TODO: This would be better with linked list. while @tasks.any? task = @tasks.first begin task.wait ensure # We don't know for sure that the exception was due to the task completion. unless task.running? # Remove the task from the waiting list if it's finished: @tasks.shift if @tasks.first == task end end end end |