Class: Fluent::Plugin::WatchObjectspaceInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_watch_objectspace.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/in_watch_objectspace.rb', line 40

# Finishes plugin configuration after the parent class has processed +conf+.
#
# Requires every library listed in @modules (skipped when @modules is unset),
# sets @warmup_time to the current time plus @watch_delay, resets @source to
# an empty hash and enables GC::Profiler.
#
# @param conf the configuration object, passed unchanged to the parent implementation
def configure(conf)
  super(conf)
  (@modules || []).each { |library| require library }
  @warmup_time = Time.now + @watch_delay
  @source = {}
  GC::Profiler.enable
end

#parse_top_result(content) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/plugin/in_watch_objectspace.rb', line 57

# Parses the batch-mode output of `top` into a record keyed by lower-cased
# column name.
#
# The last two non-blank lines of +content+ are taken as the column header row
# and the process row. USER, S, TIME+ and COMMAND stay strings; PID, PR and NI
# are converted with to_i; VIRT, RES and SHR become integers with any unit
# suffix expanded; every other column is converted with to_f. A column with no
# matching value yields nil, 0 or 0.0 according to its conversion.
#
# @param content [String] text captured from `top`
# @return [Hash] one entry per header column (String, Integer or Float values)
# @raise [ArgumentError] when +content+ has fewer than two non-blank lines
def parse_top_result(content)
  # Split once, and ignore blank lines so trailing whitespace cannot be
  # mistaken for the process row.
  rows = content.to_s.split("\n").reject { |line| line.strip.empty? }
  if rows.size < 2
    raise ArgumentError, "unexpected top output: expected a header row and a process row"
  end

  fields = rows[-2].split
  values = rows[-1].split
  fields.each_with_index.each_with_object({}) do |(field, index), record|
    value = values[index]
    record[field.downcase] =
      case field
      when "USER", "S", "TIME+", "COMMAND" then value
      when "PID", "PR", "NI" then value.to_i
      when "VIRT", "RES", "SHR" then parse_top_memory_field(value)
      else value.to_f
      end
  end
end

# Converts one memory column printed by `top` into an integer.
#
# A plain number is converted with to_i. A number followed by one of the
# suffixes k, m, g, t, p or e (any case) is multiplied by the matching power of
# 1024 and rounded, so "1.5g" becomes 1572864 instead of the 1 that to_i gives.
# NOTE(review): this assumes the unsuffixed unit is KiB, which is top's default
# for the task area - confirm if top is run with a different memory scale.
#
# @param value [String, nil] raw column text
# @return [Integer] the value expressed in the unsuffixed unit
private def parse_top_memory_field(value)
  matched = /\A(\d+(?:\.\d+)?)([kmgtpe])\z/i.match(value.to_s)
  return value.to_i unless matched

  (matched[1].to_f * (1024**"kmgtpe".index(matched[2].downcase))).round
end

#refresh_watchers ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_watch_objectspace.rb', line 74

# Takes one sample of this process and emits it as a single event.
#
# Does nothing until @warmup_time has passed. Otherwise it builds a record from
# the process id, the parsed `top` row for this process, optionally
# GC::Profiler.raw_data (when @gc_raw_data is set) and a live-object count for
# every class name in @watch_class. The first successful sample is stored in
# @source as the baseline. When the sample's "res" exceeds the baseline "res"
# multiplied by @res_incremental_threshold_rate, the record is flagged with
# "memory_leaks" => true and an error is logged. The record is emitted under
# @tag in either case, so the flag is visible downstream.
#
# Any StandardError raised while sampling or emitting is logged through $log
# and swallowed, so one failed sample does not propagate to the caller.
def refresh_watchers
  return if Time.now < @warmup_time

  pid = Process.pid
  record = {
    "pid" => pid,
    "count" => {},
    "memory_leaks" => false
  }

  begin
    # The argument-array form runs top directly, with no shell in between.
    content = IO.popen(["top", "-p", pid.to_s, "-b", "-n", "1"], &:read)
    record.merge!(parse_top_result(content))

    record["raw_data"] = GC::Profiler.raw_data if @gc_raw_data
    @watch_class.each do |klass|
      # each_object with a block returns the number of objects it visited.
      record["count"][klass.to_s.downcase] = ObjectSpace.each_object(Object.const_get(klass)) { |obj| obj }
    end

    # The first successful sample becomes the baseline for later comparisons.
    @source = record if @source.empty?

    if @source["res"] * @res_incremental_threshold_rate < record["res"]
      record["memory_leaks"] = true
      # Log instead of raising: raising here skipped the emit below, so a
      # record flagged with "memory_leaks" => true could never be emitted.
      $log.error(sprintf("Memory leak is detected, threshold rate <%f>: %f > %f * %f",
                         @res_incremental_threshold_rate, record["res"],
                         @source["res"], @res_incremental_threshold_rate))
    end
    es = OneEventStream.new(Fluent::EventTime.now, record)
    router.emit_stream(@tag, es)
  rescue => e
    $log.error(e.message)
  end
end

#shutdown ⇒ Object



115
116
# File 'lib/fluent/plugin/in_watch_objectspace.rb', line 115

# Shuts the plugin down by delegating to the parent class.
#
# This plugin has no teardown of its own. The parent implementation is still
# invoked because #start calls super and registers its timer through the
# parent's timer_execute helper, so the parent's shutdown handling has to run
# as well.
def shutdown
  super
end

#start ⇒ Object



52
53
54
55
# File 'lib/fluent/plugin/in_watch_objectspace.rb', line 52

# Starts the plugin: runs the parent's start, then registers a timer named
# :execute_watch_objectspace that invokes #refresh_watchers, passing
# @watch_interval as the timer interval.
def start
  super
  sampler = method(:refresh_watchers)
  timer_execute(:execute_watch_objectspace, @watch_interval, &sampler)
end