Class: LogStash::Inputs::Exec

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/exec.rb

Overview

Periodically run a shell command and capture the whole output as an event.

Notes:

  • The ‘command` field of this event will be the command run.

  • The ‘message` field of this event will be the entire stdout of the command.

Instance Method Summary collapse

Instance Method Details

#execute(queue) ⇒ Object

Execute a given command

Parameters:

  • A (String)

    command string

  • A (Array or Queue)

    queue to append events to



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/logstash/inputs/exec.rb', line 66

def execute(queue)
  start = Time.now
  output = exit_status = nil
  begin
    @logger.debug? && @logger.debug("Running exec", :command => @command)
    output, exit_status = run_command()
  rescue StandardError => e
    @logger.error("Error while running command",
      :command => @command, :e => e, :backtrace => e.backtrace)
  rescue Exception => e
    @logger.error("Exception while running command",
      :command => @command, :e => e, :backtrace => e.backtrace)
  end
  duration = Time.now - start
  @logger.debug? && @logger.debug("Command completed", :command => @command, :duration => duration)
  if output
    @codec.decode(output) do |event|
      decorate(event)
      event.set("host", @hostname)
      event.set("command", @command)
      event.set("[@metadata][duration]", duration)
      event.set("[@metadata][exit_status]", exit_status)
      queue << event
    end
  end
  duration
end

#registerObject



33
34
35
36
37
38
39
40
41
# File 'lib/logstash/inputs/exec.rb', line 33

def register
  @logger.info("Registering Exec Input", :type => @type, :command => @command, :interval => @interval, :schedule => @schedule)
  @hostname = Socket.gethostname
  @io       = nil
  
  if (@interval.nil? && @schedule.nil?) || (@interval && @schedule)
    raise LogStash::ConfigurationError, "exec input: either 'interval' or 'schedule' option must be defined."
  end
end

#run(queue) ⇒ Object

def register



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/logstash/inputs/exec.rb', line 43

def run(queue)
  if @schedule
    @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
    @scheduler.cron @schedule do
      execute(queue)
    end
    @scheduler.join
  else
    while !stop?
      duration = execute(queue)
      wait_until_end_of_interval(duration)
    end # loop
  end
end

#stopObject

def run



58
59
60
61
# File 'lib/logstash/inputs/exec.rb', line 58

def stop
  close_io()
  @scheduler.shutdown(:wait) if @scheduler
end