Class: Aggkit::Watcher

Inherits:
Object
  • Object
show all
Defined in:
lib/aggkit/watcher.rb

Defined Under Namespace

Classes: Pipe, ProcessHandler

Constant Summary collapse

EXIT_SIGNALS =
%w[EXIT QUIT].freeze
TERM_SIGNALS =
%w[INT TERM].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Watcher

Returns a new instance of Watcher.



31
32
33
34
35
36
37
38
39
# File 'lib/aggkit/watcher.rb', line 31

# Builds a watcher that is not yet managing any process.
#
# Creates the mutex that serializes log output, the event pipe consumed by
# the main loop in #exec, and the bookkeeping used for the final exit status
# and crash reporting.
def initialize
  @iolock = Mutex.new
  @pipe = Pipe.new

  @procs = []
  @code = 0
  @crashed = false
  @who = nil
  # Re-entry guard read by #terminate_all. Declared here so that all watcher
  # state is initialized in one place instead of relying on an unset ivar.
  @terminating = false
end

Instance Attribute Details

#iolock ⇒ Object

Returns the value of attribute iolock.



10
11
12
# File 'lib/aggkit/watcher.rb', line 10

# Reader for the @iolock instance variable.
#
# @return [Object] the current value of @iolock
def iolock
  @iolock
end

Instance Method Details

#add(*cmd) ⇒ Object



41
42
43
44
45
46
# File 'lib/aggkit/watcher.rb', line 41

# Starts a new managed process and registers it with this watcher.
#
# @param cmd [Array] command and arguments, forwarded to ProcessHandler.new
# @return [ProcessHandler] the handler that was just created and stored
def add(*cmd)
  log "Starting #{cmd}..."

  handler = ProcessHandler.new(self, *cmd)
  @procs << handler

  log "  * PID: #{handler.pid}"
  handler
end

#error(msg) ⇒ Object



52
53
54
# File 'lib/aggkit/watcher.rb', line 52

# Writes a timestamped error line to STDERR while holding the I/O mutex.
#
# @param msg [#to_s] text appended after the "Error: " prefix
# @return [nil]
def error(msg)
  @iolock.synchronize do
    # The timestamp is taken inside the lock, immediately before the write.
    stamp = Time.now.strftime('%H:%M:%S.%L')
    STDERR.puts("[watcher][#{stamp}]: Error: #{msg}")
  end
end

#exec ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/aggkit/watcher.rb', line 86

# Runs the watcher: lets the caller register processes, then blocks in the
# event loop. Under normal operation this method does not return; every path
# ends in #quit! or exit!.
#
# Flow:
# 1. Install signal handlers, handing them the event pipe.
# 2. Yield self so the caller can set up processes (e.g. via #add). If the
#    block raises a StandardError, it is reported, all children are
#    terminated and the process exits with status 1.
# 3. Start a background thread that pushes a :gc event every 10 seconds so
#    the main loop re-checks the children even when nothing else happens.
# 4. Consume events from the pipe: :term and the EXIT/TERM signal names
#    trigger a shutdown; any other event re-checks the children and quits
#    once all of them are handled.
#
# @yieldparam watcher [Watcher] this instance
# @return [void]
def exec
  install_signal_handlers(@pipe)

  begin
    yield(self)
  rescue StandardError => e
    @crashed = true
    @code = 1
    error e.inspect
    # An exception can report no backtrace (nil). Array() keeps the report
    # from raising here, which would skip terminate_all and leave children
    # running.
    error Array(e.backtrace).last(20).join("\n")
    log 'Try exits gracefully..'
    terminate_all
    exit!(@code)
  end

  loop_childs
  quit! if @procs.all?(&:handled?)

  # Periodic wake-up for the main loop; errors are logged and the thread
  # keeps running.
  Thread.new(@pipe) do |pipe|
    loop do
      begin
        sleep 10
        pipe.puts :gc
      rescue StandardError => e
        log "GC thread exception: #{e.inspect}"
      end
    end
  end

  # Loop-invariant: signal names as symbols, computed once instead of on
  # every iteration.
  shutdown_events = (EXIT_SIGNALS + TERM_SIGNALS).map(&:to_sym)

  loop do
    log 'Main loop...'
    case event = @pipe.gets
    when :term
      log '  * Child terminated: try exits gracefully...'
      terminate_all
      loop_childs
      quit!
    when *shutdown_events
      log "  * Catch #{event}: try exits gracefully..."
      terminate_all
      loop_childs
      quit!
    when :child, :gc
      log "  * Main loop event: #{event}"
      loop_childs
      quit! if @procs.all?(&:handled?)
    else
      log "  * Unknown event: #{event.inspect}"
      loop_childs
      quit! if @procs.all?(&:handled?)
    end
  end
end

#log(msg) ⇒ Object



48
49
50
# File 'lib/aggkit/watcher.rb', line 48

# Writes a timestamped informational line to STDOUT while holding the I/O
# mutex.
#
# @param msg [#to_s] text to print after the "[watcher][HH:MM:SS.mmm]: " prefix
# @return [nil]
def log(msg)
  @iolock.synchronize do
    # The timestamp is taken inside the lock, immediately before the write.
    stamp = Time.now.strftime('%H:%M:%S.%L')
    STDOUT.puts("[watcher][#{stamp}]: #{msg}")
  end
end

#quit! ⇒ Object



64
65
66
67
# File 'lib/aggkit/watcher.rb', line 64

# Exits the process immediately (via exit!) with the recorded status.
#
# A missing status counts as 0. If a crash was recorded, the status is raised
# to at least 1 and stored back in @code before exiting.
#
# @return [void] does not return
def quit!
  status = @code || 0

  if @crashed
    status = [status, 1].max
    @code = status
  end

  exit!(status)
end

#set_crash_report(pr) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/aggkit/watcher.rb', line 56

# Records the first crash: marks the watcher as crashed and remembers the
# exit code and command of the given process. Later calls are ignored, so
# only the first report is kept.
#
# @param pr [#exit_code, #command] the process being reported
# @return [Object, nil] pr.command when the report was recorded, nil otherwise
def set_crash_report(pr)
  unless @crashed
    @crashed = true
    @code = pr.exit_code
    @who = pr.command
  end
end

#terminate_all ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/aggkit/watcher.rb', line 69

# Shuts down every process that has not been handled yet.
#
# Works in two passes over the same set of processes: first close each one's
# stdin (best effort) and call #terminate on all of them, then call #stop on
# each and pass it to collect_managed.
#
# @raise [RuntimeError] if called a second time
# @return [Array] the processes that were not yet handled when called
def terminate_all
  raise 'Double termination occured!' if @terminating

  @terminating = true
  pending = @procs.reject(&:handled?)

  pending.each do |child|
    # Closing stdin is best effort: any StandardError is ignored so the
    # terminate call below still happens.
    begin
      child.stdin.close
    rescue StandardError
      nil
    end
    child.terminate
  end

  pending.each do |child|
    child.stop
    collect_managed(child)
  end
end