Class: Fluent::Plugin::PrometheusTailMonitorInput

Inherits:
Input
  • Object
show all
Includes:
PrometheusLabelParser
Defined in:
lib/fluent/plugin/in_prometheus_tail_monitor.rb

Constant Summary collapse

# Names of the plugin instance variables that update_monitor_info asks the
# monitor agent to expose (passed as the :ivars option). Frozen so the list,
# which is shared by every call, cannot be mutated by accident.
MONITOR_IVARS = [
  :tails,
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from PrometheusLabelParser

#parse_labels_elements

Constructor Details

#initialize ⇒ PrometheusTailMonitorInput

Returns a new instance of PrometheusTailMonitorInput.



19
20
21
22
# File 'lib/fluent/plugin/in_prometheus_tail_monitor.rb', line 19

# Runs the parent initializer, then stores the registry returned by
# ::Prometheus::Client.registry in @registry. get_gauge uses @registry to
# look up and register gauges.
def initialize
  super
  @registry = ::Prometheus::Client.registry
end

Instance Attribute Details

#registry ⇒ Object (readonly)

Returns the value of attribute registry.



13
14
15
# File 'lib/fluent/plugin/in_prometheus_tail_monitor.rb', line 13

# Reader for the registry object assigned to @registry in initialize.
#
# @return [Object] the value of @registry
def registry
  @registry
end

Instance Method Details

#configure(conf) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/in_prometheus_tail_monitor.rb', line 28

# Configures the plugin: parses the label elements from +conf+, expands the
# 'hostname' and 'worker_id' placeholders in every label value, and creates
# the monitor agent that update_monitor_info queries.
#
# @param conf [Object] plugin configuration, handed to super and to
#   parse_labels_elements
# @raise [Fluent::ConfigError] when a parsed label value is not a String
def configure(conf)
  super

  local_hostname = Socket.gethostname
  builder = Fluent::Plugin::Prometheus.placeholder_expander(log)
  expander = builder.build({ 'hostname' => local_hostname, 'worker_id' => fluentd_worker_id })

  @base_labels = parse_labels_elements(conf)
  @base_labels.each_pair do |label_name, raw_value|
    if raw_value.is_a?(String)
      # Overwrite the value in place with its placeholder-expanded form.
      @base_labels[label_name] = expander.expand(raw_value)
    else
      # Non-String values are rejected with the message below.
      raise Fluent::ConfigError, "record accessor syntax is not available in prometheus_tail_monitor"
    end
  end

  @monitor_agent = Fluent::Plugin::MonitorAgentInput.new
end

#get_gauge(name, docstring) ⇒ Object



109
110
111
112
113
114
115
# File 'lib/fluent/plugin/in_prometheus_tail_monitor.rb', line 109

# Returns the gauge registered under +name+, registering it first when the
# registry does not have it yet.
#
# @param name [Object] metric name (start passes symbols)
# @param docstring [String] help text, used only when the gauge is created
# @return [Object] whatever the registry returns from get or gauge
def get_gauge(name, docstring)
  return @registry.get(name) if @registry.exist?(name)

  # New gauges carry the configured base labels plus the per-file labels
  # that the labels method fills in.
  label_names = @base_labels.keys + [:plugin_id, :type, :path]
  @registry.gauge(name, docstring: docstring, labels: label_names)
end

#labels(plugin_info, path) ⇒ Object



101
102
103
104
105
106
107
# File 'lib/fluent/plugin/in_prometheus_tail_monitor.rb', line 101

# Builds the label set for one watched file: the base labels merged with
# the plugin id, plugin type and file path.
#
# @param plugin_info [Hash] read with the string keys "plugin_id" and "type"
# @param path [Object] value stored under the :path label
# @return [Hash] a new hash; @base_labels itself is not modified
def labels(plugin_info, path)
  per_file = {
    plugin_id: plugin_info["plugin_id"],
    type: plugin_info["type"],
    path: path,
  }
  @base_labels.merge(per_file)
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/fluent/plugin/in_prometheus_tail_monitor.rb', line 24

# Always returns true.
# NOTE(review): presumably this signals to Fluentd that the plugin may run
# under multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#start ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/in_prometheus_tail_monitor.rb', line 44

# Starts the plugin: obtains one gauge per tail metric via get_gauge, stores
# them in @metrics keyed by metric kind, and schedules update_monitor_info
# through timer_execute with @interval.
# NOTE(review): @interval is not assigned in the visible code; presumably a
# config parameter — confirm.
def start
  super

  # metric kind => [registered metric name, docstring]
  gauge_specs = {
    position: [:fluentd_tail_file_position, 'Current position of file.'],
    inode: [:fluentd_tail_file_inode, 'Current inode of file.'],
    closed_file_metrics: [:fluentd_tail_file_closed, 'Number of files closed.'],
    opened_file_metrics: [:fluentd_tail_file_opened, 'Number of files opened.'],
    rotated_file_metrics: [:fluentd_tail_file_rotated, 'Number of files rotated.'],
    throttled_file_metrics: [:fluentd_tail_file_throttled, 'Number of times files got throttled.'],
  }

  @metrics = gauge_specs.transform_values do |(metric_name, docstring)|
    get_gauge(metric_name, docstring)
  end

  timer_execute(:in_prometheus_tail_monitor, @interval, &method(:update_monitor_info))
end

#update_monitor_info ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/in_prometheus_tail_monitor.rb', line 70

# Refreshes the tail gauges. Asks the monitor agent for every plugin's info
# (requesting the MONITOR_IVARS instance variables), keeps the entries whose
# type is 'tail', and for each of their watchers sets the inode and position
# gauges. When the watcher has a @metrics object it also sets the
# closed/opened/rotated gauges, and the throttled gauge if @metrics has a
# :throttled member.
def update_monitor_info
  all_plugins = @monitor_agent.plugins_info_all({ ivars: MONITOR_IVARS })
  tail_plugins = all_plugins.select { |plugin| plugin['type'] == 'tail'.freeze }

  tail_plugins.each do |plugin|
    watchers = plugin['instance_variables'][:tails]
    next if watchers.nil?

    # Walk a clone of the collection rather than the plugin's own object.
    watchers.clone.each do |_key, watcher|
      # Reads instance variables of an internal class directly;
      # this is a very fragile implementation.
      position_entry = watcher.instance_variable_get(:@pe)
      file_stats = watcher.instance_variable_get(:@metrics)
      label_set = labels(plugin, watcher.path)

      @metrics[:inode].set(position_entry.read_inode, labels: label_set)
      @metrics[:position].set(position_entry.read_pos, labels: label_set)
      next if file_stats.nil?

      @metrics[:closed_file_metrics].set(file_stats.closed.get, labels: label_set)
      @metrics[:opened_file_metrics].set(file_stats.opened.get, labels: label_set)
      @metrics[:rotated_file_metrics].set(file_stats.rotated.get, labels: label_set)
      # The throttled gauge is only set when the stats object has that member.
      if file_stats.members.include?(:throttled)
        @metrics[:throttled_file_metrics].set(file_stats.throttled.get, labels: label_set)
      end
    end
  end
end