Class: Fluent::Plugin::WatchProcessInput

Inherits:
Input
  • Object
show all
Includes:
HandleTagNameMixin, Mixin::RewriteTagName, Mixin::TypeConverter
Defined in:
lib/fluent/plugin/in_watch_process.rb

Defined Under Namespace

Classes: WindowsWatcher

Constant Summary collapse

# Record field names used when no keys are configured on non-Windows hosts.
DEFAULT_KEYS = %w[
  start_time user pid parent_pid
  cpu_time cpu_percent memory_percent
  mem_rss mem_size state proc_name command
]
# Default "field:type" conversion spec, joined into one comma-separated string.
DEFAULT_TYPES = [
  'pid:integer',
  'parent_pid:integer',
  'cpu_percent:float',
  'memory_percent:float',
  'mem_rss:integer',
  'mem_size:integer'
].join(',')

Instance Method Summary collapse

Constructor Details

#initialize ⇒ WatchProcessInput

Returns a new instance of WatchProcessInput.



35
36
37
# File 'lib/fluent/plugin/in_watch_process.rb', line 35

# Builds the plugin instance. This override adds no state of its own; it only
# delegates to the parent class constructor.
def initialize
  super
end

Instance Method Details

#apply_default_types ⇒ Object



58
59
60
61
62
# File 'lib/fluent/plugin/in_watch_process.rb', line 58

# Fills in @types when none has been set, then builds @type_converters from it.
#
# On Windows the default comes from the Windows watcher; elsewhere it is
# DEFAULT_TYPES. Nothing is changed when @types already holds a value, and
# the converters are left alone if the chosen default is nil.
def apply_default_types
  if @types.nil?
    @types =
      if Fluent.windows?
        @windows_watcher.default_types
      else
        DEFAULT_TYPES
      end
    @type_converters = parse_types_parameter unless @types.nil?
  end
end

#configure(conf) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/fluent/plugin/in_watch_process.rb', line 39

# Applies the configuration and resolves every setting left unset.
#
# After the parent class has processed +conf+, this picks the key list
# (from the Windows watcher on Windows, DEFAULT_KEYS otherwise), the polling
# command and the default type conversions, then logs the effective settings.
#
# @param conf the configuration object, passed through to the parent class
def configure(conf)
  super

  if Fluent.windows?
    @windows_watcher = WindowsWatcher.new(@keys, @command, @lookup_user, @powershell_command)
    @keys ||= @windows_watcher.keys
  else
    @keys ||= DEFAULT_KEYS
  end
  @command ||= get_ps_command
  apply_default_types
  log.info("watch_process: polling start. :tag=>#{@tag} :lookup_user=>#{@lookup_user} :interval=>#{@interval} :command=>#{@command}")
end

#get_ps_command ⇒ Object



106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/in_watch_process.rb', line 106

# Chooses the process-listing command for the current platform.
#
# macOS is checked first, then Windows (where the command comes from the
# Windows watcher); every other platform gets the second ps invocation.
#
# @return the command to run
def get_ps_command
  return "LANG=en_US.UTF-8 && ps -ewwo lstart,user,pid,ppid,time,%cpu,%mem,rss,vsz,state,comm,command" if mac?
  return @windows_watcher.command if Fluent.windows?

  "LANG=en_US.UTF-8 && ps -ewwo lstart,user:20,pid,ppid,time,%cpu,%mem,rss,sz,s,comm,cmd"
end

#mac? ⇒ Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/fluent/plugin/in_watch_process.rb', line 116

# Reports whether the Ruby platform string contains "darwin".
#
# @return [Boolean]
def mac?
  RUBY_PLATFORM.include?('darwin')
end

#match_look_up_user?(data) ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
103
104
# File 'lib/fluent/plugin/in_watch_process.rb', line 100

# Decides whether a parsed record passes the lookup-user filter.
#
# Every record passes when @lookup_user is nil; otherwise the record's
# 'user' value must be included in @lookup_user.
#
# @param data [Hash] parsed record; its 'user' entry is checked
# @return [Boolean]
def match_look_up_user?(data)
  @lookup_user.nil? || @lookup_user.include?(data['user'])
end

#on_timer ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/in_watch_process.rb', line 64

# Runs one polling cycle: executes @command, parses its output line by line
# and emits one event per record that passes the lookup-user filter.
#
# The first output line is read and discarded (presumably the column header
# printed by the listing command). On Windows, parsing and filtering are
# delegated to the Windows watcher. Any StandardError raised during the
# cycle is logged instead of propagating to the caller.
def on_timer
  # Block form closes the pipe when the block exits, even on an exception.
  IO.popen(@command, 'r') do |io|
    io.gets
    while (result = io.gets)
      if Fluent.windows?
        data = @windows_watcher.parse_line(result)
        next unless @windows_watcher.match_look_up_user?(data)
      else
        data = parse_line(result)
        next unless match_look_up_user?(data)
      end
      emit_tag = tag.dup
      # Read the clock once so the filter step and the emitted event carry
      # the same timestamp for this record.
      event_time = Fluent::Engine.now
      filter_record(emit_tag, event_time, data)
      router.emit(emit_tag, event_time, data)
    end
  end
rescue StandardError => e
  log.error "watch_process: error has occured. #{e.message}"
end

#parse_line(line) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/in_watch_process.rb', line 87

# Turns one line of process-listing output into a record keyed by @keys.
#
# When the line begins with a five-part timestamp (e.g.
# "Mon Jan 15 10:30:00 2024"), that text is parsed as the start time, removed
# from the line, and placed first among the values. The rest of the line is
# split on whitespace into at most the remaining number of columns, so the
# last column keeps any embedded spaces. If the record has a 'start_time',
# an integer 'elapsed_time' in seconds since that time is added.
#
# @param line [String] one raw output line
# @return [Hash] field name => string value, plus 'elapsed_time' when available
def parse_line(line)
  column_limit = @keys.size
  started_at = nil
  stamp_match = /(?<lstart>(^\w+\s+\w+\s+\d+\s+\d\d:\d\d:\d\d \d+))/.match(line)
  if stamp_match
    stamp_text = stamp_match[:lstart]
    started_at = Time.parse(stamp_text)
    line = line.sub(stamp_text, '')
    # The timestamp fills one key, so one fewer column is left to split.
    column_limit -= 1
  end

  columns = line.chomp.strip.split(/\s+/, column_limit)
  # An absent timestamp (empty string) or an empty column list is dropped
  # before flattening, so the values line up with @keys from the first entry.
  values = [started_at.to_s, columns].reject(&:empty?).flatten
  data = @keys.zip(values).to_h

  start_time = data['start_time']
  data['elapsed_time'] = (Time.now - Time.parse(start_time)).to_i if start_time
  data
end

#shutdown ⇒ Object



54
55
56
# File 'lib/fluent/plugin/in_watch_process.rb', line 54

# Shuts the plugin down. This override adds no behavior of its own; it only
# delegates to the parent class.
def shutdown
  super
end

#start ⇒ Object



49
50
51
52
# File 'lib/fluent/plugin/in_watch_process.rb', line 49

# Starts the plugin: runs the parent class start-up, then registers
# #on_timer with timer_execute under the name :in_watch_process, using
# @interval as the interval argument.
def start
  super
  poll = method(:on_timer)
  timer_execute(:in_watch_process, @interval, &poll)
end