Class: Aggkit::Watcher::ProcessHandler
- Inherits:
-
Object
- Object
- Aggkit::Watcher::ProcessHandler
- Defined in:
- lib/aggkit/watcher/process_handler.rb
Instance Attribute Summary collapse
-
#command ⇒ Object
Returns the value of attribute command.
-
#options ⇒ Object
Returns the value of attribute options.
-
#process ⇒ Object
Returns the value of attribute process.
-
#stderr ⇒ Object
Returns the value of attribute stderr.
-
#stdin ⇒ Object
Returns the value of attribute stdin.
-
#stdout ⇒ Object
Returns the value of attribute stdout.
-
#watcher ⇒ Object
Returns the value of attribute watcher.
Class Method Summary collapse
Instance Method Summary collapse
- #build_process(*cmd) ⇒ Object
- #handled? ⇒ Boolean
-
#initialize(watcher, *cmd) ⇒ ProcessHandler
constructor
A new instance of ProcessHandler.
- #initialize_streams ⇒ Object
- #method_missing(m, *args, &block) ⇒ Object
- #stop(timeout = (@options[:timeout] || 5)) ⇒ Object
- #stop!(status) ⇒ Object
- #synchro_readline(io, out) ⇒ Object
- #terminate ⇒ Object
Constructor Details
#initialize(watcher, *cmd) ⇒ ProcessHandler
Returns a new instance of ProcessHandler.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/aggkit/watcher/process_handler.rb', line 18 def initialize(watcher, *cmd) @watcher = watcher @options = if cmd.last.is_a? Hash cmd.pop else {} end @command = cmd.flatten.map{|c| c.to_s.strip }.reject(&:empty?) @process = build_process(*command) initialize_streams @process.start end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(m, *args, &block) ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/aggkit/watcher/process_handler.rb', line 126 def method_missing(m, *args, &block) if @process.respond_to? m @process.send(m, *args, &block) else super end end |
Instance Attribute Details
#command ⇒ Object
Returns the value of attribute command.
5 6 7 |
# File 'lib/aggkit/watcher/process_handler.rb', line 5 def command @command end |
#options ⇒ Object
Returns the value of attribute options.
5 6 7 |
# File 'lib/aggkit/watcher/process_handler.rb', line 5 def @options end |
#process ⇒ Object
Returns the value of attribute process.
5 6 7 |
# File 'lib/aggkit/watcher/process_handler.rb', line 5 def process @process end |
#stderr ⇒ Object
Returns the value of attribute stderr.
5 6 7 |
# File 'lib/aggkit/watcher/process_handler.rb', line 5 def stderr @stderr end |
#stdin ⇒ Object
Returns the value of attribute stdin.
5 6 7 |
# File 'lib/aggkit/watcher/process_handler.rb', line 5 def stdin @stdin end |
#stdout ⇒ Object
Returns the value of attribute stdout.
5 6 7 |
# File 'lib/aggkit/watcher/process_handler.rb', line 5 def stdout @stdout end |
#watcher ⇒ Object
Returns the value of attribute watcher.
5 6 7 |
# File 'lib/aggkit/watcher/process_handler.rb', line 5 def watcher @watcher end |
Class Method Details
.capture(cmd) ⇒ Object
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/aggkit/watcher/process_handler.rb', line 7 def self.capture(cmd) io = IO.popen(cmd) output = io.read begin io.close rescue StandardError nil end [output, $?] end |
Instance Method Details
#build_process(*cmd) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/aggkit/watcher/process_handler.rb', line 59 def build_process(*cmd) pr = ::Aggkit::ChildProcess.build(*cmd) @stdout, wout = IO.pipe @stderr, werr = IO.pipe @stdin = pr.io.stdin pr.io.stdout = wout pr.io.stderr = werr pr.duplex = true pr end |
#handled? ⇒ Boolean
72 73 74 |
# File 'lib/aggkit/watcher/process_handler.rb', line 72 def handled? !!@process.exit_code end |
#initialize_streams ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/aggkit/watcher/process_handler.rb', line 35 def initialize_streams @threads = [] @threads << Thread.new do loop do break unless synchro_readline(stdout, STDOUT) end end @threads << Thread.new do loop do break unless synchro_readline(stderr, STDERR) end end end |
#stop(timeout = (@options[:timeout] || 5)) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/aggkit/watcher/process_handler.rb', line 103 def stop(timeout = (@options[:timeout] || 5)) return if exited? terminate begin return poll_for_exit(timeout) rescue TimeoutError # try next end begin @process.send(:send_kill) rescue Errno::ECHILD, Errno::ESRCH # handle race condition where process dies between timeout # and send_kill end wait ensure clean_all end |
#stop!(status) ⇒ Object
76 77 78 79 |
# File 'lib/aggkit/watcher/process_handler.rb', line 76 def stop!(status) @process.send(:set_exit_code, status) stop end |
#synchro_readline(io, out) ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/aggkit/watcher/process_handler.rb', line 51 def synchro_readline(io, out) str = io.gets @watcher.iolock.synchronize{ out.puts(str) } true rescue StandardError false end |
#terminate ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/aggkit/watcher/process_handler.rb', line 81 def terminate return if exited? return if @terminating @terminating = true if @options[:termcmd] termcmd = @options[:termcmd].gsub('%PID%', pid.to_s) @watcher.log "Terminating by #{termcmd}..." output, status = ::Aggkit::Watcher::ProcessHandler.capture(termcmd) if status.success? @watcher.log "Success: #{output}" else @watcher.error "Failed: #{output}" @process.send(:send_term) end else @process.send(:send_term) end end |