Class: Aggkit::Watcher
- Inherits:
-
Object
show all
- Defined in:
- lib/aggkit/watcher.rb
Defined Under Namespace
Classes: Pipe, ProcessHandler
Constant Summary
collapse
- EXIT_SIGNALS =
%w[EXIT QUIT].freeze
- TERM_SIGNALS =
%w[INT TERM].freeze
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of Watcher.
31
32
33
34
35
36
37
38
39
|
# File 'lib/aggkit/watcher.rb', line 31
def initialize
@iolock = Mutex.new
@pipe = Pipe.new
@procs = []
@code = 0
@crashed = false
@who = nil
end
|
Instance Attribute Details
#iolock ⇒ Object
Returns the value of attribute iolock.
10
11
12
|
# File 'lib/aggkit/watcher.rb', line 10
def iolock
@iolock
end
|
Instance Method Details
#add(*cmd) ⇒ Object
41
42
43
44
45
46
|
# File 'lib/aggkit/watcher.rb', line 41
def add(*cmd)
log "Starting #{cmd}..."
@procs.push ProcessHandler.new(self, *cmd)
log " * PID: #{@procs.last.pid}"
@procs.last
end
|
#error(msg) ⇒ Object
52
53
54
|
# File 'lib/aggkit/watcher.rb', line 52
def error(msg)
@iolock.synchronize{ STDERR.puts("[watcher][#{Time.now.strftime('%H:%M:%S.%L')}]: Error: #{msg}") }
end
|
#exec ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
# File 'lib/aggkit/watcher.rb', line 86
def exec
install_signal_handlers(@pipe)
begin
yield(self)
rescue StandardError => e
@crashed = true
@code = 1
error e.inspect
error e.backtrace.last(20).join("\n")
log 'Try exits gracefully..'
terminate_all
exit!(@code)
end
loop_childs
quit! if @procs.all?(&:handled?)
Thread.new(@pipe) do |pipe|
loop do
begin
sleep 10
pipe.puts :gc
rescue StandardError => e
log "GC thread exception: #{e.inspect}"
end
end
end
loop do
log 'Main loop...'
case event = @pipe.gets
when :term
log ' * Child terminated: try exits gracefully...'
terminate_all
loop_childs
quit!
when *(EXIT_SIGNALS + TERM_SIGNALS).map(&:to_sym)
log " * Catch #{event}: try exits gracefully..."
terminate_all
loop_childs
quit!
when :child, :gc
log " * Main loop event: #{event}"
loop_childs
quit! if @procs.all?(&:handled?)
else
log " * Unknown event: #{event.inspect}"
loop_childs
quit! if @procs.all?(&:handled?)
end
end
end
|
#log(msg) ⇒ Object
48
49
50
|
# File 'lib/aggkit/watcher.rb', line 48
def log(msg)
@iolock.synchronize{ STDOUT.puts("[watcher][#{Time.now.strftime('%H:%M:%S.%L')}]: #{msg}") }
end
|
#quit! ⇒ Object
64
65
66
67
|
# File 'lib/aggkit/watcher.rb', line 64
def quit!
@code = [@code || 0, 1].max if @crashed
exit!(@code || 0)
end
|
#set_crash_report(pr) ⇒ Object
56
57
58
59
60
61
62
|
# File 'lib/aggkit/watcher.rb', line 56
def set_crash_report(pr)
return if @crashed
@crashed = true
@code = pr.exit_code
@who = pr.command
end
|
#terminate_all ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/aggkit/watcher.rb', line 69
def terminate_all
raise 'Double termination occured!' if @terminating
@terminating = true
running = @procs.reject(&:handled?)
running.each do |pr|
pr.stdin.close rescue nil
pr.terminate
end
running.each do |pr|
pr.stop
collect_managed(pr)
end
end
|