21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/foxbat/barrier.rb', line 21
def initialize(tasks, repeat=1, callback=nil, timeout=Long::MAX_VALUE, err=nil)
@phaser = FBPhaser.new(1)
if repeat == 0
repeat = java.lang.Integer::MAX_VALUE
end
@phaser.limit = repeat
@phaser.callback = callback
phased_tasks = tasks.map do |t|
Proc.new do
@phaser.register
while !@phaser.isTerminated
begin
Timeout::timeout(timeout) { t.call }
rescue Exception => e
err.call(e) if err
break
end
@phaser.arriveAndAwaitAdvance
end
end
end
phased_tasks.each { |t| EM.executor.execute(t) }
start = Proc.new do
parties = @phaser.getRegisteredParties
if parties > 1
@phaser.arriveAndDeregister
else
java.lang.Thread.sleep(10)
start.call
end
end
start.call
end
|