12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/hakuban/engine.rb', line 12
# Serves the contract obtained from +contract(exchange)+: every item returned
# by +contract.next+ is handed to the engine's +handle_with_thread+ on a
# freshly spawned thread. The loop ends when +contract.next+ returns a falsy
# value; the contract is dropped on the way out, whatever the exit path.
#
# Dispatch depends on the contract's class:
#
# * strict subclasses of +ObserveContract+: one thread per item runs
#   +handle_with_thread(item)+ and always drops the item afterwards;
# * any other contract: a supervisor thread starts a worker that runs
#   +handle_with_thread(item)+, drains +item.next+ until it turns falsy,
#   then raises +Hakuban::Engine::Stop+ in the worker and joins it. The
#   worker drops the item when it finishes.
#
# Anything raised inside those threads is written to +$stderr+ and the
# contract is dropped; it does not propagate to the caller of +run+.
#
# @param exchange passed through untouched to +contract(exchange)+
# @return [nil]
# @raise [RuntimeError] when an item arrives and the engine defines only
#   +handle_with_ractor+ (not supported yet) or no handler at all
def run(exchange)
  contract = contract(exchange)

  # Shared failure path of the per-item threads: log, then give up the contract.
  report_failure = lambda do |error|
    $stderr.puts "#{self.class.name} exception: #{error.message}\n#{error.backtrace.join("\n")}\n"
    contract.drop
  end

  while (next_sink_or_stream = contract.next)
    # Handlers are looked up per item, so an engine without any handler only
    # fails once the contract actually yields something.
    if respond_to?(:handle_with_thread)
      if contract.class < ObserveContract
        Thread.new(next_sink_or_stream) do |sink_or_stream|
          handle_with_thread(sink_or_stream)
        rescue Object => error
          report_failure.call(error)
        ensure
          sink_or_stream.drop
        end
      else
        Thread.new(next_sink_or_stream) do |sink_or_stream|
          # The worker is created with every interrupt deferred, and only the
          # handler call itself re-enables them. A Stop raised from the
          # supervisor therefore always lands inside the worker's
          # rescue/ensure, so the item is dropped exactly once.
          worker = Thread.handle_interrupt(Object => :never) do
            Thread.new do
              Thread.handle_interrupt(Object => :immediate) do
                handle_with_thread(sink_or_stream)
              end
            rescue Stop
            ensure
              sink_or_stream.drop
            end
          end
          # Block here until the item reports that there is nothing more to wait for.
          while sink_or_stream.next; end
          worker.raise Hakuban::Engine::Stop
          # join re-raises whatever killed the worker; it is reported below.
          worker.join
        rescue Object => error
          report_failure.call(error)
        end
      end
    elsif respond_to?(:handle_with_ractor)
      # Ractor dispatch is not implemented; the unreachable draft that used to
      # follow this raise has been removed.
      raise "Ractors are not supported yet"
    else
      raise "No handler defined. Engine class should define one of the following methods: handle_with_thread, handle_with_ractor"
    end
  end
rescue Stop
  # A Stop raised in the serving loop ends it quietly.
ensure
  contract.drop if contract
end
|