Class: LogStash::Inputs::Pipe

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/pipe.rb

Overview

Stream events from a long running command pipe.

By default, each event is assumed to be one line. If you want to join lines, you’ll want to use the multiline codec.

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ Pipe

Returns a new instance of Pipe.



35
36
37
38
# File 'lib/logstash/inputs/pipe.rb', line 35

def initialize(params)
  super
  @pipe = nil
end

Instance Method Details

#registerObject



41
42
43
44
45
46
47
48
# File 'lib/logstash/inputs/pipe.rb', line 41

def register
  @logger.debug("Registering pipe input", :command => @command)

  @hostname = Socket.gethostname.freeze

  @host_name_field =            ecs_select[disabled: 'host',    v1: '[host][name]']
  @process_command_line_field = ecs_select[disabled: 'command', v1: '[process][command_line]']
end

#run(queue) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/logstash/inputs/pipe.rb', line 51

def run(queue)
  while !stop?
    begin
      pipe = @pipe = IO.popen(@command, "r")

      pipe.each do |line|
        line = line.chomp
        @logger.debug? && @logger.debug("Received line", :command => @command, :line => line)

        @codec.decode(line) do |event|
          decorate(event)
          event.set(@host_name_field, @hostname) unless event.include?(@host_name_field)
          event.set(@process_command_line_field, @command) unless event.include?(@process_command_line_field)
          queue << event
        end
      end
      pipe.close
      @pipe = nil
    rescue Exception => e
      @logger.error("Exception while running command", :exception => e, :backtrace => e.backtrace)
    end

    # Keep running the command forever.
    Stud.stoppable_sleep(10) do
      stop?
    end
  end
end

#stopObject

def run



80
81
82
83
84
85
86
87
# File 'lib/logstash/inputs/pipe.rb', line 80

def stop
  pipe = @pipe
  if pipe
    Process.kill("KILL", pipe.pid) rescue nil
    pipe.close rescue nil
    @pipe = nil
  end
end