Class: Fluent::Plugin::TailInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Defined Under Namespace

Classes: IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler, StatWatcher

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines) ⇒ TailWatcher

Returns a new instance of TailWatcher.



396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'lib/fluent/plugin/in_tail.rb', line 396

# Builds a watcher for one tailed path.
#
# path                      - path handed to StatWatcher and RotateHandler.
# rotate_wait               - value reported in the rotation log message.
# pe                        - position entry; a new MemoryPositionEntry is
#                             used when this is nil/false.
# log                       - logger shared with the handlers.
# read_from_head            - when truthy, a first-seen file is read from
#                             offset 0 instead of its end (see on_rotate).
# enable_watch_timer        - whether detach should also detach the timer.
# read_lines_limit          - passed through to IOHandler.
# update_watcher            - callable invoked with (path, pe) on rotation.
# line_buffer_timer_flusher - optional object notified from on_notify.
# receive_lines             - block invoked with (lines, watcher).
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines)
  # Plain settings and collaborators supplied by the caller.
  @path = path
  @rotate_wait = rotate_wait
  @read_from_head = read_from_head
  @read_lines_limit = read_lines_limit
  @enable_watch_timer = enable_watch_timer
  @update_watcher = update_watcher
  @receive_lines = receive_lines
  @pe = pe || MemoryPositionEntry.new

  # File-stat notifications are routed to on_notify; the timer trigger is
  # not created here and stays nil until it is assigned from outside.
  notify_callback = method(:on_notify)
  @stat_trigger = StatWatcher.new(path, log, &notify_callback)
  @timer_trigger = nil

  # Rotation events are routed to on_rotate, which installs the IO handler.
  rotate_callback = method(:on_rotate)
  @rotate_handler = RotateHandler.new(path, log, &rotate_callback)
  @io_handler = nil
  @log = log

  @line_buffer_timer_flusher = line_buffer_timer_flusher
end

Instance Attribute Details

#enable_watch_timer ⇒ Object (readonly)

Returns the value of attribute enable_watch_timer.



417
418
419
# File 'lib/fluent/plugin/in_tail.rb', line 417

# Reader for @enable_watch_timer, which the constructor stores from its
# enable_watch_timer argument. detach consults the same variable before
# touching the timer trigger.
def enable_watch_timer
  @enable_watch_timer
end

#line_buffer ⇒ Object

Returns the value of attribute line_buffer.



419
420
421
# File 'lib/fluent/plugin/in_tail.rb', line 419

# Reader for @line_buffer. The constructor shown for this class does not
# assign this variable, so it is nil until something else sets it
# (presumably through a matching writer -- not visible here).
def line_buffer
  @line_buffer
end

#line_buffer_timer_flusher ⇒ Object

Returns the value of attribute line_buffer_timer_flusher.



419
420
421
# File 'lib/fluent/plugin/in_tail.rb', line 419

# Reader for @line_buffer_timer_flusher, stored by the constructor. When it
# is set, on_notify calls its on_notify method with this watcher.
def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end

#path ⇒ Object (readonly)

Returns the value of attribute path.



416
417
418
# File 'lib/fluent/plugin/in_tail.rb', line 416

# Reader for @path, the path given to the constructor. The same value is
# used to derive the tag and appears in the rotation log message.
def path
  @path
end

#stat_trigger ⇒ Object (readonly)

Returns the value of attribute stat_trigger.



417
418
419
# File 'lib/fluent/plugin/in_tail.rb', line 417

# Reader for @stat_trigger, the StatWatcher the constructor creates with
# on_notify as its callback.
def stat_trigger
  @stat_trigger
end

#timer_trigger ⇒ Object

Returns the value of attribute timer_trigger.



418
419
420
# File 'lib/fluent/plugin/in_tail.rb', line 418

# Reader for @timer_trigger. The constructor initializes it to nil; any
# later assignment happens outside the code visible here.
def timer_trigger
  @timer_trigger
end

#unwatched ⇒ Object

Used for removing the position entry from the PositionFile.



420
421
422
# File 'lib/fluent/plugin/in_tail.rb', line 420

# Reader for @unwatched. According to the attribute's documentation, the
# value is used when removing the position entry from the PositionFile.
# Nothing in the visible code assigns it, so it is nil until set elsewhere.
def unwatched
  @unwatched
end

Instance Method Details

#attach {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:



430
431
432
433
# File 'lib/fluent/plugin/in_tail.rb', line 430

# Runs on_notify once and then yields this watcher to the given block.
#
# A block is required: `yield` raises LocalJumpError when none is given.
# Returns the block's result.
def attach
  on_notify
  yield self
end

#close(close_io = true) ⇒ Object



440
441
442
443
444
445
446
# File 'lib/fluent/plugin/in_tail.rb', line 440

# Shuts the watcher down.
#
# close_io - when truthy (the default) and an IO handler is installed, the
#            handler is notified one last time and then closed.
#
# detach is always called last, and its result is returned.
def close(close_io = true)
  release_io = close_io && @io_handler

  if release_io
    # Notify before closing so the handler gets one more pass first.
    @io_handler.on_notify
    @io_handler.close
  end

  detach
end

#detach ⇒ Object



435
436
437
438
# File 'lib/fluent/plugin/in_tail.rb', line 435

# Detaches the timer trigger (only when the watch timer is enabled) and the
# stat trigger, each only if it reports itself as attached.
#
# Returns the result of detaching the stat trigger, or nil when the stat
# trigger was not attached.
def detach
  # The constructor leaves @timer_trigger as nil and it is only assigned
  # later from outside, so it can still be nil here even when the watch
  # timer is enabled. Skip it in that case so the stat trigger below is
  # still detached instead of raising NoMethodError on nil.
  if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
    @timer_trigger.detach
  end

  @stat_trigger.detach if @stat_trigger.attached?
end

#on_notify ⇒ Object



448
449
450
451
452
453
# File 'lib/fluent/plugin/in_tail.rb', line 448

# Fans a notification out to the collaborators that are currently set, in
# this order: the rotate handler, the line-buffer timer flusher (which is
# given this watcher), and finally the IO handler.
#
# Returns the IO handler's result, or nil when no IO handler is installed.
def on_notify
  if @rotate_handler
    @rotate_handler.on_notify
  end

  if @line_buffer_timer_flusher
    @line_buffer_timer_flusher.on_notify(self)
  end

  # Checked last on purpose: @io_handler is read only after the rotate
  # handler has run.
  @io_handler.on_notify if @io_handler
end

#on_rotate(io) ⇒ Object



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
# File 'lib/fluent/plugin/in_tail.rb', line 455

# Callback passed to RotateHandler by the constructor. `io` is the IO for
# @path; the code also handles a nil `io`, which it treats as "no file found".
#
# Two situations are distinguished by whether an IO handler already exists:
#
# * No handler yet (first call): with an `io`, pick the starting offset from
#   the position entry and install an IOHandler; without one, install a
#   NullIOHandler.
# * Handler already present (rotation): log the event, then either replace
#   the handler in place (same inode, or no previous IO) or hand the path and
#   the original position entry to @update_watcher.
def on_rotate(io)
  if @io_handler == nil
    if io
      # first time
      stat = io.stat
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # The file has the same inode number as the one recorded last time.
        # Assuming one of the following situations:
        #   a) the file was renamed once and then moved back, or
        #   b) a symlink or hardlink to the same file was recreated
        # In either case, seek to the saved position.
        pos = @pe.read_pos
      elsif last_inode != 0
        # This is a FilePositionEntry and fluentd has run before.
        # Read data from the head of the rotated file.
        # Logs are never duplicated because this file is a newly rotated file.
        pos = 0
        @pe.update(inode, pos)
      else
        # This is a MemoryPositionEntry, or this is the first time fluentd started.
        # Seek to the end of the file unless @read_from_head is set.
        # Without this seek logs may be duplicated, because it is not known
        # whether the file already existed or is a newly rotated file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      io.seek(pos)

      @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
    else
      # No file to read yet: install a placeholder handler.
      @io_handler = NullIOHandler.new
    end
  else
    log_msg = "detected rotation of #{@path}"
    log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io  # mention the wait only when the current handler has an IO
    @log.info log_msg

    if io
      stat = io.stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        # Same inode as recorded: record the file's current size as the
        # position, build the replacement handler, then close the old one.
        @pe.update_pos(stat.size)
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler.close
        @io_handler = io_handler
      elsif @io_handler.io.nil? # There is no previous file. Reuse TailWatcher
        # Record the new inode with the IO's current offset and start reading.
        @pe.update(inode, io.pos)
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler = io_handler
      else # file is rotated and new file found
        # swap_state moves this watcher onto a MemoryPositionEntry and
        # returns the original entry, which is passed to @update_watcher.
        @update_watcher.call(@path, swap_state(@pe))
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid watching the same path twice.
      @rotate_handler = nil
      @update_watcher.call(@path, swap_state(@pe))
    end
  end
end

#swap_state(pe) ⇒ Object



519
520
521
522
523
524
525
526
527
# File 'lib/fluent/plugin/in_tail.rb', line 519

# Moves this watcher (and its existing IO handler) onto a temporary
# in-memory position entry seeded from `pe`, and hands `pe` back.
#
# pe - the position entry currently in use; must respond to read_inode and
#      read_pos.
#
# Returns the `pe` that was passed in, untouched, so the caller can pass it
# on (on_rotate gives it to @update_watcher).
def swap_state(pe)
  # The rotated file is tracked with a MemoryPositionEntry from here on.
  memory_entry = MemoryPositionEntry.new.tap do |entry|
    entry.update(pe.read_inode, pe.read_pos)
  end

  @pe = memory_entry
  # The IOHandler is kept rather than rebuilt because it holds an internal
  # buffer; only its position entry is repointed.
  @io_handler.pe = memory_entry

  pe
end

#tag ⇒ Object



422
423
424
# File 'lib/fluent/plugin/in_tail.rb', line 422

# Derives a dot-separated tag from @path and memoizes it in @parsed_tag.
#
# Every '/' becomes '.', runs of dots collapse to a single dot, and a dot
# at the start of a line is dropped, e.g. "/var/log/app.log" gives
# "var.log.app.log". Later changes to @path do not affect a cached tag.
def tag
  @parsed_tag ||= begin
    dotted = @path.tr('/', '.').squeeze('.')
    dotted.gsub(/^\./, '')
  end
end

#wrap_receive_lines(lines) ⇒ Object



426
427
428
# File 'lib/fluent/plugin/in_tail.rb', line 426

# Forwards `lines` to the block captured by the constructor
# (@receive_lines), adding this watcher as the second argument.
# on_rotate hands this method to IOHandler as its block.
#
# Returns whatever the captured block returns.
def wrap_receive_lines(lines)
  @receive_lines.call(lines, self)
end