Class: Fluent::ExecInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_exec.rb

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Input

#router

Attributes included from PluginLoggerMixin

#log

Instance Method Summary collapse

Methods included from PluginLoggerMixin

included

Methods included from PluginId

#plugin_id

Methods included from Configurable

#config, included, lookup_type, register_type

Constructor Details

#initialize ⇒ ExecInput

Returns a new instance of ExecInput.



28
29
30
31
# File 'lib/fluent/plugin/in_exec.rb', line 28

# Builds the plugin instance.
#
# Runs the parent initializer first, then loads 'fluent/plugin/exec_util'
# lazily (at construction time instead of when this file is required).
def initialize
  super
  require 'fluent/plugin/exec_util'
end

Instance Method Details

#configure(conf) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/in_exec.rb', line 52

# Applies the plugin configuration and prepares the time and record parsers.
#
# Reads 'localtime', 'utc' and 'timezone' directly from +conf+; every other
# setting is taken from instance variables that are expected to be populated
# by the parent #configure (presumably via config_param -- confirm in the
# class body).
#
# @param conf [#[]] configuration element, also forwarded to #setup_parser
# @return [Object] the parser built by #setup_parser (also stored in @parser)
# @raise [ConfigError] when neither @tag nor @tag_key is set
def configure(conf)
  super

  # 'localtime' takes precedence over 'utc'; when neither key is present
  # @localtime keeps whatever value it already had.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  if !@tag && !@tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  if @time_key
    if @time_format
      # Capture the format in a local so the proc keeps the value seen at
      # configure time even if @time_format is reassigned later.
      f = @time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      # Without an explicit format the time field is read as an integer.
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  end

  @parser = setup_parser(conf)
end

#run ⇒ Object



132
133
134
# File 'lib/fluent/plugin/in_exec.rb', line 132

# Thread body for the non-periodic mode (spawned by #start when
# @run_interval is not set): hands the command's output stream, @io, to the
# configured parser and returns whatever the parser's #call returns.
def run
  @parser.call(@io)
end

#run_periodic ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/fluent/plugin/in_exec.rb', line 136

# Thread body for the periodic mode: after an initial wait, runs @command
# every @run_interval seconds until @finished becomes true, feeding the
# output of each run to @parser.
#
# Any StandardError raised while spawning the command or parsing its output
# is logged and the loop continues with the next interval.
def run_periodic
  sleep @run_interval
  until @finished
    begin
      # The block form of IO.popen closes the pipe and waits for the child
      # even when the parser raises, so no descriptor or zombie is left behind.
      IO.popen(@command, "r") {|io| @parser.call(io) }
    rescue => e
      log.error "exec failed to run or shutdown child process", error: e.to_s, error_class: e.class.to_s
      log.warn_backtrace e.backtrace
    end
    # Sleep after failures too, so a broken command is not retried in a tight
    # loop; skip the wait entirely once shutdown has been requested.
    sleep @run_interval unless @finished
  end
end

#setup_parser(conf) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/in_exec.rb', line 82

# Builds the output parser that matches @format; every parser reports
# parsed records through #on_message.
#
# @param conf [Object] configuration, only consulted by the fallback text parser
# @return [Object] one of the ExecUtil parser objects
# @raise [ConfigError] when @format is 'tsv' but @keys is empty
def setup_parser(conf)
  if @format == 'tsv' && @keys.empty?
    raise ConfigError, "keys option is required on exec input for tsv format"
  end

  callback = method(:on_message)
  case @format
  when 'tsv'     then ExecUtil::TSVParser.new(@keys, callback)
  when 'json'    then ExecUtil::JSONParser.new(callback)
  when 'msgpack' then ExecUtil::MessagePackParser.new(callback)
  else
    # Any other format is delegated to the generic text parser wrapper.
    ExecUtil::TextParserWrapperParser.new(conf, callback)
  end
end

#shutdown ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/in_exec.rb', line 109

# Stops the worker thread created by #start.
#
# Periodic mode (@run_interval set): flags the loop as finished, wakes the
# thread out of its sleep and waits for it to exit.
#
# Non-periodic mode: sends TERM to the child process, waits up to 60 seconds
# for the reader thread, and escalates to KILL if it is still running.
def shutdown
  if @run_interval
    @finished = true
    begin
      # Thread#run interrupts the sleep in run_periodic so the thread can
      # observe @finished and stop immediately.
      @thread.run
    rescue ThreadError
      # Thread#run raises on a thread that has already terminated; there is
      # nothing left to wake up, so proceed straight to the join.
    end
    @thread.join
  else
    begin
      Process.kill(:TERM, @pid)
    rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
      # Best effort: the child may already be gone or not be signalable.
    end
    return if @thread.join(60)  # TODO wait time

    # The reader thread is still alive after the grace period: force-kill.
    begin
      Process.kill(:KILL, @pid)
    rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
      # Best effort, same as above.
    end
    @thread.join
  end
end

#start ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/in_exec.rb', line 98

# Launches the background worker thread.
#
# With @run_interval set, the thread runs #run_periodic and @finished is
# reset to false. Otherwise @command is spawned once up front (its stream
# and pid are kept in @io / @pid) and the thread runs #run.
#
# @return [Thread] the worker thread, also stored in @thread
def start
  worker =
    if @run_interval
      @finished = false
      :run_periodic
    else
      # Spawn the command before the thread exists so @io is ready for #run.
      @io = IO.popen(@command, "r")
      @pid = @io.pid
      :run
    end

  @thread = Thread.new(&method(worker))
end