Class: Fluent::Plugin::BinInput::TailWatcher

Inherits:
TailInput::TailWatcher
  • Object
show all
Defined in:
lib/fluent/plugin/in_bin.rb

Defined Under Namespace

Classes: IOHandler

Instance Method Summary collapse

Instance Method Details

#on_rotate(io) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fluent/plugin/in_bin.rb', line 45

# Reacts to a change of the file behind @path.
#
# When no IO handler is installed yet (first call), the read offset is
# chosen from the stored position entry (@pe) and an IOHandler is installed,
# or a NullIOHandler when +io+ is nil. On later calls the rotation is logged
# and the handler is either replaced in place (same inode, or no previous
# file) or the stored position entry is handed to @update_watcher.
#
# @param io [#stat, #seek, #pos, nil] handle for the file currently at @path,
#   or nil when there is no such file
# @return [Object] not meaningful; presumably ignored by callers (TODO confirm)
def on_rotate(io)
  if @io_handler.nil?
    if io
      # first time
      stat = io.stat
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # The file has the same inode number as the last file.
        # Assuming one of the following situations:
        #   a) the file was renamed and then moved back, or
        #   b) a symlink or hardlink to the same file was recreated.
        # In either case, seek to the saved position.
        pos = @pe.read_pos
      elsif last_inode != 0
        # This is a FilePositionEntry and fluentd has been started before.
        # Read data from the head of the rotated file.
        # Logs never duplicate because this file is a new, rotated file.
        pos = 0
        @pe.update(inode, pos)
      else
        # This is a MemoryPositionEntry or fluentd is starting for the first time.
        # Seek to the end of the file unless @read_from_head is set.
        # Logs may duplicate without this seek because it is not known whether
        # the file is a pre-existing file or a new, rotated file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      io.seek(pos)

      @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
    else
      @io_handler = NullIOHandler.new
    end
  else
    log_msg = "detected rotation of #{@path}"
    # Mention the rotate wait only when a previous file exists.
    log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io
    @log.info log_msg

    if io
      stat = io.stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(stat.size)
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        # Build the replacement before closing the old handler.
        @io_handler.close
        @io_handler = io_handler
      elsif @io_handler.io.nil? # There is no previous file. Reuse this TailWatcher.
        @pe.update(inode, io.pos)
        @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
      else # file is rotated and a new file was found
        @update_watcher.call(@path, swap_state(@pe))
      end
    else # file is rotated and a new file was not found
      # Clear the rotate handler to avoid watching the same path twice.
      @rotate_handler = nil
      @update_watcher.call(@path, swap_state(@pe))
    end
  end
end

# Switches this watcher to a temporary in-memory position entry seeded with
# the inode and offset currently stored in +pe+.
#
# Defined as a sibling of #on_rotate (not nested inside it) so that it exists
# before #on_rotate has ever run and is not redefined on every rotation.
#
# @param pe [#read_inode, #read_pos] the position entry being replaced
# @return [Object] +pe+ itself, unchanged
def swap_state(pe)
  # Use a MemoryPositionEntry for the rotated file temporarily.
  mpe = TailInput::MemoryPositionEntry.new
  mpe.update(pe.read_inode, pe.read_pos)
  @pe = mpe
  @io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer.

  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

#swap_state(pe) ⇒ Object



108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/in_bin.rb', line 108

# Replaces this watcher's position entry with a temporary in-memory one that
# starts from the inode and offset currently stored in +pe+.
#
# @param pe [#read_inode, #read_pos] the position entry being replaced
# @return [Object] +pe+ itself, unchanged, so the caller can pass it on
def swap_state(pe)
  # The rotated file is tracked through a MemoryPositionEntry for now.
  @pe = TailInput::MemoryPositionEntry.new.tap do |entry|
    entry.update(pe.read_inode, pe.read_pos)
  end

  # Repoint the existing IOHandler rather than building a new one: it holds
  # an internal buffer that must not be lost.
  @io_handler.pe = @pe

  # The caller receives the original entry; it is updated later in on_rotate
  # once the new TailWatcher is initialized.
  pe
end