Class: LogStash::Inputs::FileProgress
- Inherits: Base
  - Object
    - Base
      - LogStash::Inputs::FileProgress
- Defined in: lib/logstash/inputs/file_progress.rb
Overview
Stream events from files.
By default, each event is assumed to be one line. If you want to join lines, you’ll want to use the multiline filter.
Files are followed in a manner similar to “tail -0F”. File rotation is detected and handled by this input.
In addition to the normal file input, this plugin adds events carrying sincedb data to the pipeline. These events can later be used to serve progress data, e.g. via faye-output.
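Because progress events travel through the same pipeline as log events, a consumer has to tell them apart. The sketch below shows one way to do that, assuming events are represented as plain Hashes and that progress events carry `"type" => "progressdb"` (the value set in `#run`). The sample messages and paths are made up for illustration, and the real payload of a progress event is not shown in this document.

```ruby
# Sample events as plain Hashes; all field values here are illustrative only.
events = [
  { "message" => "GET /index.html", "path" => "/var/log/access.log" },
  { "message" => "progress payload", "path" => "/var/log/access.log", "type" => "progressdb" },
  { "message" => "GET /about.html", "path" => "/var/log/access.log" },
]

# Split the stream into progress events and ordinary log events.
progress_events, log_events = events.partition { |event| event["type"] == "progressdb" }

puts "log events: #{log_events.size}, progress events: #{progress_events.size}"
```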
Instance Method Details
#register ⇒ Object
    # File 'lib/logstash/inputs/file_progress.rb', line 80

    def register
      require "addressable/uri"
      require "filewatch/ext/filetail"
      require "digest/md5"
      @logger.info("Registering file input", :path => @path)

      @tail_config = {
        :exclude => @exclude,
        :stat_interval => @stat_interval,
        :discover_interval => @discover_interval,
        :sincedb_write_interval => @sincedb_write_interval,
        :logger => @logger,
        :progressdb => @progressdb,
        :progressdb_del => @progressdb_del,
        :eof_close => @eof_close,
      }

      @path.each do |path|
        if Pathname.new(path).relative?
          raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
        end
      end

      @tail_config[:sincedb_path] = @sincedb_path

      if @start_position == "beginning"
        @tail_config[:start_new_files_at] = :beginning
      end

      @codec_plain = LogStash::Codecs::Plain.new
    end
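The absolute-path check in `register` can be tried on its own. The following is a minimal standalone sketch, assuming a hypothetical helper named `validate_absolute_paths!` that is not part of the plugin. It requires `pathname` explicitly, since the listing above does not do so itself.

```ruby
require "pathname"

# Hypothetical helper (not part of the plugin): mirrors the path check in #register.
# Raises ArgumentError on the first relative path, otherwise returns the paths unchanged.
def validate_absolute_paths!(paths)
  paths.each do |path|
    if Pathname.new(path).relative?
      raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
    end
  end
  paths
end

# Absolute paths (including glob patterns) pass the check.
validate_absolute_paths!(["/var/log/syslog", "/var/log/*.log"])

# A relative path is rejected.
begin
  validate_absolute_paths!(["logs/app.log"])
rescue ArgumentError => e
  puts e.message
end
```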
#run(queue) ⇒ Object
    # File 'lib/logstash/inputs/file_progress.rb', line 113

    def run(queue)
      @tail = FileWatch::Ext::FileTail.new(@tail_config)
      @tail.logger = @logger
      @path.each { |path| @tail.tail(path) }
      hostname = Socket.gethostname

      @tail.subscribe do |path, data, type|
        @logger.debug("Received line", :path => path, :data => data) if logger.debug?

        if type == :log
          @codec.decode(data) do |event|
            decorate(event)
            event["host"] = hostname
            event["path"] = path
            queue << event
          end
        elsif type == :progressdb
          @codec_plain.decode(data) do |event|
            decorate(event)
            event["host"] = hostname
            event["path"] = path
            event["type"] = "progressdb"
            queue << event
          end
        end # if
      end # subscribe
      finished
    end
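The dispatch on `type` inside the `subscribe` block can be illustrated without Logstash or filewatch. The sketch below is a simplified model, not the plugin's implementation: `FakeTail` and `build_event` are hypothetical stand-ins, events are plain Hashes rather than Logstash events, the `"message"` key and the sample data are assumptions, and the codec and `decorate` steps are left out.

```ruby
# Hypothetical stand-in for the tail object: yields (path, data, type) tuples.
class FakeTail
  def initialize(items)
    @items = items
  end

  def subscribe
    @items.each { |path, data, type| yield path, data, type }
  end
end

# Hypothetical helper: builds a plain Hash event and tags progress events,
# as the :progressdb branch of #run does with event["type"].
def build_event(data, path, hostname, type)
  event = { "message" => data, "host" => hostname, "path" => path }
  event["type"] = "progressdb" if type == :progressdb
  event
end

queue = []
tail = FakeTail.new([
  ["/var/log/app.log", "first line", :log],
  ["/var/log/app.log", "progress payload", :progressdb],
])

tail.subscribe do |path, data, type|
  # Only the two types handled in #run are forwarded; anything else is ignored.
  next unless [:log, :progressdb].include?(type)
  queue << build_event(data, path, "example-host", type)
end

queue.each { |event| puts event.inspect }
```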
#teardown ⇒ Object
    # File 'lib/logstash/inputs/file_progress.rb', line 147

    def teardown
      @tail.sincedb_write
      @tail.quit
    end