Class: Fluent::WatchProcessInput
- Inherits:
-
Input
- Object
- Input
- Fluent::WatchProcessInput
- Includes:
- HandleTagNameMixin, Mixin::RewriteTagName, Mixin::TypeConverter
- Defined in:
- lib/fluent/plugin/in_watch_process.rb
Defined Under Namespace
Modules: OS
Constant Summary collapse
- DEFAULT_KEYS =
%w(start_time user pid parent_pid cpu_time cpu_percent memory_percent mem_rss mem_size state proc_name command)
- DEFAULT_TYPES =
"pid:integer,parent_pid:integer,cpu_percent:float,memory_percent:float,mem_rss:integer,mem_size:integer"
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #get_ps_command ⇒ Object
-
#initialize ⇒ WatchProcessInput
constructor
A new instance of WatchProcessInput.
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ WatchProcessInput
Returns a new instance of WatchProcessInput.
24 25 26 27 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 24

# Builds the plugin instance by delegating to the parent constructor, then
# loads the stdlib 'time' extension (which provides Time.parse, used by #run).
def initialize
  super
  require 'time'
end
Instance Method Details
#configure(conf) ⇒ Object
29 30 31 32 33 34 35 36 37 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 29

# Applies the plugin configuration and normalizes the settings used by #run.
#
# After the parent class has processed +conf+:
# * @command falls back to #get_ps_command when it is not set;
# * @keys becomes DEFAULT_KEYS when nil, otherwise the comma-separated value
#   is split into an array with all spaces removed;
# * @lookup_user, when not nil, is split the same way;
# * @interval is converted through Config.time_value.
#
# Finishes by logging the effective settings at info level.
def configure(conf)
  super

  @command ||= get_ps_command

  @keys =
    if @keys.nil?
      DEFAULT_KEYS
    else
      @keys.to_s.gsub(' ', '').split(',')
    end

  unless @lookup_user.nil?
    @lookup_user = @lookup_user.gsub(' ', '').split(',')
  end

  @interval = Config.time_value(@interval)

  log.info "watch_process: polling start. :tag=>#{@tag} :lookup_user=>#{@lookup_user} :interval=>#{@interval} :command=>#{@command}"
end
#get_ps_command ⇒ Object
73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 73

# Picks the default `ps` invocation for the current platform.
#
# Returns the Linux command when OS.linux? is true, the macOS command when
# OS.mac? is true, and nil when neither predicate holds.
def get_ps_command
  return "LANG=en_US.UTF-8 && ps -ewwo lstart,user:20,pid,ppid,time,%cpu,%mem,rss,sz,s,comm,cmd" if OS.linux?

  "LANG=en_US.UTF-8 && ps -ewwo lstart,user,pid,ppid,time,%cpu,%mem,rss,vsz,state,comm,command" if OS.mac?
end
#run ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 47

# Polling loop; #start runs it on a background thread.
#
# Every cycle executes @command, discards its first output line, converts each
# remaining line into a record keyed by @keys and emits it through the router,
# then sleeps for @interval.
#
# A line that starts with a timestamp such as "Mon Jan  1 00:00:00 2024" has
# that timestamp parsed and used as the first value; the rest of the line is
# split on whitespace into the remaining keys, the last key receiving whatever
# is left (so a command line containing spaces stays in one field).
#
# Records are dropped when @lookup_user is set and does not include the
# record's 'user' value.
#
# Any StandardError raised during a cycle is logged and the loop carries on
# with the next cycle, so a single failure does not stop polling for good.
def run
  loop do
    begin
      # Block form closes the pipe even when parsing or emitting raises.
      IO.popen(@command, 'r') do |io|
        io.gets # skip the first line of output
        while (line = io.gets)
          keys_size = @keys.size
          # Reset per line so a line without a timestamp never inherits the
          # start time parsed from the previous line.
          lstart = nil
          matched = line.match(/(?<lstart>(^\w+\s+\w+\s+\d+\s+\d\d:\d\d:\d\d \d+))/)
          if matched
            lstart = Time.parse(matched[:lstart])
            line = line.sub(matched[:lstart], '')
            keys_size -= 1
          end

          values = [lstart.to_s, line.chomp.strip.split(/\s+/, keys_size)]
          data = Hash[@keys.zip(values.reject(&:empty?).flatten)]
          data['elapsed_time'] = (Time.now - Time.parse(data['start_time'])).to_i if data['start_time']
          next unless @lookup_user.nil? || @lookup_user.include?(data['user'])

          emit_tag = tag.dup
          # Read the clock once so filtering and emitting share one timestamp.
          time = Fluent::Engine.now
          filter_record(emit_tag, time, data)
          router.emit(emit_tag, time, data)
        end
      end
    rescue StandardError => e
      log.error "watch_process: error has occured. #{e.message}"
    end
    # Kept outside the rescue so only polling work is guarded.
    sleep @interval
  end
end
#shutdown ⇒ Object
43 44 45 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 43

# Stops the polling thread stored in @thread by killing it.
#
# Does nothing when @thread is not set (for example when shutdown is invoked
# without a preceding #start), instead of raising a TypeError.
def shutdown
  @thread.kill if @thread
end
#start ⇒ Object
39 40 41 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 39

# Launches the polling loop (#run) on a new thread and keeps the thread in
# @thread so that #shutdown can stop it.
def start
  @thread = Thread.new { run }
end