Module: Fluent::PluginHelper::ChildProcess
Defined Under Namespace
Classes: ProcessInfo
Constant Summary collapse
- CHILD_PROCESS_LOOP_CHECK_INTERVAL =
sec
0.2
- CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT =
sec
10
- CHILD_PROCESS_DEFAULT_KILL_TIMEOUT =
sec
60
- MODE_PARAMS =
[:read, :write, :stderr, :read_with_stderr]
- STDERR_OPTIONS =
[:discard, :connect]
Constants included from EventLoop
EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT
Constants included from Thread
Thread::THREAD_DEFAULT_WAIT_SECONDS
Instance Attribute Summary collapse
-
#_child_process_processes ⇒ Object
readonly
Lifecycle of this helper — stop: mark callback threads as stopped; shutdown: close write IO to child processes (STDIN of child processes) and send TERM (KILL on Windows) to all child processes; close: send KILL to all child processes; terminate: [-].
Attributes included from Timer
Attributes included from EventLoop
Attributes included from Thread
Instance Method Summary collapse
- #child_process_execute(title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, &block) ⇒ Object
- #child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &block) ⇒ Object
- #child_process_exit_status ⇒ Object
- #child_process_id ⇒ Object
- #child_process_kill(process_info, force: false) ⇒ Object
- #child_process_running? ⇒ Boolean
- #close ⇒ Object
- #initialize ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Methods included from Timer
#terminate, #timer_execute, #timer_running?
Methods included from EventLoop
#event_loop_attach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop, #terminate
Methods included from Thread
#terminate, #thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop
Instance Attribute Details
#_child_process_processes ⇒ Object (readonly)
Lifecycle of this helper — stop: mark callback threads as stopped; shutdown: close write IO to child processes (STDIN of child processes) and send TERM (KILL on Windows) to all child processes; close: send KILL to all child processes; terminate: [-].
41 42 43 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 41
#
# Read-only accessor for the registry of child processes tracked by this
# helper. The registry is the Hash assigned in #start and is keyed by pid,
# with a ProcessInfo record as each value.
#
# Lifecycle of the helper:
#   stop      : mark callback thread as stopped
#   shutdown  : close write IO to child processes (STDIN of child processes),
#               send TERM (KILL for Windows) to all child processes
#   close     : send KILL to all child processes
#   terminate : [-]
#
# @return [Hash, nil] the registry, or nil when it has not been assigned yet
def _child_process_processes
  @_child_process_processes
end
Instance Method Details
#child_process_execute(title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, &block) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 56
#
# Validates the request and runs +command+ as a child process: once,
# periodically on a timer, or both. The given block receives one I/O object
# per entry of +mode+, in the same order.
#
# @param title [Symbol] identifier of this execution (must be a Symbol)
# @param command [String] command handed to #child_process_execute_once
# @param arguments [Array, nil] command arguments; required whenever
#   +subprocess_name+ is given
# @param subprocess_name [String, nil] replacement process name
# @param interval [Numeric, nil] when set, the command is re-run through
#   timer_execute with repeat: true
# @param immediate [Boolean] also run once right away when +interval+ is set
# @param parallel [Boolean] when false, a timer tick is skipped (with a
#   warning) while the previous callback is still running
# @param mode [Array<Symbol>] subset of MODE_PARAMS; :read_with_stderr cannot
#   be combined with :read or :stderr
# @param stderr [Symbol] one of STDERR_OPTIONS
# @param env, unsetenv, chdir, internal_encoding, external_encoding, scrub,
#   replace_string forwarded unchanged to #child_process_execute_once
# @yield [*io] the I/O objects selected by +mode+
# @raise [ArgumentError] when any of the checks below fails
# @return [Object, nil] the result of timer_execute when +interval+ is set,
#   otherwise nil
def child_process_execute(
    title, command,
    arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false,
    mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil,
    internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil,
    &block
)
  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
  raise ArgumentError, "BUG: arguments required if subprocess name is replaced" if subprocess_name && !arguments

  raise ArgumentError, "BUG: invalid mode specification" unless mode.all?{|m| MODE_PARAMS.include?(m) }
  raise ArgumentError, "BUG: read_with_stderr is exclusive with :read and :stderr" if mode.include?(:read_with_stderr) && (mode.include?(:read) || mode.include?(:stderr))
  raise ArgumentError, "BUG: invalid stderr handling specification" unless STDERR_OPTIONS.include?(stderr)

  raise ArgumentError, "BUG: block not specified which receive i/o object" unless block_given?
  raise ArgumentError, "BUG: number of block arguments are different from size of mode" unless block.arity == mode.size

  # `running` is shared by every invocation of the wrapper below; the timer
  # branch reads it to decide whether a tick has to be skipped.
  running = false
  callback = ->(*args) {
    running = true
    begin
      block.call(*args)
    ensure
      running = false
    end
  }

  # Without an interval the command is run exactly once, right now.
  if immediate || !interval
    child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &callback)
  end

  if interval
    timer_execute(:child_process_execute, interval, repeat: true) do
      if !parallel && running
        log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel
      else
        child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &callback)
      end
    end
  end
end
#child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &block) ⇒ Object
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 195
#
# Spawns +command+ once, registers it in @_child_process_processes and runs
# the given block in a helper thread with the I/O objects selected by +mode+.
#
# How the child is opened:
# * neither :stderr nor :read_with_stderr in +mode+ and stderr is not
#   :discard -> Open3.popen2
# * :read_with_stderr in +mode+ -> Open3.popen2e
# * otherwise -> Open3.popen3; the stderr pipe is reopened onto IO::NULL
#   unless :stderr is part of +mode+
#
# @param title [Symbol] identifier used in log records and in ProcessInfo
# @param command [String] program to run
# @param arguments [Array, nil] arguments appended to the spawn call
# @param subprocess_name [String, nil] when set, spawn receives
#   [command, subprocess_name] as the program
# @param mode [Array<Symbol>] I/O objects to hand to the block, in order
# @param stderr [Symbol] :discard or :connect
# @param env [Hash] environment passed as the first spawn argument
# @param unsetenv [Boolean] value of the :unsetenv_others spawn option
# @param chdir [String, nil] value of the :chdir spawn option when given
# @param internal_encoding [String] internal encoding for the I/O objects in use
# @param external_encoding [String] external encoding for the I/O objects in use
# @param scrub [Boolean] replace invalid/undefined characters when true
# @param replace_string [String, nil] replacement used when scrubbing
# @yield [*io] the selected I/O objects, inside the helper thread
# @return [Integer] pid of the spawned process
def child_process_execute_once(
    title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir,
    internal_encoding, external_encoding, scrub, replace_string, &block
)
  spawn_args = if arguments || subprocess_name
                 [ env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || []) ]
               else
                 [ env, command ]
               end
  spawn_opts = {
    unsetenv_others: unsetenv,
  }
  if chdir
    spawn_opts[:chdir] = chdir
  end

  encoding_options = {}
  if scrub
    encoding_options[:invalid] = encoding_options[:undef] = :replace
    if replace_string
      encoding_options[:replace] = replace_string
    end
  end

  log.debug "Executing command", title: title, spawn: spawn_args, mode: mode, stderr: stderr

  readio = writeio = stderrio = wait_thread = nil
  readio_in_use = writeio_in_use = stderrio_in_use = false

  if !mode.include?(:stderr) && !mode.include?(:read_with_stderr) && stderr != :discard # connect
    writeio, readio, wait_thread = *Open3.popen2(*spawn_args, spawn_opts)
  elsif mode.include?(:read_with_stderr)
    writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts)
  else
    writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts)
    if !mode.include?(:stderr) # stderr == :discard
      stderrio.reopen(IO::NULL)
    end
  end

  # The encoding options are passed as keywords: IO#set_encoding takes at
  # most two positional arguments, so a positional Hash is rejected on Ruby 3.
  if mode.include?(:write)
    writeio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    writeio_in_use = true
  end
  if mode.include?(:read) || mode.include?(:read_with_stderr)
    readio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    readio_in_use = true
  end
  if mode.include?(:stderr)
    stderrio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    stderrio_in_use = true
  end

  pid = wait_thread.pid # wait_thread => Process::Waiter

  io_objects = mode.map do |m|
    case m
    when :read then readio
    when :write then writeio
    when :read_with_stderr then readio
    when :stderr then stderrio
    else
      raise "BUG: invalid mode must be checked before here: '#{m}'"
    end
  end

  # The gate is held by this thread until the thread-locals and the
  # ProcessInfo record are in place, so the callback cannot start early.
  start_gate = Mutex.new
  start_gate.lock
  thread = thread_create :child_process_callback do
    start_gate.lock # run after plugin thread get pid, thread instance and i/o
    start_gate.unlock
    begin
      block.call(*io_objects)
    rescue EOFError
      log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments
    rescue IOError => e
      if e.message == 'stream closed'
        log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments
      else
        log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e
      end
    rescue => e
      log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e
    end
    # The callback is done: unregister the process and, if the helper is
    # still marked as running, force-kill whatever is left of it.
    process_info = @_child_process_mutex.synchronize do
      process_info = @_child_process_processes[pid]
      @_child_process_processes.delete(pid)
      process_info
    end
    child_process_kill(process_info, force: true) if process_info && process_info.alive && ::Thread.current[:_fluentd_plugin_helper_child_process_running]
  end
  thread[:_fluentd_plugin_helper_child_process_running] = true
  thread[:_fluentd_plugin_helper_child_process_pid] = pid
  pinfo = ProcessInfo.new(title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, true, nil)

  @_child_process_mutex.synchronize do
    @_child_process_processes[pid] = pinfo
  end
  start_gate.unlock
  pid
end
#child_process_exit_status ⇒ Object
52 53 54 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 52
#
# Exit status stored on the *current* thread under
# :_fluentd_plugin_helper_child_process_exit_status (written by
# #child_process_kill once the child has been reaped).
#
# @return [Object, nil] the stored status, or nil when none has been recorded
#   on the calling thread
def child_process_exit_status
  ::Thread.current[:_fluentd_plugin_helper_child_process_exit_status]
end
#child_process_id ⇒ Object
48 49 50 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 48
#
# Pid stored on the *current* thread under
# :_fluentd_plugin_helper_child_process_pid (set on the callback thread by
# #child_process_execute_once).
#
# @return [Integer, nil] the pid, or nil when the calling thread has none
def child_process_id
  ::Thread.current[:_fluentd_plugin_helper_child_process_pid]
end
#child_process_kill(process_info, force: false) ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 160
#
# Reaps the child if it has already exited, otherwise signals it.
#
# Without +force+ the time of the request is recorded in
# process_info.killed_at and the child receives TERM (KILL on Windows).
# With +force+ the child receives KILL and is marked as not alive right away.
#
# @param process_info [ProcessInfo, nil] record of the child; nil or a record
#   that is no longer alive makes this a no-op
# @param force [Boolean] send KILL instead of TERM
# @return [Object, nil] no meaningful value
def child_process_kill(process_info, force: false)
  if !process_info || !process_info.alive
    return
  end

  process_info.killed_at = Time.now unless force

  # Non-blocking reap: if the child is already gone, keep its exit status on
  # the callback thread and stop here.
  begin
    pid, status = Process.waitpid2(process_info.pid, Process::WNOHANG)
    if pid && status
      process_info.thread[:_fluentd_plugin_helper_child_process_exit_status] = status
      process_info.alive = false
    end
  rescue Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    process_info.alive = false
  rescue
    # ignore
  end
  if !process_info.alive
    return
  end

  begin
    signal = (Fluent.windows? || force) ? :KILL : :TERM
    Process.kill(signal, process_info.pid)
    if force
      process_info.alive = false
    end
  rescue Errno::ECHILD, Errno::ESRCH
    process_info.alive = false
  end
end
#child_process_running? ⇒ Boolean
43 44 45 46 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 43
#
# Checker for code in callback of child_process_execute: tells whether the
# *current* thread is still flagged as running via
# :_fluentd_plugin_helper_child_process_running (the flag is cleared by #stop).
#
# @return [Object] the stored flag, or false when the calling thread has none
def child_process_running?
  ::Thread.current[:_fluentd_plugin_helper_child_process_running] || false
end
#close ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 138
#
# Waits until every tracked child process is gone, force-killing those that
# are still alive once @_child_process_kill_timeout seconds have passed since
# their killed_at timestamp. The registry is polled every
# CHILD_PROCESS_LOOP_CHECK_INTERVAL seconds until it is empty, then control
# is handed to the next #close in the ancestor chain.
#
# @return [Object] whatever super returns
def close
  while (pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys }).size > 0
    pids.each do |pid|
      process_info = @_child_process_processes[pid]
      if !process_info || !process_info.alive
        @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) }
        next
      end

      process_info.killed_at ||= Time.now # for irregular case (e.g., created after shutdown)
      # Give the process its grace period before escalating to KILL.
      next if Time.now < process_info.killed_at + @_child_process_kill_timeout

      child_process_kill(process_info, force: true)
      @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) }
    end

    sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
  end

  super
end
#initialize ⇒ Object
98 99 100 101 102 103 104 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 98
#
# Sets up the helper's state after running the ancestors' initializers:
# the two timeouts (seeded from the CHILD_PROCESS_DEFAULT_* constants) and
# the mutex guarding the process registry.
def initialize
  super
  # plugins MAY configure this parameter
  @_child_process_exit_timeout = CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT
  @_child_process_kill_timeout = CHILD_PROCESS_DEFAULT_KILL_TIMEOUT
  @_child_process_mutex = Mutex.new
end
#shutdown ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 120
#
# Closes the write IO (the child's STDIN) of every tracked child process and
# then asks each of them to terminate through #child_process_kill (TERM, or
# KILL on Windows).
#
# Every tracked process is handled, including those whose write IO was never
# handed to the callback: skipping them would leave their STDIN open and
# leave them unsignaled until #close force-kills them after the kill timeout.
#
# @return [Object] whatever super returns
def shutdown
  pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys }
  pids.each do |pid|
    process_info = @_child_process_processes[pid]
    next unless process_info

    writeio = process_info.writeio
    if writeio && !writeio.closed?
      begin
        # Closing may block; give up after the configured exit timeout.
        Timeout.timeout(@_child_process_exit_timeout) do
          writeio.close
        end
      rescue Timeout::Error
        log.debug "External process #{process_info.title} doesn't exist after STDIN close in timeout #{@_child_process_exit_timeout}sec"
      end
    end

    child_process_kill(process_info)
  end

  super
end
#start ⇒ Object
106 107 108 109 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 106
#
# Runs the ancestors' #start and then creates an empty process registry.
#
# @return [Hash] the freshly created registry
def start
  super
  @_child_process_processes = {} # pid => ProcessInfo
end
#stop ⇒ Object
111 112 113 114 115 116 117 118 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 111
#
# Marks the callback thread of every tracked child process as stopped by
# clearing its :_fluentd_plugin_helper_child_process_running flag, then hands
# over to the next #stop in the ancestor chain.
#
# Like #initialize, #start, #shutdown and #close, this hook calls super so
# that the #stop implementations of the other mixed-in helpers and of the
# including class still run.
#
# @return [Object] whatever super returns
def stop
  pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys }
  pids.each do |pid|
    process_info = @_child_process_processes[pid]
    if process_info
      process_info.thread[:_fluentd_plugin_helper_child_process_running] = false
    end
  end

  super
end