Class: Async::Barrier
- Inherits: Object
- Defined in: lib/async/barrier.rb
Overview
A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with Semaphore.
Instance Attribute Summary
- #tasks ⇒ Object (readonly)
  All tasks which have been invoked into the barrier.
Instance Method Summary
- #async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object
  Execute a child task and add it to the barrier.
- #empty? ⇒ Boolean
  Whether there are any tasks being held by the barrier.
- #initialize(parent: nil) ⇒ Barrier (constructor)
  Initialize the barrier.
- #size ⇒ Object
  Number of tasks being held by the barrier.
- #stop ⇒ Object
  Stop all tasks held by the barrier.
- #wait ⇒ Object
  Wait for all tasks to complete by invoking Task#wait on each waiting task, which may raise an error.
Constructor Details
Instance Attribute Details
#tasks ⇒ Object (readonly)
All tasks which have been invoked into the barrier.
    # File 'lib/async/barrier.rb', line 41
    def tasks
      @tasks
    end
Instance Method Details
#async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object
Execute a child task and add it to the barrier.
    # File 'lib/async/barrier.rb', line 45
    def async(*arguments, parent: (@parent or Task.current), **, &block)
      raise "Barrier is stopped!" if @finished.closed?

      waiting = nil

      parent.async(*arguments, **) do |task, *arguments|
        waiting = TaskNode.new(task)
        @tasks.append(waiting)
        block.call(task, *arguments)
      ensure
        @finished.signal(waiting) unless @finished.closed?
      end
    end
#empty? ⇒ Boolean
Whether there are any tasks being held by the barrier.
    # File 'lib/async/barrier.rb', line 61
    def empty?
      @tasks.empty?
    end
#size ⇒ Object
Number of tasks being held by the barrier.
    # File 'lib/async/barrier.rb', line 36
    def size
      @tasks.size
    end
#stop ⇒ Object
Stop all tasks held by the barrier. This also closes the barrier, so any later call to #async raises "Barrier is stopped!".
    # File 'lib/async/barrier.rb', line 93
    def stop
      @tasks.each do |waiting|
        waiting.task.stop
      end

      @finished.close
    end
#wait ⇒ Object
Wait for all tasks to complete by invoking Task#wait on each waiting task, which may raise an error. Each task is removed from the barrier once it has finished. If a block is given, each finishing task is yielded to the block instead of being waited on.
    # File 'lib/async/barrier.rb', line 70
    def wait
      while !@tasks.empty?
        # Wait for a task to finish (we get the task node):
        return unless waiting = @finished.wait

        # Remove the task as it is now finishing:
        @tasks.remove?(waiting)

        # Get the task:
        task = waiting.task

        # If a block is given, the user can implement their own behaviour:
        if block_given?
          yield task
        else
          # Wait for it to either complete or raise an error:
          task.wait
        end
      end
    end