Class: Async::Barrier

Inherits:
Object
  • Object
show all
Defined in:
lib/async/barrier.rb

Overview

A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with Semaphore.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parent: nil) ⇒ Barrier

Initialize the barrier.



18
19
20
21
22
23
# File 'lib/async/barrier.rb', line 18

def initialize(parent: nil)
	@tasks = List.new
	@finished = Queue.new
	
	@parent = parent
end

Instance Attribute Details

#tasksObject (readonly)

All tasks which have been invoked into the barrier.



41
42
43
# File 'lib/async/barrier.rb', line 41

def tasks
  @tasks
end

Instance Method Details

#async(*arguments, parent: (@parent or Task.current), **options, &block) ⇒ Object

Execute a child task and add it to the barrier.



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/async/barrier.rb', line 45

def async(*arguments, parent: (@parent or Task.current), **options, &block)
	raise "Barrier is stopped!" if @finished.closed?
	
	waiting = nil
	
	parent.async(*arguments, **options) do |task, *arguments|
		waiting = TaskNode.new(task)
		@tasks.append(waiting)
		block.call(task, *arguments)
	ensure
		@finished.signal(waiting) unless @finished.closed?
	end
end

#empty?Boolean

Whether there are any tasks being held by the barrier.

Returns:

  • (Boolean)


61
62
63
# File 'lib/async/barrier.rb', line 61

def empty?
	@tasks.empty?
end

#sizeObject

Number of tasks being held by the barrier.



36
37
38
# File 'lib/async/barrier.rb', line 36

def size
	@tasks.size
end

#stopObject

Stop all tasks held by the barrier.



93
94
95
96
97
98
99
# File 'lib/async/barrier.rb', line 93

def stop
	@tasks.each do |waiting|
		waiting.task.stop
	end
	
	@finished.close
end

#waitObject

Wait for all tasks to complete by invoking Task#wait on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/async/barrier.rb', line 70

def wait
	while !@tasks.empty?
		# Wait for a task to finish (we get the task node):
		return unless waiting = @finished.wait
		
		# Remove the task as it is now finishing:
		@tasks.remove?(waiting)
		
		# Get the task:
		task = waiting.task
		
		# If a block is given, the user can implement their own behaviour:
		if block_given?
			yield task
		else
			# Wait for it to either complete or raise an error:
			task.wait
		end
	end
end