Class: Fluent::WatchProcessInput
- Inherits:
-
Input
- Object
- Input
- Fluent::WatchProcessInput
- Defined in:
- lib/fluent/plugin/in_watch_process.rb
Defined Under Namespace
Modules: OS
Constant Summary collapse
- DEFAULT_KEYS =
%w(start_time user pid parent_pid cpu_time cpu_percent memory_percent mem_rss mem_size state proc_name command)
- DEFAULT_TYPES =
%w(pid:integer parent_pid:integer cpu_percent:float memory_percent:float mem_rss:integer mem_size:integer)
- Converters =
{ 'string' => lambda { |v| v.to_s }, 'integer' => lambda { |v| v.to_i }, 'float' => lambda { |v| v.to_f }, 'bool' => lambda { |v| case v.downcase when 'true', 'yes', '1' true else false end }, 'time' => lambda { |v, time_parser| time_parser.parse(v) }, 'array' => lambda { |v, delimiter| v.to_s.split(delimiter) } }
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #get_placeholder ⇒ Object
- #get_ps_command ⇒ Object
-
#initialize ⇒ WatchProcessInput
constructor
A new instance of WatchProcessInput.
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ WatchProcessInput
Returns a new instance of WatchProcessInput.
36 37 38 39 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 36 def initialize super require 'time' end |
Instance Method Details
#configure(conf) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 41
# Applies defaults to the settings read from +conf+ and normalizes them into
# the instance variables used while polling.
#
# conf - the configuration object handed to the parent implementation.
def configure(conf)
  super

  # Use the platform-specific ps invocation unless a command was configured.
  @command ||= get_ps_command

  # Keys arrive as a comma-separated string; blanks around names are dropped.
  @keys =
    if @keys.nil?
      DEFAULT_KEYS
    else
      @keys.to_s.gsub(' ', '').split(',')
    end

  # Each "name:type" entry becomes a name => type pair.
  @types ||= DEFAULT_TYPES
  type_pairs = types.map { |entry| entry.split(':') }
  @types_map = Hash[type_pairs]

  unless @lookup_user.nil?
    @lookup_user = @lookup_user.gsub(' ', '').split(',')
  end

  @interval = Config.time_value(@interval)
  # The hostname is obtained once, by running the configured shell command.
  @hostname = %x(#{@hostname_command}).chomp

  $log.info "watch_process: polling start. :tag=>#{@tag} :lookup_user=>#{@lookup_user} :interval=>#{@interval} :command=>#{@command}"
end
#get_placeholder ⇒ Object
119 120 121 122 123 124 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 119
# Returns the mapping from tag placeholders to their replacement text.
# Both supported spellings, '__HOSTNAME__' and '${hostname}', resolve to
# the value of @hostname.
def get_placeholder
  {
    '__HOSTNAME__' => @hostname,
    '${hostname}' => @hostname,
  }
end
#get_ps_command ⇒ Object
92 93 94 95 96 97 98 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 92
# Returns the default ps command line for the current platform.
# Linux and macOS each get their own column list; on any other platform
# the result is nil.
def get_ps_command
  return "LANG=en_US.UTF-8 && ps -ewwo lstart,user:20,pid,ppid,time,%cpu,%mem,rss,sz,s,comm,cmd" if OS.linux?

  "LANG=en_US.UTF-8 && ps -ewwo lstart,user,pid,ppid,time,%cpu,%mem,rss,vsz,state,comm,command" if OS.mac?
end
#run ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 62
# Polling loop: runs @command, emits one event per output line, then sleeps
# for @interval before the next cycle.
#
# An error raised during a cycle is logged and polling continues with the
# next cycle, so one failing cycle does not stop the loop for good.
def run
  loop do
    begin
      poll_processes
    rescue StandardError => e
      $log.error "watch_process: error has occured. #{e.message}"
    end
    sleep @interval
  end
end

# Runs @command once and emits an event for every parsed line whose 'user'
# value passes the @lookup_user filter (no filtering when it is nil).
def poll_processes
  # The tag does not depend on the line being read, so resolve it once per cycle.
  tag = @tag.gsub(/(\${[a-z]+}|__[A-Z]+__)/, get_placeholder)

  # The block form closes the pipe even when parsing a line raises.
  IO.popen(@command, 'r') do |io|
    io.gets # the first line of output is discarded (the ps column header)
    while (line = io.gets)
      # A blank line carries no columns to parse.
      next if line.strip.empty?

      data = parse_ps_line(line)
      next unless @lookup_user.nil? || @lookup_user.include?(data['user'])

      Engine.emit(tag, Engine.now, data)
    end
  end
end

# Converts one line of command output into a Hash keyed by @keys, applying
# the converters named in @types_map. Adds 'elapsed_time' (whole seconds
# since 'start_time') when a 'start_time' value is present.
#
# line - one raw output line, including its trailing newline.
def parse_ps_line(line)
  keys_size = @keys.size
  lstart = nil
  rest = line

  # A leading ctime-style timestamp ("Tue Mar  5 10:11:12 2024") contains
  # blanks and pads single-digit days with an extra space, so it is matched
  # as one unit with \s+ and removed before the other columns are split.
  # Lines without such a timestamp are split as they are.
  if (m = /(?<lstart>(^\w+\s+\w+\s+\d+\s+\d\d:\d\d:\d\d\s+\d+))/.match(line))
    lstart = Time.parse(m[:lstart])
    rest = line.sub(m[:lstart], '')
    keys_size -= 1
  end

  # The limit keeps blanks inside the last column (the command line) intact.
  values = rest.chomp.strip.split(/\s+/, keys_size)
  values.unshift(lstart.to_s) if lstart

  data = Hash[
    @keys.zip(values).map do |k, v|
      v = Converters[@types_map[k]].call(v) if @types_map.include?(k)
      [k, v]
    end
  ]
  data['elapsed_time'] = (Time.now - Time.parse(data['start_time'])).to_i if data['start_time']
  data
end

private :poll_processes, :parse_ps_line
#shutdown ⇒ Object
58 59 60 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 58
# Stops the polling thread created by #start and waits for it to finish.
# Does nothing when no thread has been started, so calling it before #start
# no longer raises.
def shutdown
  return unless @thread

  @thread.kill
  # Wait until the thread has actually terminated before returning.
  @thread.join
end
#start ⇒ Object
54 55 56 |
# File 'lib/fluent/plugin/in_watch_process.rb', line 54
# Launches the polling loop (#run) on a new thread and keeps the thread in
# @thread so that #shutdown can stop it later.
def start
  @thread = Thread.new { run }
end