Class: Fluent::Plugin::TailInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Defined Under Namespace

Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher

Returns a new instance of TailWatcher.



887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
# File 'lib/fluent/plugin/in_tail.rb', line 887

def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  @path = target_info.path
  @ino = target_info.ino
  @pe = pe || MemoryPositionEntry.new
  @read_from_head = read_from_head
  @follow_inodes = follow_inodes
  @update_watcher = update_watcher
  @log = log
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @io_handler = nil
  @io_handler_build = io_handler_build
  @metrics = metrics
  @watchers = []
end

Instance Attribute Details

#group_watcherObject

Returns the value of attribute group_watcher.



908
909
910
# File 'lib/fluent/plugin/in_tail.rb', line 908

def group_watcher
  @group_watcher
end

#inoObject (readonly)

Returns the value of attribute ino.



903
904
905
# File 'lib/fluent/plugin/in_tail.rb', line 903

def ino
  @ino
end

#line_buffer_timer_flusherObject (readonly)

Returns the value of attribute line_buffer_timer_flusher.



905
906
907
# File 'lib/fluent/plugin/in_tail.rb', line 905

def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end

#pathObject (readonly)

Returns the value of attribute path.



903
904
905
# File 'lib/fluent/plugin/in_tail.rb', line 903

def path
  @path
end

#peObject (readonly)

Returns the value of attribute pe.



904
905
906
# File 'lib/fluent/plugin/in_tail.rb', line 904

def pe
  @pe
end

#unwatchedObject

This is used for removing position entry from PositionFile



906
907
908
# File 'lib/fluent/plugin/in_tail.rb', line 906

def unwatched
  @unwatched
end

#watchersObject (readonly)

Returns the value of attribute watchers.



907
908
909
# File 'lib/fluent/plugin/in_tail.rb', line 907

def watchers
  @watchers
end

Instance Method Details

#closeObject



926
927
928
929
930
931
# File 'lib/fluent/plugin/in_tail.rb', line 926

def close
  if @io_handler
    @io_handler.close
    @io_handler = nil
  end
end

#detach(shutdown_start_time = nil) ⇒ Object



918
919
920
921
922
923
924
# File 'lib/fluent/plugin/in_tail.rb', line 918

def detach(shutdown_start_time = nil)
  if @io_handler
    @io_handler.ready_to_shutdown(shutdown_start_time)
    @io_handler.on_notify
  end
  @line_buffer_timer_flusher&.close(self)
end

#eof?Boolean

Returns:

  • (Boolean)


933
934
935
# File 'lib/fluent/plugin/in_tail.rb', line 933

def eof?
  @io_handler.nil? || @io_handler.eof?
end

#io_handlerObject



1031
1032
1033
# File 'lib/fluent/plugin/in_tail.rb', line 1031

def io_handler
  @io_handler_build.call(self, @path)
end

#on_notifyObject



937
938
939
940
941
942
943
944
945
946
947
948
# File 'lib/fluent/plugin/in_tail.rb', line 937

def on_notify
  begin
    stat = Fluent::FileWrapper.stat(@path)
  rescue Errno::ENOENT, Errno::EACCES
    # moved or deleted
    stat = nil
  end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end

#on_rotate(stat) ⇒ Object



950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
# File 'lib/fluent/plugin/in_tail.rb', line 950

def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number with the last file.
        # assuming following situation:
        #   a) file was once renamed and backed, or
        #   b) symlink or hardlink to the same file is recreated
        # in either case of a and b, seek to the saved position
        #   c) file was once renamed, truncated and then backed
        # in this case, consider it truncated
        @pe.update(inode, 0) if fsize < @pe.read_pos
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd once started.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        @pe.update(inode, 0)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of the any files.
        # logs may duplicate without this seek because it's not sure the file is
        # existent file or rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = io_handler
    else
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
        # Handle the old log file before renewing TailWatcher [fluentd#1055]
        @io_handler.on_notify
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      watcher_needs_update = true
    end

    if watcher_needs_update
      if @follow_inodes
        # If stat is nil (file not present), NEED to stop and discard this watcher.
        #   When the file is disappeared but is resurrected soon, then `#refresh_watcher`
        #   can't recognize this TailWatcher needs to be stopped.
        #   This can happens when the file is rotated.
        #   If a notify comes before the new file for the path is created during rotation,
        #   then it appears as if the file was resurrected once it disappeared.
        # Don't want to swap state because we need latest read offset in pos file even after rotate_wait
        @update_watcher.call(self, @pe, stat&.ino)
      else
        # Permit to handle if stat is nil (file not present).
        # If a file is mv-ed and a new file is created during
        # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers`
        # and `#stop_watchers()` for the path because `target_paths_hash`
        # always contains the path.
        @update_watcher.call(self, swap_state(@pe), stat&.ino)
      end
    else
      @log.info "detected rotation of #{@path}"
      @io_handler = io_handler
    end
    @metrics.rotated.inc
  end
end

#register_watcher(watcher) ⇒ Object



914
915
916
# File 'lib/fluent/plugin/in_tail.rb', line 914

def register_watcher(watcher)
  @watchers << watcher
end

#swap_state(pe) ⇒ Object



1035
1036
1037
1038
1039
1040
1041
# File 'lib/fluent/plugin/in_tail.rb', line 1035

def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  mpe = MemoryPositionEntry.new
  mpe.update(pe.read_inode, pe.read_pos)
  @pe = mpe
  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

#tagObject



910
911
912
# File 'lib/fluent/plugin/in_tail.rb', line 910

def tag
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').gsub(/^\./, '')
end