Class: Fluent::TailInput
- Inherits:
-
Input
- Object
- Input
- Fluent::TailInput
show all
- Defined in:
- lib/fluent/plugin/in_tail.rb
Overview
This TailInput is kept for existing plugins that extend the old in_tail. This class will be removed after the release of v1.
Defined Under Namespace
Classes: FilePositionEntry, MemoryPositionEntry, PositionFile, TailWatcher
Constant Summary
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
Attributes inherited from Input
#router
#log
Instance Method Summary
collapse
included
Methods included from PluginId
#plugin_id
#config, included, lookup_type, register_type
Constructor Details
Returns a new instance of TailInput.
820
821
822
823
|
# File 'lib/fluent/plugin/in_tail.rb', line 820
def initialize
super
@paths = []
end
|
Instance Attribute Details
#paths ⇒ Object
Returns the value of attribute paths.
830
831
832
|
# File 'lib/fluent/plugin/in_tail.rb', line 830
def paths
@paths
end
|
Instance Method Details
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
|
# File 'lib/fluent/plugin/in_tail.rb', line 832
def configure(conf)
super
@paths = @path.split(',').map {|path| path.strip }
if @paths.empty?
raise ConfigError, "tail: 'path' parameter is required on tail input"
end
unless @pos_file
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
end
configure_parser(conf)
end
|
848
849
850
851
|
# File 'lib/fluent/plugin/in_tail.rb', line 848
def configure_parser(conf)
@parser = TextParser.new
@parser.configure(conf)
end
|
#parse_line(line) ⇒ Object
915
916
917
|
# File 'lib/fluent/plugin/in_tail.rb', line 915
def parse_line(line)
return @parser.parse(line)
end
|
#receive_lines(lines) ⇒ Object
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
|
# File 'lib/fluent/plugin/in_tail.rb', line 889
def receive_lines(lines)
es = MultiEventStream.new
lines.each {|line|
begin
line.chomp! time, record = parse_line(line)
if time && record
es.add(time, record)
else
log.warn "pattern not match: #{line.inspect}"
end
rescue
log.warn line.dump, :error=>$!.to_s
log.debug_backtrace
end
}
unless es.empty?
begin
router.emit_stream(@tag, es)
rescue
end
end
end
|
#run ⇒ Object
882
883
884
885
886
887
|
# File 'lib/fluent/plugin/in_tail.rb', line 882
def run
@loop.run
rescue
log.error "unexpected error", :error=>$!.to_s
log.error_backtrace
end
|
#shutdown ⇒ Object
873
874
875
876
877
878
879
880
|
# File 'lib/fluent/plugin/in_tail.rb', line 873
def shutdown
@tails.each {|tail|
tail.close
}
@loop.stop
@thread.join
@pf_file.close if @pf_file
end
|
#start ⇒ Object
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
|
# File 'lib/fluent/plugin/in_tail.rb', line 853
def start
if @pos_file
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION)
@pf_file.sync = true
@pf = PositionFile.parse(@pf_file)
end
@loop = Coolio::Loop.new
@tails = @paths.map {|path|
pe = @pf ? @pf[path] : MemoryPositionEntry.new
tw = TailWatcher.new(path, @rotate_wait, pe, &method(:receive_lines))
tw.log = log
tw
}
@tails.each {|tail|
tail.attach(@loop)
}
@thread = Thread.new(&method(:run))
end
|