Class: Async::Barrier

Inherits:
Object
  • Object
show all
Defined in:
lib/async/barrier.rb

Overview

A synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with Semaphore.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parent: nil) ⇒ Barrier

Initialize the barrier.



32
33
34
35
36
# File 'lib/async/barrier.rb', line 32

def initialize(parent: nil)
  @tasks = []
  
  @parent = parent
end

Instance Attribute Details

#tasksObject (readonly)

All tasks which have been invoked into the barrier.



39
40
41
# File 'lib/async/barrier.rb', line 39

def tasks
  @tasks
end

Instance Method Details

#async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object

Execute a child task and add it to the barrier.



48
49
50
51
52
53
54
# File 'lib/async/barrier.rb', line 48

def async(*arguments, parent: (@parent or Task.current), **options, &block)
  task = parent.async(*arguments, **options, &block)
  
  @tasks << task
  
  return task
end

#empty?Boolean

Whether there are any tasks being held by the barrier.

Returns:

  • (Boolean)


58
59
60
# File 'lib/async/barrier.rb', line 58

def empty?
  @tasks.empty?
end

#sizeObject

The number of tasks currently held by the barrier.



42
43
44
# File 'lib/async/barrier.rb', line 42

def size
  @tasks.size
end

#stopObject

Stop all tasks held by the barrier.



83
84
85
86
87
# File 'lib/async/barrier.rb', line 83

def stop
  # We have to be careful to avoid enumerating tasks while adding/removing to it:
  tasks = @tasks.dup
  tasks.each(&:stop)
end

#waitObject

Wait for all tasks.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/async/barrier.rb', line 64

def wait
  # TODO: This would be better with linked list.
  while @tasks.any?
    task = @tasks.first
    
    begin
      task.wait
    ensure
      # We don't know for sure that the exception was due to the task completion.
      unless task.running?
        # Remove the task from the waiting list if it's finished:
        @tasks.shift if @tasks.first == task
      end
    end
  end
end