Class: Aggkit::Watcher::ProcessHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/aggkit/watcher/process_handler.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(watcher, *cmd) ⇒ ProcessHandler

Returns a new instance of ProcessHandler.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/aggkit/watcher/process_handler.rb', line 18

def initialize(watcher, *cmd)
  @watcher = watcher

  @options = if cmd.last.is_a? Hash
    cmd.pop
  else
    {}
  end

  @command = cmd.flatten.map{|c| c.to_s.strip }.reject(&:empty?)
  @process = build_process(*command)

  initialize_streams

  @process.start
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(m, *args, &block) ⇒ Object



126
127
128
129
130
131
132
# File 'lib/aggkit/watcher/process_handler.rb', line 126

def method_missing(m, *args, &block)
  if @process.respond_to? m
    @process.send(m, *args, &block)
  else
    super
  end
end

Instance Attribute Details

#commandObject

Returns the value of attribute command.



5
6
7
# File 'lib/aggkit/watcher/process_handler.rb', line 5

def command
  @command
end

#optionsObject

Returns the value of attribute options.



5
6
7
# File 'lib/aggkit/watcher/process_handler.rb', line 5

def options
  @options
end

#processObject

Returns the value of attribute process.



5
6
7
# File 'lib/aggkit/watcher/process_handler.rb', line 5

def process
  @process
end

#stderrObject

Returns the value of attribute stderr.



5
6
7
# File 'lib/aggkit/watcher/process_handler.rb', line 5

def stderr
  @stderr
end

#stdinObject

Returns the value of attribute stdin.



5
6
7
# File 'lib/aggkit/watcher/process_handler.rb', line 5

def stdin
  @stdin
end

#stdoutObject

Returns the value of attribute stdout.



5
6
7
# File 'lib/aggkit/watcher/process_handler.rb', line 5

def stdout
  @stdout
end

#watcherObject

Returns the value of attribute watcher.



5
6
7
# File 'lib/aggkit/watcher/process_handler.rb', line 5

def watcher
  @watcher
end

Class Method Details

.capture(cmd) ⇒ Object



7
8
9
10
11
12
13
14
15
16
# File 'lib/aggkit/watcher/process_handler.rb', line 7

def self.capture(cmd)
  io = IO.popen(cmd)
  output = io.read
  begin
    io.close
  rescue StandardError
    nil
  end
  [output, $?]
end

Instance Method Details

#build_process(*cmd) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/aggkit/watcher/process_handler.rb', line 59

def build_process(*cmd)
  pr = ::Aggkit::ChildProcess.build(*cmd)
  @stdout, wout = IO.pipe
  @stderr, werr = IO.pipe

  @stdin = pr.io.stdin

  pr.io.stdout = wout
  pr.io.stderr = werr
  pr.duplex = true
  pr
end

#handled?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/aggkit/watcher/process_handler.rb', line 72

def handled?
  !!@process.exit_code
end

#initialize_streamsObject



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/aggkit/watcher/process_handler.rb', line 35

def initialize_streams
  @threads = []

  @threads << Thread.new do
    loop do
      break unless synchro_readline(stdout, STDOUT)
    end
  end

  @threads << Thread.new do
    loop do
      break unless synchro_readline(stderr, STDERR)
    end
  end
end

#stop(timeout = (@options[:timeout] || 5)) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/aggkit/watcher/process_handler.rb', line 103

def stop(timeout = (@options[:timeout] || 5))
  return if exited?

  terminate

  begin
    return poll_for_exit(timeout)
  rescue TimeoutError
    # try next
  end

  begin
    @process.send(:send_kill)
  rescue Errno::ECHILD, Errno::ESRCH
    # handle race condition where process dies between timeout
    # and send_kill
  end

  wait
ensure
  clean_all
end

#stop!(status) ⇒ Object



76
77
78
79
# File 'lib/aggkit/watcher/process_handler.rb', line 76

def stop!(status)
  @process.send(:set_exit_code, status)
  stop
end

#synchro_readline(io, out) ⇒ Object



51
52
53
54
55
56
57
# File 'lib/aggkit/watcher/process_handler.rb', line 51

def synchro_readline(io, out)
  str = io.gets
  @watcher.iolock.synchronize{ out.puts(str) }
  true
rescue StandardError
  false
end

#terminateObject



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/aggkit/watcher/process_handler.rb', line 81

def terminate
  return if exited?
  return if @terminating

  @terminating = true

  if @options[:termcmd]
    termcmd = @options[:termcmd].gsub('%PID%', pid.to_s)
    @watcher.log "Terminating by #{termcmd}..."
    output, status = ::Aggkit::Watcher::ProcessHandler.capture(termcmd)

    if status.success?
      @watcher.log "Success: #{output}"
    else
      @watcher.error "Failed: #{output}"
      @process.send(:send_term)
    end
  else
    @process.send(:send_term)
  end
end