Class: Fluent::Plugin::TailInput::TailWatcher
- Inherits:
-
Object
- Object
- Fluent::Plugin::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler
Instance Attribute Summary collapse
-
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
-
#ino ⇒ Object
readonly
Returns the value of attribute ino.
-
#line_buffer_timer_flusher ⇒ Object
readonly
Returns the value of attribute line_buffer_timer_flusher.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#pe ⇒ Object
readonly
Returns the value of attribute pe.
-
#unwatched ⇒ Object
This is used for removing position entry from PositionFile.
-
#watchers ⇒ Object
readonly
Returns the value of attribute watchers.
Instance Method Summary collapse
- #close ⇒ Object
- #detach(shutdown_start_time = nil) ⇒ Object
- #eof? ⇒ Boolean
-
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #io_handler ⇒ Object
- #on_notify ⇒ Object
- #on_rotate(stat) ⇒ Object
- #register_watcher(watcher) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
Constructor Details
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
Returns a new instance of TailWatcher.
887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 |
# File 'lib/fluent/plugin/in_tail.rb', line 887

# Builds a watcher for a single tailed file.
#
# target_info               - object responding to #path and #ino for the file to watch
# pe                        - position entry; a new MemoryPositionEntry is used when nil/false
# log                       - logger; also handed to the RotateHandler
# read_from_head            - when truthy, a never-seen file is read from offset 0 (see #on_rotate)
# follow_inodes             - selects how #on_rotate hands over to @update_watcher
# update_watcher            - callable invoked from #on_rotate when this watcher must be replaced
# line_buffer_timer_flusher - optional flusher notified from #on_notify and closed from #detach
# io_handler_build          - callable used by #io_handler to create an IO handler
# metrics                   - metrics object; #on_rotate increments metrics.rotated
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  # Identity of the watched file.
  @path, @ino = target_info.path, target_info.ino

  # Fall back to an in-memory position entry when the caller supplies none.
  @pe = pe
  @pe ||= MemoryPositionEntry.new

  # Plain configuration and collaborators.
  @read_from_head, @follow_inodes = read_from_head, follow_inodes
  @update_watcher = update_watcher
  @log = log

  # Rotation events are routed back to #on_rotate on this instance.
  rotate_callback = method(:on_rotate)
  @rotate_handler = RotateHandler.new(log, &rotate_callback)

  @line_buffer_timer_flusher = line_buffer_timer_flusher

  # The IO handler is created lazily by #on_rotate via #io_handler.
  @io_handler = nil
  @io_handler_build = io_handler_build

  @metrics = metrics
  @watchers = []
end
Instance Attribute Details
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
908 909 910 |
# File 'lib/fluent/plugin/in_tail.rb', line 908 def group_watcher @group_watcher end |
#ino ⇒ Object (readonly)
Returns the value of attribute ino.
903 904 905 |
# File 'lib/fluent/plugin/in_tail.rb', line 903 def ino @ino end |
#line_buffer_timer_flusher ⇒ Object (readonly)
Returns the value of attribute line_buffer_timer_flusher.
905 906 907 |
# File 'lib/fluent/plugin/in_tail.rb', line 905 def line_buffer_timer_flusher @line_buffer_timer_flusher end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
903 904 905 |
# File 'lib/fluent/plugin/in_tail.rb', line 903 def path @path end |
#pe ⇒ Object (readonly)
Returns the value of attribute pe.
904 905 906 |
# File 'lib/fluent/plugin/in_tail.rb', line 904 def pe @pe end |
#unwatched ⇒ Object
This is used for removing position entry from PositionFile.
906 907 908 |
# File 'lib/fluent/plugin/in_tail.rb', line 906 def unwatched @unwatched end |
#watchers ⇒ Object (readonly)
Returns the value of attribute watchers.
907 908 909 |
# File 'lib/fluent/plugin/in_tail.rb', line 907 def watchers @watchers end |
Instance Method Details
#close ⇒ Object
926 927 928 929 930 931 |
# File 'lib/fluent/plugin/in_tail.rb', line 926

# Closes the current IO handler, if there is one, and forgets it.
# Returns nil in every case; a second call is a no-op because @io_handler
# has already been cleared.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end
#detach(shutdown_start_time = nil) ⇒ Object
918 919 920 921 922 923 924 |
# File 'lib/fluent/plugin/in_tail.rb', line 918

# Prepares this watcher for shutdown.
#
# When an IO handler exists it is told to get ready for shutdown (receiving
# shutdown_start_time) and then notified once more. Afterwards the line-buffer
# timer flusher, if present, is asked to close this watcher.
#
# Returns whatever the flusher's #close returns, or nil without a flusher.
def detach(shutdown_start_time = nil)
  if (handler = @io_handler)
    handler.ready_to_shutdown(shutdown_start_time)
    handler.on_notify
  end

  flusher = @line_buffer_timer_flusher
  flusher.close(self) unless flusher.nil?
end
#eof? ⇒ Boolean
933 934 935 |
# File 'lib/fluent/plugin/in_tail.rb', line 933

# Reports whether there is nothing left to read.
# A watcher without an IO handler counts as being at EOF; otherwise the
# answer is delegated to the handler.
def eof?
  return true if @io_handler.nil?

  @io_handler.eof?
end
#io_handler ⇒ Object
1031 1032 1033 |
# File 'lib/fluent/plugin/in_tail.rb', line 1031

# Creates an IO handler for this watcher by invoking the builder given to
# #initialize with this watcher and its path.
# Note that this does NOT return @io_handler: every call asks the builder again.
def io_handler
  builder = @io_handler_build
  builder.call(self, @path)
end
#on_notify ⇒ Object
937 938 939 940 941 942 943 944 945 946 947 948 |
# File 'lib/fluent/plugin/in_tail.rb', line 937

# Handles one notification for the watched path.
#
# The path is stat-ed first; a missing or unreadable file yields a nil stat.
# The stat is then passed to the rotate handler, after which the line-buffer
# timer flusher and the IO handler are notified. Each collaborator is skipped
# when it is not set. The instance variables are re-read for every step
# because #on_rotate may replace or clear them.
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT, Errno::EACCES
      # moved or deleted
      nil
    end

  @rotate_handler&.on_notify(stat)
  @line_buffer_timer_flusher&.on_notify(self)
  @io_handler&.on_notify
end
#on_rotate(stat) ⇒ Object
950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 |
# File 'lib/fluent/plugin/in_tail.rb', line 950 def on_rotate(stat) if @io_handler.nil? if stat # first time fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case of a and b, seek to the saved position # c) file was once renamed, truncated and then backed # in this case, consider it truncated @pe.update(inode, 0) if fsize < @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end @io_handler = io_handler else @io_handler = NullIOHandler.new end else watcher_needs_update = false if stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(0) @io_handler.close elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher @pe.update(inode, 0) else # file is rotated and new file found watcher_needs_update = true # Handle the old log file before renewing TailWatcher [fluentd#1055] @io_handler.on_notify end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil watcher_needs_update = true end if watcher_needs_update if @follow_inodes # If stat is nil (file not present), NEED to stop and discard this watcher. # When the file is disappeared but is resurrected soon, then `#refresh_watcher` # can't recognize this TailWatcher needs to be stopped. # This can happens when the file is rotated. 
# If a notify comes before the new file for the path is created during rotation, # then it appears as if the file was resurrected once it disappeared. # Don't want to swap state because we need latest read offset in pos file even after rotate_wait @update_watcher.call(self, @pe, stat&.ino) else # Permit to handle if stat is nil (file not present). # If a file is mv-ed and a new file is created during # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers` # and `#stop_watchers()` for the path because `target_paths_hash` # always contains the path. @update_watcher.call(self, swap_state(@pe), stat&.ino) end else @log.info "detected rotation of #{@path}" @io_handler = io_handler end @metrics.rotated.inc end end |
#register_watcher(watcher) ⇒ Object
914 915 916 |
# File 'lib/fluent/plugin/in_tail.rb', line 914

# Appends watcher to the list exposed by #watchers.
# Returns that list.
def register_watcher(watcher)
  @watchers.push(watcher)
end
#swap_state(pe) ⇒ Object
1035 1036 1037 1038 1039 1040 1041 |
# File 'lib/fluent/plugin/in_tail.rb', line 1035

# Detaches this watcher from the given position entry.
#
# A temporary MemoryPositionEntry seeded with pe's inode and read position
# becomes this watcher's @pe (used for the rotated file), and the original pe
# is returned untouched so the caller can pass it on.
def swap_state(pe)
  @pe = MemoryPositionEntry.new.tap do |temporary_entry|
    temporary_entry.update(pe.read_inode, pe.read_pos)
  end

  # This pe will be updated in on_rotate after TailWatcher is initialized
  pe
end
#tag ⇒ Object
910 911 912 |
# File 'lib/fluent/plugin/in_tail.rb', line 910

# Derives a tag fragment from the watched path and memoizes it.
#
# Every '/' becomes '.', runs of '.' collapse into one, and a single leading
# '.' is dropped, so "/var/log/app.log" yields "var.log.app.log".
#
# The leading dot is matched with \A (start of string) rather than ^ (start
# of any line), so a path containing a newline only loses the dot at the very
# beginning instead of one after every line break.
def tag
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').sub(/\A\./, '')
end