Class: LogStash::Inputs::Pipe

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/pipe.rb

Overview

Stream events from a long running command pipe.

By default, each event is assumed to be one line. If you want to join lines, you’ll want to use the multiline filter.

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ Pipe

Returns a new instance of Pipe.



30
31
32
33
# File 'lib/logstash/inputs/pipe.rb', line 30

def initialize(params)
  super
  @pipe = nil
end

Instance Method Details

#registerObject



36
37
38
# File 'lib/logstash/inputs/pipe.rb', line 36

def register
  @logger.info("Registering pipe input", :command => @command)
end

#run(queue) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/logstash/inputs/pipe.rb', line 41

def run(queue)
  while !stop?
    begin
      @pipe = IO.popen(@command, "r")
      hostname = Socket.gethostname

      @pipe.each do |line|
        line = line.chomp
        @logger.debug? && @logger.debug("Received line", :command => @command, :line => line)
        @codec.decode(line) do |event|
          event["host"] = hostname
          event["command"] = @command
          decorate(event)
          queue << event
        end
      end
      @pipe.close
      @pipe = nil
    rescue Exception => e
      @logger.error("Exception while running command", :e => e, :backtrace => e.backtrace)
    end

    # Keep running the command forever.
    Stud.stoppable_sleep(10) do
      stop?
    end
  end
end

#stopObject

def run



70
71
72
73
74
75
76
# File 'lib/logstash/inputs/pipe.rb', line 70

def stop
  if @pipe
    Process.kill("KILL", @pipe.pid) rescue nil
    @pipe.close rescue nil
    @pipe = nil
  end
end