Class: Fluent::TailInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Overview

This TailInput exists for existing plugins that extend the old in_tail. This class will be removed after the v1 release.

Defined Under Namespace

Classes: FilePositionEntry, MemoryPositionEntry, PositionFile, TailWatcher

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Input

#router

Attributes included from PluginLoggerMixin

#log

Instance Method Summary collapse

Methods included from PluginLoggerMixin

included

Methods included from PluginId

#plugin_id

Methods included from Configurable

#config, included, lookup_type, register_type

Constructor Details

#initialize ⇒ TailInput

Returns a new instance of TailInput.



820
821
822
823
# File 'lib/fluent/plugin/in_tail.rb', line 820

# Runs the parent constructor, then starts with an empty list of paths.
# The list is populated from the +path+ setting in #configure.
def initialize
  super
  @paths = []
end

Instance Attribute Details

#paths ⇒ Object (readonly)

Returns the value of attribute paths.



830
831
832
# File 'lib/fluent/plugin/in_tail.rb', line 830

# Returns the list of paths held in @paths (empty until #configure has
# split the +path+ setting).
def paths
  @paths
end

Instance Method Details

#configure(conf) ⇒ Object



832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
# File 'lib/fluent/plugin/in_tail.rb', line 832

# Configures the tail input from +conf+.
#
# After the parent class has processed +conf+, the comma-separated +path+
# setting is split into individual, whitespace-trimmed paths stored in
# @paths. A warning is logged when no +pos_file+ is set, and finally the
# line parser is set up through #configure_parser.
#
# @param conf the configuration, passed on to +super+ and #configure_parser
# @raise [ConfigError] if +path+ contains no non-blank entries
def configure(conf)
  super

  # Drop blank entries so that values such as " " or "a,,b" cannot yield an
  # empty path that slips past the required-parameter check below.
  @paths = @path.split(',').map(&:strip).reject(&:empty?)
  raise ConfigError, "tail: 'path' parameter is required on tail input" if @paths.empty?

  unless @pos_file
    $log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    $log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_parser(conf)
end

#configure_parser(conf) ⇒ Object



848
849
850
851
# File 'lib/fluent/plugin/in_tail.rb', line 848

# Creates a fresh TextParser, stores it in @parser and configures it with
# +conf+. Returns whatever the parser's #configure returns.
def configure_parser(conf)
  parser = TextParser.new
  @parser = parser
  parser.configure(conf)
end

#parse_line(line) ⇒ Object



915
916
917
# File 'lib/fluent/plugin/in_tail.rb', line 915

# Hands +line+ to the parser stored in @parser and returns its result
# unchanged (destructured by #receive_lines as time and record).
def parse_line(line)
  @parser.parse(line)
end

#receive_lines(lines) ⇒ Object



889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
# File 'lib/fluent/plugin/in_tail.rb', line 889

# Parses a batch of tailed lines and emits the resulting events as one stream.
#
# Each line is chomped in place and passed to #parse_line. Lines that yield
# both a time and a record are added to the stream; lines that do not match,
# and lines whose handling raises, are logged as warnings and skipped. When
# at least one event was collected, the stream is emitted under @tag.
#
# @param lines the lines to process; each element is mutated by chomp!
def receive_lines(lines)
  stream = MultiEventStream.new

  lines.each do |line|
    begin
      line.chomp! # strip the trailing line terminator in place
      time, record = parse_line(line)
      if time && record
        stream.add(time, record)
      else
        log.warn "pattern not match: #{line.inspect}"
      end
    rescue => e
      log.warn line.dump, :error => e.to_s
      log.debug_backtrace
    end
  end

  return if stream.empty?

  begin
    router.emit_stream(@tag, stream)
  rescue
    # Deliberately ignored: Engine shows logs and backtraces.
  end
end

#run ⇒ Object



882
883
884
885
886
887
# File 'lib/fluent/plugin/in_tail.rb', line 882

# Runs the event loop held in @loop; #start uses this method as the body of
# its background thread. A StandardError raised by the loop is logged,
# together with its backtrace, instead of being propagated.
def run
  @loop.run
rescue => e
  log.error "unexpected error", :error => e.to_s
  log.error_backtrace
end

#shutdown ⇒ Object



873
874
875
876
877
878
879
880
# File 'lib/fluent/plugin/in_tail.rb', line 873

# Stops tailing: closes every watcher in @tails, stops the event loop,
# waits for the background thread to finish, and finally closes the
# position file if one was opened.
def shutdown
  @tails.each(&:close)
  @loop.stop
  @thread.join
  @pf_file.close if @pf_file
end

#start ⇒ Object



853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
# File 'lib/fluent/plugin/in_tail.rb', line 853

# Starts tailing every configured path.
#
# When +pos_file+ is set, the file is opened read/write (created if missing)
# with synchronous writes and parsed into a PositionFile. One TailWatcher is
# then built per path, using that path's entry from the position file or an
# in-memory entry when there is none, and wired to call #receive_lines. All
# watchers are attached to a new Coolio loop, which is run via #run on a
# background thread stored in @thread.
def start
  if @pos_file
    @pf_file = File.open(@pos_file, File::RDWR | File::CREAT, DEFAULT_FILE_PERMISSION)
    @pf_file.sync = true
    @pf = PositionFile.parse(@pf_file)
  end

  @loop = Coolio::Loop.new

  @tails = @paths.map do |path|
    position = @pf ? @pf[path] : MemoryPositionEntry.new
    watcher = TailWatcher.new(path, @rotate_wait, position, &method(:receive_lines))
    watcher.log = log
    watcher
  end
  @tails.each { |watcher| watcher.attach(@loop) }

  @thread = Thread.new(&method(:run))
end