Class: Fluent::WatchProcessInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_watch_process.rb

Defined Under Namespace

Modules: OS

Constant Summary collapse

DEFAULT_KEYS =
%w(start_time user pid parent_pid cpu_time cpu_percent memory_percent mem_rss mem_size state proc_name command)
DEFAULT_TYPES =
%w(pid:integer parent_pid:integer cpu_percent:float memory_percent:float mem_rss:integer mem_size:integer)
Converters =
{
  'string' => lambda { |v| v.to_s },
  'integer' => lambda { |v| v.to_i },
  'float' => lambda { |v| v.to_f },
  'bool' => lambda { |v|
    case v.downcase
    when 'true', 'yes', '1'
      true
    else
      false
    end
  },
  'time' => lambda { |v, time_parser|
    time_parser.parse(v)
  },
  'array' => lambda { |v, delimiter|
    v.to_s.split(delimiter)
  }
}

Instance Method Summary collapse

Constructor Details

#initializeWatchProcessInput

Returns a new instance of WatchProcessInput.



36
37
38
39
# File 'lib/fluent/plugin/in_watch_process.rb', line 36

def initialize
  super
  require 'time'
end

Instance Method Details

#configure(conf) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/in_watch_process.rb', line 41

def configure(conf)
  super

  @command = @command || get_ps_command
  @keys = @keys.nil? ? DEFAULT_KEYS : @keys.to_s.gsub(' ', '').split(',')
  @types = @types || DEFAULT_TYPES
  @types_map = Hash[types.map{|v| v.split(':')}]
  @lookup_user = @lookup_user.gsub(' ', '').split(',') unless @lookup_user.nil?
  @interval = Config.time_value(@interval)
  @hostname = `#{@hostname_command}`.chomp
  $log.info "watch_process: polling start. :tag=>#{@tag} :lookup_user=>#{@lookup_user} :interval=>#{@interval} :command=>#{@command}"
end

#get_placeholderObject



119
120
121
122
123
124
# File 'lib/fluent/plugin/in_watch_process.rb', line 119

def get_placeholder
  return {
    '__HOSTNAME__' => @hostname,
    '${hostname}' => @hostname,
  }
end

#get_ps_commandObject



92
93
94
95
96
97
98
# File 'lib/fluent/plugin/in_watch_process.rb', line 92

def get_ps_command
  if OS.linux?
    "LANG=en_US.UTF-8 && ps -ewwo lstart,user:20,pid,ppid,time,%cpu,%mem,rss,sz,s,comm,cmd"
  elsif OS.mac?
    "LANG=en_US.UTF-8 && ps -ewwo lstart,user,pid,ppid,time,%cpu,%mem,rss,vsz,state,comm,command"
  end
end

#runObject



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/in_watch_process.rb', line 62

def run
  loop do
    io = IO.popen(@command, 'r')
    io.gets
    while result = io.gets
      keys_size = @keys.size
      if result =~ /(?<lstart>(^\w+ \w+ \d+ \d\d:\d\d:\d\d \d+))/
        lstart = Time.parse($~[:lstart])
        values = result.sub($~[:lstart], '')
        keys_size -= 1
      end
      values = values.chomp.strip.split(/\s+/, keys_size)
      data = Hash[
        @keys.zip([lstart.to_s, values].reject(&:empty?).flatten).map do |k,v|
          v = Converters[@types_map[k]].call(v) if @types_map.include?(k)
          [k,v]
        end
      ]
      data['elapsed_time'] = (Time.now - Time.parse(data['start_time'])).to_i if data['start_time']
      next unless @lookup_user.nil? || @lookup_user.include?(data['user'])
      tag = @tag.gsub(/(\${[a-z]+}|__[A-Z]+__)/, get_placeholder)
      Engine.emit(tag, Engine.now, data)
    end
    io.close
    sleep @interval
  end
  rescue StandardError => e
    $log.error "watch_process: error has occured. #{e.message}"
end

#shutdownObject



58
59
60
# File 'lib/fluent/plugin/in_watch_process.rb', line 58

def shutdown
  Thread.kill(@thread)
end

#startObject



54
55
56
# File 'lib/fluent/plugin/in_watch_process.rb', line 54

def start
  @thread = Thread.new(&method(:run))
end