Class: Fluent::TailPathInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tailpath.rb

Defined Under Namespace

Classes: IOHandler, NullIOHandler, RotateHandler, RotationRequest, StatWatcher, TimerWatcher

Constant Summary collapse

MAX_LINES_AT_ONCE =
1000

Instance Method Summary collapse

Constructor Details

#initialize(path, rotate_wait, pe, &receive_lines) ⇒ TailWatcher

Returns a new instance of TailWatcher.



120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/in_tailpath.rb', line 120

# Sets up the state needed to follow a single file.
#
# @param path the file path handed to StatWatcher and RotateHandler
# @param rotate_wait delay applied by #on_rotate before switching to a rotated
#   file (the rotation log message reports it in seconds)
# @param pe position entry; a fresh MemoryPositionEntry is used when this is nil
# @param receive_lines block stored and later passed on to every IOHandler
def initialize(path, rotate_wait, pe, &receive_lines)
  @path, @rotate_wait, @receive_lines = path, rotate_wait, receive_lines
  @pe = pe || MemoryPositionEntry.new

  # Rotation requests that have been detected but not applied yet.
  @rotate_queue = []

  # Both triggers funnel into #on_notify.
  # NOTE(review): (1, true) is presumably "every second, repeating" -- confirm
  # against TimerWatcher.
  notify = method(:on_notify)
  @timer_trigger = TimerWatcher.new(1, true, &notify)
  @stat_trigger = StatWatcher.new(path, &notify)

  @rotate_handler = RotateHandler.new(path, &method(:on_rotate))

  # Stays nil until #on_rotate is called for the first time.
  @io_handler = nil
end

Instance Method Details

#attach(loop) ⇒ Object



135
136
137
138
139
# File 'lib/fluent/plugin/in_tailpath.rb', line 135

# Registers the timer and stat triggers with +loop+, then runs #on_notify once
# right away instead of waiting for the first trigger to fire.
#
# @param loop the event loop object passed to each trigger's +attach+
# @return whatever #on_notify returns
def attach(loop)
  [@timer_trigger, @stat_trigger].each { |trigger| trigger.attach(loop) }
  on_notify
end

#close ⇒ Object



146
147
148
149
150
151
152
# File 'lib/fluent/plugin/in_tailpath.rb', line 146

# Closes the IO of every queued rotation request, empties the queue and then
# detaches the triggers.
#
# A queued request may carry a nil io: #on_rotate enqueues whatever it was
# given, and #on_notify explicitly handles the nil case. Such entries are
# skipped here instead of raising NoMethodError, which previously aborted the
# drain and prevented #detach from running.
#
# NOTE(review): the currently active @io_handler is not closed by this method;
# confirm that the owner releases it elsewhere.
#
# @return whatever #detach returns
def close
  @rotate_queue.each do |req|
    pending_io = req.io
    pending_io.close if pending_io
  end
  @rotate_queue.clear
  detach
end

#detach ⇒ Object



141
142
143
144
# File 'lib/fluent/plugin/in_tailpath.rb', line 141

# Detaches each trigger that reports itself as attached; a trigger that is not
# attached is left untouched.
#
# @return the result of detaching the stat trigger, or nil when it was not attached
def detach
  timer = @timer_trigger
  stat = @stat_trigger

  timer.detach if timer.attached?
  stat.detach if stat.attached?
end

#on_notify ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/fluent/plugin/in_tailpath.rb', line 154

# Trigger callback: forwards the notification to the rotate handler and the
# current IO handler, then advances the rotation queue.
#
# Only the request at the head of the queue is ticked, once per call. Every
# consecutive request at the head that reports ready? is then applied: the
# current IO handler is closed and replaced by one built from the request.
#
# @return [nil]
def on_notify
  @rotate_handler.on_notify
  return unless @io_handler
  @io_handler.on_notify

  # Nothing queued means no rotation is pending.
  return if @rotate_queue.empty?
  @rotate_queue.first.tick

  until @rotate_queue.empty?
    request = @rotate_queue.first
    break unless request.ready?

    rotated_io = request.io
    replacement =
      if rotated_io
        inode = rotated_io.stat.ino
        # Same inode as the recorded one: the file was renamed and put back, or
        # a symlink/hardlink to the same file was recreated, so the saved
        # position is kept. Otherwise the io's current position is recorded.
        pos = inode == @pe.read_inode ? @pe.read_pos : rotated_io.pos
        @pe.update(inode, pos)
        IOHandler.new(rotated_io, @pe, &@receive_lines)
      else
        # The request carries no file, so install the placeholder handler.
        NullIOHandler.new
      end

    @io_handler.close
    @io_handler = replacement
    @rotate_queue.shift
  end
end

#on_rotate(io) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/fluent/plugin/in_tailpath.rb', line 189

# Callback handed to RotateHandler. The first call installs the initial IO
# handler; later calls enqueue a RotationRequest that #on_notify applies.
#
# @param io the newly detected file, or nil when no file is available
def on_rotate(io)
  if @io_handler.nil?
    # First call and no file to follow: install the placeholder handler.
    unless io
      @io_handler = NullIOHandler.new
      return @io_handler
    end

    file_stat = io.stat
    file_size = file_stat.size
    current_inode = file_stat.ino
    saved_inode = @pe.read_inode

    if current_inode == saved_inode
      # Same file as recorded: resume from the saved position.
      offset = @pe.read_pos
    else
      # A non-zero saved inode means a FilePositionEntry from an earlier run,
      # so this is a rotated new file and is read from its head without
      # duplicating logs. Otherwise (MemoryPositionEntry or very first start)
      # it is unknown whether the file is old or new, so skip to its end to
      # avoid re-emitting existing lines.
      offset = saved_inode != 0 ? 0 : file_size
      @pe.update(current_inode, offset)
    end
    io.seek(offset)

    @io_handler = IOHandler.new(io, @pe, &@receive_lines)
    return @io_handler
  end

  # This io is already waiting in the queue.
  return if io && @rotate_queue.any? { |req| req.io == io }

  newest = @rotate_queue.last
  previous_io = newest ? newest.io : @io_handler.io

  if previous_io.nil?
    $log.info "detected rotation of #{@path}"
    # There is no previous file to finish reading, so rotate immediately.
    delay = 0
  else
    $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds"
    delay = @rotate_wait
    # Deduct the wait still held by the request at the head of the queue.
    delay -= @rotate_queue.first.wait unless @rotate_queue.empty?
  end
  @rotate_queue << RotationRequest.new(io, delay)
end