Class: LogStash::Inputs::FileProgress

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/file_progress.rb

Defined Under Namespace

Classes: FlushableListener, ListenerTail

Instance Method Summary collapse

Instance Method Details

#begin_tailing ⇒ Object



298
299
300
301
302
303
304
305
306
# File 'lib/logstash/inputs/file_progress.rb', line 298

# (Re)creates the tail watcher and registers every configured path with it.
#
# `stop` is invoked first so that, when the pipeline restarts this input,
# files held by a previous watcher are closed before a new one is built.
# Returns the value of `@path.each`, i.e. the `@path` collection itself.
def begin_tailing
  stop

  # Build the watcher through the observer/listener flavoured constructor.
  @tail = FileWatch::Ext::Tail.new_observing_progress(@tail_config)
  @tail.logger = @logger

  @path.each do |watched_path|
    @tail.tail(watched_path)
  end
end

#listener_for(path) ⇒ Object



293
294
295
296
# File 'lib/logstash/inputs/file_progress.rb', line 293

# Builds the listener object for one watched file.
#
# path - the file path; it doubles as the identity of the listener.
#
# Returns a new ListenerTail constructed with the path and this input.
def listener_for(path)
  owner = self
  ListenerTail.new(path, owner)
end

#log_line_received(path, line) ⇒ Object



321
322
323
324
# File 'lib/logstash/inputs/file_progress.rb', line 321

# Emits a debug-level log entry for a line read from a file.
#
# path - path of the file the line came from.
# line - the text of the line.
#
# Nothing is logged (and nil is returned) unless debug logging is enabled;
# otherwise the result of the logger call is returned.
def log_line_received(path, line)
  if @logger.debug?
    @logger.debug("Received line", :path => path, :text => line)
  end
end

#post_process_this(event) ⇒ Object




315
316
317
318
319
# File 'lib/logstash/inputs/file_progress.rb', line 315

# Finalises an event and hands it to the pipeline queue.
#
# event - the event to finish; must respond to `include?` and `set`.
#
# A "host" field is filled in from @host only when the event does not
# already carry one. The event is then passed to `decorate` and appended
# to @queue. Returns the result of the queue append.
def post_process_this(event)
  unless event.include?("host")
    event.set("host", @host)
  end

  decorate(event)
  @queue << event
end

#register ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/logstash/inputs/file_progress.rb', line 178

# Prepares the input before it is run: loads runtime dependencies, builds
# the tail configuration, validates the configured paths, resolves the
# sincedb location and wraps the codec in an IdentityMapCodec.
#
# Raises ArgumentError when a configured path is relative, or when
# sincedb_path points at a directory.
# Raises RuntimeError when no sincedb_path is configured and neither the
# SINCEDB_DIR nor the HOME environment variable is set.
def register
  require "addressable/uri"
  require "filewatch/ext/tail"
  require "digest/md5"
  @logger.info("Registering file input", :path => @path)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  @tail_config = {
    :exclude => @exclude,
    :stat_interval => @stat_interval,
    :discover_interval => @discover_interval,
    :sincedb_write_interval => @sincedb_write_interval,
    :delimiter => @delimiter,
    :ignore_older => @ignore_older,
    :close_older => @close_older,
    :max_open_files => @max_open_files,
    :eof_close => true,
    :progress_write_interval => @progress_write_interval
  }

  @path.each do |path|
    if Pathname.new(path).relative?
      raise ArgumentError, "File paths must be absolute, relative path specified: #{path}"
    end
  end

  if @sincedb_path.nil?
    if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
      message = "No SINCEDB_DIR or HOME environment variable set, I don't know where " \
                "to keep track of the files I'm watching. Either set " \
                "HOME or SINCEDB_DIR in your environment, or set sincedb_path " \
                "in your Logstash config for the file input with " \
                "path '#{@path.inspect}'"
      @logger.error(message)
      # Raise explicitly: a bare `raise` would re-raise whatever exception is
      # currently in `$!` if this method is reached from a rescue handler,
      # and otherwise yields an uninformative "unhandled exception".
      raise message
    end

    # Pick SINCEDB_DIR if available, otherwise use HOME.
    sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]

    # Join by ',' to make it easy for folks to know their own sincedb
    # generated path (vs, say, inspecting the @path array)
    @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))

    # Migrate any old .sincedb to the new file (this is for version <=1.1.1 compatibility)
    old_sincedb = File.join(sincedb_dir, ".sincedb")
    # File.exist? rather than the deprecated File.exists?, which no longer
    # exists in Ruby 3.2+.
    if File.exist?(old_sincedb)
      @logger.info("Renaming old ~/.sincedb to new one", :old => old_sincedb,
                   :new => @sincedb_path)
      File.rename(old_sincedb, @sincedb_path)
    end

    @logger.info("No sincedb_path set, generating one based on the file path",
                 :sincedb_path => @sincedb_path, :path => @path)
  end

  if File.directory?(@sincedb_path)
    raise ArgumentError, "The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\""
  end

  @tail_config[:sincedb_path] = @sincedb_path

  # Only "beginning" is translated into a tail option; any other value
  # leaves :start_new_files_at unset.
  if @start_position == "beginning"
    @tail_config[:start_new_files_at] = :beginning
  end

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end

#run(queue) ⇒ Object



308
309
310
311
312
313
# File 'lib/logstash/inputs/file_progress.rb', line 308

# Entry point that starts tailing and wires this input to the given queue.
#
# queue - the object events are appended to (post_process_this pushes
#         onto @queue with `<<`).
#
# Returns whatever `exit_flush` returns.
def run(queue)
  # Stops any previous watcher, then builds a new one and registers the paths.
  begin_tailing
  # Assigned only after begin_tailing; keep this order, since begin_tailing
  # calls stop, which closes the codec while @queue is still the old value.
  @queue = queue
  # Register this input as the subscriber of the tail watcher.
  # NOTE(review): presumably this call blocks until the watcher quits — confirm.
  @tail.subscribe(self)
  # exit_flush is defined outside this view; presumably flushes pending
  # codec data once subscribing has returned — TODO confirm.
  exit_flush
end

#stop ⇒ Object



326
327
328
329
330
331
332
333
334
# File 'lib/logstash/inputs/file_progress.rb', line 326

# Shuts down the current tail watcher, if one exists.
#
# Does nothing (and returns nil) when no watcher has been created yet.
# Otherwise the codec is closed first and the watcher is then told to
# quit; the result of `quit` is returned.
def stop
  return unless @tail

  @codec.close
  # In filewatch >= 0.6.7, quit closes and forgets all files, but writes
  # their last read positions to the sincedb beforehand.
  @tail.quit
end