Class: Fluent::Plugin::ProcInfoInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_proc_info.rb

Constant Summary collapse

NAME =
'proc_info'
DEFAULT_TAG =
NAME
DEFAULT_INTERVAL =
60
DEFAULT_PROBES =
[].freeze
DEFAULT_TIMESTAMP_KEY =
'timestamp'
DEFAULT_TIMESTAMP_FORMAT =
:iso

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#eventerObject (readonly)

Returns the value of attribute eventer.



64
65
66
# File 'lib/fluent/plugin/in_proc_info.rb', line 64

def eventer
  @eventer
end

#info_processesObject (readonly)

Returns the value of attribute info_processes.



64
65
66
# File 'lib/fluent/plugin/in_proc_info.rb', line 64

def info_processes
  @info_processes
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/in_proc_info.rb', line 66

def configure(conf)
  super

  raise Fluent::ConfigError, "#{NAME}: `tag` must be specified" if tag.nil? || tag.empty?
  raise Fluent::ConfigError, "#{NAME}: `interval` must >= 1" if interval < 1
  raise Fluent::ConfigError, "#{NAME}: `probes` must not be empty" if probes.empty?

  configure_eventer
  configure_processes
end

#configure_eventerObject



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_proc_info.rb', line 77

def configure_eventer
  if timestamp_key.empty?
    raise Fluent::ConfigError,
          "#{NAME}: `timestamp_key` must not be empty"
  end

  @eventer = ProcInfo::Eventer.new(
    timestamp_key: timestamp_key,
    timestamp_format: timestamp_format,
    event_prefix: event_prefix,
    metadata_prefix: 
  )
end

#configure_processesObject

Raises:

  • (Fluent::ConfigError)


91
92
93
94
95
96
97
98
99
100
# File 'lib/fluent/plugin/in_proc_info.rb', line 91

def configure_processes
  raise Fluent::ConfigError, "#{NAME}: no `process` definition found" if processes.empty?

  @info_processes = processes.map do |process|
    ProcInfo::Process.new(pid: process.pid, pid_file: process.pid_file, eventer: eventer,
                          logger: log)
  rescue StandardError => e
    raise Fluent::ConfigError, "#{NAME}: #{e}"
  end
end

#emit_events(events) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/in_proc_info.rb', line 120

def emit_events(events)
  return if events.nil? || events.empty?

  time = Fluent::Engine.now
  mes = MultiEventStream.new
  events.each do |event|
    event = reformat_event(event)
    mes.add(time, event)
  end
  router.emit_stream(tag, mes)
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


102
103
104
# File 'lib/fluent/plugin/in_proc_info.rb', line 102

def multi_workers_ready?
  true
end

#reformat_event(event) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/fluent/plugin/in_proc_info.rb', line 132

def reformat_event(event)
  # FIXME
  # - prefix data
  # - prefix metadata
  # - timestamp
  event
end

#runObject



113
114
115
116
117
118
# File 'lib/fluent/plugin/in_proc_info.rb', line 113

def run
  info_processes.each do |info_process|
    events = info_process.probe(probes: probes)
    emit_events(events)
  end
end

#startObject



106
107
108
109
110
111
# File 'lib/fluent/plugin/in_proc_info.rb', line 106

def start
  super

  timer_execute(:run_first, 1, repeat: false, &method(:run)) if interval > 60
  timer_execute(:run_interval, interval, repeat: true, &method(:run))
end