Class: Fluent::Plugin::TailInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Defined Under Namespace

Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler, StatWatcher

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) ⇒ TailWatcher

Returns a new instance of TailWatcher.



420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/fluent/plugin/in_tail.rb', line 420

# Builds a watcher for a single tailed path.
#
# Arguments as they are used by the code visible on this page:
#   path            - path that on_notify stats and that #tag is derived from.
#   rotate_wait     - only interpolated into the rotation log message here.
#   pe              - position entry; a new MemoryPositionEntry is used when
#                     this is nil/false.
#   log             - receives `info` from on_rotate.
#   read_from_head  - when truthy, a never-seen file starts at offset 0
#                     instead of its current size (see on_rotate).
#   update_watcher  - callable invoked from on_rotate as
#                     call(path, previous_position_entry).
#   line_buffer_timer_flusher - optional; on_notify forwards to it when set.
#   &receive_lines  - block invoked by wrap_receive_lines as
#                     call(lines, watcher).
# The remaining arguments (enable_watch_timer, read_lines_limit,
# from_encoding, encoding, open_on_every_update) are stored and exposed
# through readers; their consumers are not shown here.
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
  @path = path
  @rotate_wait = rotate_wait
  @pe = pe || MemoryPositionEntry.new
  @read_from_head = read_from_head
  @enable_watch_timer = enable_watch_timer
  @read_lines_limit = read_lines_limit
  @receive_lines = receive_lines
  @update_watcher = update_watcher

  # The stat watcher is handed this object and on_notify as its callback.
  @stat_trigger = StatWatcher.new(self, &method(:on_notify))
  # No timer trigger yet; nothing in this constructor creates one.
  @timer_trigger = nil

  # The rotate handler calls back into on_rotate.
  @rotate_handler = RotateHandler.new(self, &method(:on_rotate))
  # Created lazily by the first on_rotate call.
  @io_handler = nil
  @log = log

  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @from_encoding = from_encoding
  @encoding = encoding
  @open_on_every_update = open_on_every_update
end

Instance Attribute Details

#enable_watch_timer ⇒ Object (readonly)

Returns the value of attribute enable_watch_timer.



446
447
448
# File 'lib/fluent/plugin/in_tail.rb', line 446

# Returns the value the constructor stored from its `enable_watch_timer`
# argument; #detach only touches the timer trigger when it is truthy.
def enable_watch_timer
  @enable_watch_timer
end

#encoding ⇒ Object (readonly)

Returns the value of attribute encoding.



445
446
447
# File 'lib/fluent/plugin/in_tail.rb', line 445

# Returns the value the constructor stored from its `encoding` argument.
def encoding
  @encoding
end

#from_encoding ⇒ Object (readonly)

Returns the value of attribute from_encoding.



445
446
447
# File 'lib/fluent/plugin/in_tail.rb', line 445

# Returns the value the constructor stored from its `from_encoding` argument.
def from_encoding
  @from_encoding
end

#line_buffer ⇒ Object

Returns the value of attribute line_buffer.



448
449
450
# File 'lib/fluent/plugin/in_tail.rb', line 448

# Returns @line_buffer. The constructor shown on this page never assigns
# it, so it stays nil until something else sets it.
# NOTE(review): presumably written through a matching setter — confirm in
# in_tail.rb.
def line_buffer
  @line_buffer
end

#line_buffer_timer_flusher ⇒ Object

Returns the value of attribute line_buffer_timer_flusher.



448
449
450
# File 'lib/fluent/plugin/in_tail.rb', line 448

# Returns the flusher given to the constructor (may be nil); on_notify
# calls `on_notify(self)` on it when it is set.
def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end

#log ⇒ Object (readonly)

Returns the value of attribute log.



444
445
446
# File 'lib/fluent/plugin/in_tail.rb', line 444

# Returns the object the constructor received as `log`; on_rotate calls
# `info` on it.
def log
  @log
end

#open_on_every_update ⇒ Object (readonly)

Returns the value of attribute open_on_every_update.



444
445
446
# File 'lib/fluent/plugin/in_tail.rb', line 444

# Returns the value the constructor stored from its `open_on_every_update`
# argument.
def open_on_every_update
  @open_on_every_update
end

#path ⇒ Object (readonly)

Returns the value of attribute path.



443
444
445
# File 'lib/fluent/plugin/in_tail.rb', line 443

# Returns the watched path given to the constructor; on_notify stats it
# and #tag is derived from it.
def path
  @path
end

#pe ⇒ Object (readonly)

Returns the value of attribute pe.



444
445
446
# File 'lib/fluent/plugin/in_tail.rb', line 444

# Returns the current position entry: the constructor's `pe` argument, or
# a MemoryPositionEntry when none was given. swap_state replaces it with
# an in-memory copy.
def pe
  @pe
end

#read_lines_limit ⇒ Object (readonly)

Returns the value of attribute read_lines_limit.



444
445
446
# File 'lib/fluent/plugin/in_tail.rb', line 444

# Returns the value the constructor stored from its `read_lines_limit`
# argument.
def read_lines_limit
  @read_lines_limit
end

#stat_trigger ⇒ Object (readonly)

Returns the value of attribute stat_trigger.



446
447
448
# File 'lib/fluent/plugin/in_tail.rb', line 446

# Returns the StatWatcher created in the constructor, whose callback is
# on_notify.
def stat_trigger
  @stat_trigger
end

#timer_trigger ⇒ Object

Returns the value of attribute timer_trigger.



447
448
449
# File 'lib/fluent/plugin/in_tail.rb', line 447

# Returns @timer_trigger. The constructor initializes it to nil; #detach
# calls `attached?`/`detach` on it when @enable_watch_timer is truthy.
def timer_trigger
  @timer_trigger
end

#unwatched ⇒ Object

Used to remove the position entry from the PositionFile.



449
450
451
# File 'lib/fluent/plugin/in_tail.rb', line 449

# Returns @unwatched. The constructor shown on this page never assigns it,
# so it is nil until set elsewhere.
def unwatched
  @unwatched
end

Instance Method Details

#attach {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:

  • _self (Fluent::Plugin::TailInput::TailWatcher) — the object that the method was called on



459
460
461
462
# File 'lib/fluent/plugin/in_tail.rb', line 459

# Runs one on_notify pass, then yields this watcher to the caller's block.
# Returns whatever the block returns.
def attach
  # Check the file once before the caller gets the watcher.
  on_notify
  yield self
end

#close ⇒ Object



470
471
472
473
474
475
# File 'lib/fluent/plugin/in_tail.rb', line 470

# Closes the current IO handler, if there is one, and forgets it so a
# repeated call does nothing. Always returns nil.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end

#detach ⇒ Object



464
465
466
467
468
# File 'lib/fluent/plugin/in_tail.rb', line 464

# Detaches the timer and stat triggers when they are attached, then gives
# the IO handler (if any) one last on_notify call.
def detach
  # @timer_trigger is nil until something assigns it (the constructor sets
  # it to nil), so guard against nil even when the watch timer is enabled.
  if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
    @timer_trigger.detach
  end
  @stat_trigger.detach if @stat_trigger.attached?
  @io_handler.on_notify if @io_handler
end

#on_notify ⇒ Object



477
478
479
480
481
482
483
484
485
486
487
488
# File 'lib/fluent/plugin/in_tail.rb', line 477

# Stats @path and fans the notification out, in order, to the rotate
# handler, the line-buffer timer flusher and the IO handler — each only
# if currently set. A missing file is passed to the rotate handler as nil.
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT
      # moved or deleted
      nil
    end

  if @rotate_handler
    @rotate_handler.on_notify(stat)
  end

  if @line_buffer_timer_flusher
    @line_buffer_timer_flusher.on_notify(self)
  end

  # Checked last: the rotate handler may have just installed the handler.
  if @io_handler
    @io_handler.on_notify
  end
end

#on_rotate(stat) ⇒ Object



490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
# File 'lib/fluent/plugin/in_tail.rb', line 490

# Reacts to a rotation check for @path. The constructor registers this
# method as the RotateHandler callback.
#
# stat - stat object for @path (only `size` and `ino` are read from it),
#        or a falsy value when there is no file to read.
#
# First call (@io_handler is nil): positions @pe and creates an IOHandler,
# or installs a NullIOHandler when no file exists yet.
# Later calls: handle truncation, reuse this watcher when no file was open,
# or hand the path over through @update_watcher.
def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # the file has the same inode number as the last one recorded.
        # assuming one of the following situations:
        #   a) the file was once renamed and moved back, or
        #   b) a symlink or hardlink to the same file was recreated
        # in either case, keep the saved position (nothing to update)
      elsif last_inode != 0
        # this is a FilePositionEntry and fluentd has run before.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a new, rotated file.
        @pe.update(inode, 0)
      else
        # this is a MemoryPositionEntry or the first time fluentd started.
        # seek to the end of the file unless read_from_head is set.
        # logs may duplicate without this seek because it is not known
        # whether this is a pre-existing file or a new, rotated file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
    else
      # no file yet: install a placeholder handler
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        # same inode: restart from offset 0 and drop the current handler
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      watcher_needs_update = true
    end

    log_msg = "detected rotation of #{@path}"
    log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_wait if previous file exists
    @log.info log_msg

    if watcher_needs_update
      # hand the path and the original position entry to the owner;
      # swap_state leaves this watcher with an in-memory copy
      @update_watcher.call(@path, swap_state(@pe))
    else
      # keep this watcher and start a fresh handler
      @io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
    end
  end
end

#swap_state(pe) ⇒ Object



552
553
554
555
556
557
558
# File 'lib/fluent/plugin/in_tail.rb', line 552

# Switches this watcher to a temporary in-memory position entry that
# mirrors `pe` (same inode and position), and returns the original `pe`
# so the caller can keep using it.
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  @pe = MemoryPositionEntry.new.tap do |snapshot|
    snapshot.update(pe.read_inode, pe.read_pos)
  end

  # This pe will be updated in on_rotate after TailWatcher is initialized
  pe
end

#tag ⇒ Object



451
452
453
# File 'lib/fluent/plugin/in_tail.rb', line 451

# Derives a dot-separated tag from @path: slashes become dots, runs of
# dots collapse to one, and a leading dot is dropped. The result is
# cached in @parsed_tag after the first call.
def tag
  return @parsed_tag if @parsed_tag

  dotted = @path.tr('/', '.')
  collapsed = dotted.gsub(/\.+/, '.')
  @parsed_tag = collapsed.gsub(/^\./, '')
end

#wrap_receive_lines(lines) ⇒ Object



455
456
457
# File 'lib/fluent/plugin/in_tail.rb', line 455

# Forwards a batch of lines to the block given to the constructor, passing
# this watcher as the second argument. Returns the callback's result.
def wrap_receive_lines(lines)
  callback = @receive_lines
  callback.call(lines, self)
end