Class: Fluent::Plugin::TailInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Defined Under Namespace

Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler, StatWatcher

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) ⇒ TailWatcher

Returns a new instance of TailWatcher.



429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# File 'lib/fluent/plugin/in_tail.rb', line 429

# Sets up a watcher for one tailed file.
#
# Every argument is stored in the instance variable of the same name. Only the
# arguments whose use is visible in this class are described further:
#
# @param path path that #on_notify stats and #tag is derived from
# @param rotate_wait number of seconds mentioned in the rotation log message
# @param pe position entry; a new MemoryPositionEntry is used when falsy
# @param log logger that receives the rotation message via #info
# @param read_from_head when truthy a never-seen file is read from offset 0,
#   otherwise from its current size (see #on_rotate)
# @param enable_watch_timer whether #detach should detach the timer trigger
# @param update_watcher callable invoked as call(path, pe) by #on_rotate
# @param line_buffer_timer_flusher optional object notified by #on_notify
# @param receive_lines block invoked as call(lines, watcher) by #wrap_receive_lines
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
  # What is watched and where reading resumes.
  @path = path
  @pe = pe || MemoryPositionEntry.new
  @read_from_head = read_from_head
  @read_lines_limit = read_lines_limit

  # Rotation and polling settings.
  @rotate_wait = rotate_wait
  @enable_watch_timer = enable_watch_timer

  # Callbacks supplied by the owner of this watcher.
  @update_watcher = update_watcher
  @receive_lines = receive_lines

  # The timer trigger and the IO handler start out empty; the IO handler is
  # created by #on_rotate.
  @timer_trigger = nil
  @stat_trigger = StatWatcher.new(self, &method(:on_notify))

  @io_handler = nil
  @rotate_handler = RotateHandler.new(self, &method(:on_rotate))

  # Assigned after the trigger/handler objects exist, as in the original order.
  @log = log
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @open_on_every_update = open_on_every_update
  @from_encoding = from_encoding
  @encoding = encoding
end

Instance Attribute Details

#enable_watch_timer ⇒ Object (readonly)

Returns the value of attribute enable_watch_timer.



455
456
457
# File 'lib/fluent/plugin/in_tail.rb', line 455

# Reader for the enable_watch_timer flag given to #initialize. #detach only
# touches the timer trigger when this flag is truthy.
def enable_watch_timer
  @enable_watch_timer
end

#encoding ⇒ Object (readonly)

Returns the value of attribute encoding.



454
455
456
# File 'lib/fluent/plugin/in_tail.rb', line 454

# Reader for the encoding value given to #initialize (stored unchanged).
def encoding
  @encoding
end

#from_encoding ⇒ Object (readonly)

Returns the value of attribute from_encoding.



454
455
456
# File 'lib/fluent/plugin/in_tail.rb', line 454

# Reader for the from_encoding value given to #initialize (stored unchanged).
def from_encoding
  @from_encoding
end

#line_buffer ⇒ Object

Returns the value of attribute line_buffer.



457
458
459
# File 'lib/fluent/plugin/in_tail.rb', line 457

# Returns @line_buffer. #initialize does not assign it, so it is nil until it
# is set from outside (the attribute is listed as read/write, so presumably
# through the matching writer).
def line_buffer
  @line_buffer
end

#line_buffer_timer_flusher ⇒ Object

Returns the value of attribute line_buffer_timer_flusher.



457
458
459
# File 'lib/fluent/plugin/in_tail.rb', line 457

# Returns the flusher given to #initialize (may be nil). When present,
# #on_notify calls its #on_notify with this watcher.
def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end

#log ⇒ Object (readonly)

Returns the value of attribute log.



453
454
455
# File 'lib/fluent/plugin/in_tail.rb', line 453

# Reader for the logger given to #initialize; #on_rotate reports detected
# rotations through its #info method.
def log
  @log
end

#open_on_every_update ⇒ Object (readonly)

Returns the value of attribute open_on_every_update.



453
454
455
# File 'lib/fluent/plugin/in_tail.rb', line 453

# Reader for the open_on_every_update value given to #initialize (stored
# unchanged; it is not consulted by the methods shown in this class).
def open_on_every_update
  @open_on_every_update
end

#path ⇒ Object (readonly)

Returns the value of attribute path.



452
453
454
# File 'lib/fluent/plugin/in_tail.rb', line 452

# Reader for the watched path given to #initialize; the same value is stat'ed
# in #on_notify and turned into a tag by #tag.
def path
  @path
end

#pe ⇒ Object (readonly)

Returns the value of attribute pe.



453
454
455
# File 'lib/fluent/plugin/in_tail.rb', line 453

# Reader for the current position entry: the one given to #initialize (or a
# fresh MemoryPositionEntry when none was given), later replaced by a
# MemoryPositionEntry copy in #swap_state.
def pe
  @pe
end

#read_lines_limit ⇒ Object (readonly)

Returns the value of attribute read_lines_limit.



453
454
455
# File 'lib/fluent/plugin/in_tail.rb', line 453

# Reader for the read_lines_limit value given to #initialize (stored
# unchanged; it is not consulted by the methods shown in this class).
def read_lines_limit
  @read_lines_limit
end

#stat_trigger ⇒ Object (readonly)

Returns the value of attribute stat_trigger.



455
456
457
# File 'lib/fluent/plugin/in_tail.rb', line 455

# Reader for the StatWatcher built in #initialize, whose callback is
# #on_notify. #detach detaches it when it reports attached?.
def stat_trigger
  @stat_trigger
end

#timer_trigger ⇒ Object

Returns the value of attribute timer_trigger.



456
457
458
# File 'lib/fluent/plugin/in_tail.rb', line 456

# Returns the timer trigger. #initialize sets it to nil, so it stays nil until
# it is assigned from outside (the attribute is listed as read/write).
def timer_trigger
  @timer_trigger
end

#unwatched ⇒ Object

This is used for removing position entry from PositionFile



458
459
460
# File 'lib/fluent/plugin/in_tail.rb', line 458

# Returns @unwatched, which is used for removing the position entry from the
# PositionFile. Nothing in #initialize assigns it, so it is nil until set from
# outside (the attribute is listed as read/write).
def unwatched
  @unwatched
end

Instance Method Details

#attach {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:



468
469
470
471
# File 'lib/fluent/plugin/in_tail.rb', line 468

# Runs one notification pass, then hands this watcher to the caller's block.
#
# @yield [_self] called once, after #on_notify has returned
# @yieldparam _self [TailWatcher] this watcher
# @return [Object] whatever the block returns
def attach
  on_notify
  yield(self)
end

#close ⇒ Object



479
480
481
482
483
484
# File 'lib/fluent/plugin/in_tail.rb', line 479

# Closes the current IO handler, if there is one, and forgets it so that a
# later call is a no-op.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end

#detach ⇒ Object



473
474
475
476
477
# File 'lib/fluent/plugin/in_tail.rb', line 473

# Stops watching: detaches the timer trigger and the stat trigger when they
# report attached?, then sends the IO handler (if any) one more notification.
#
# The timer trigger is only touched when the watch timer is enabled AND a
# trigger has actually been assigned. #initialize leaves @timer_trigger nil,
# so detaching before one is assigned must not raise NoMethodError.
def detach
  @timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
  @stat_trigger.detach if @stat_trigger.attached?
  # Last notification for the IO handler (presumably so it can read whatever
  # is still pending before the watcher goes away).
  @io_handler.on_notify if @io_handler
end

#on_notify ⇒ Object



486
487
488
489
490
491
492
493
494
495
496
497
# File 'lib/fluent/plugin/in_tail.rb', line 486

# Notification entry point (also the StatWatcher callback, see #initialize).
#
# Stats @path and passes the result on, in this order: the rotate handler,
# the line-buffer timer flusher, then the IO handler. Each one is skipped when
# it is not set. The rotate handler goes first; its callback #on_rotate is
# what creates or replaces @io_handler.
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT
      # The file was moved or deleted; downstream handlers receive nil.
      nil
    end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end

#on_rotate(stat) ⇒ Object



499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
# File 'lib/fluent/plugin/in_tail.rb', line 499

# Callback given to RotateHandler in #initialize. Decides where reading should
# start or continue and (re)creates the IO handler.
#
# stat - object for the file at @path responding to #size and #ino, or nil
#        when there is no file (presumably the value #on_notify obtained and
#        passed to the rotate handler; verify against RotateHandler).
#
# Two phases, selected by whether an IO handler already exists:
#   * no handler yet: pick the start position in @pe and create an IOHandler,
#     or a NullIOHandler when there is no file;
#   * handler exists: either reopen in place (truncation, or no previously
#     opened file) or hand @path and the position entry to @update_watcher.
def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number as the last file.
        # assuming the following situation:
        #   a) file was once renamed and backed up, or
        #   b) symlink or hardlink to the same file is recreated
        # in either case, seek to the saved position
        # (nothing to update: @pe already holds that inode and position)
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd once started.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        @pe.update(inode, 0)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of any file unless read_from_head is set.
        # logs may duplicate without this seek because it's not sure whether the
        # file is an existing file or a rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
    else
      # no file yet: install a placeholder handler so later calls take the
      # "handler exists" branch below.
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        # same inode: restart from offset 0 and close the current handler; a
        # new one is created at the bottom of this method.
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      # (#on_notify skips the rotate handler once it is nil.)
      @rotate_handler = nil
      watcher_needs_update = true
    end

    log_msg = "detected rotation of #{@path}"
    log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists
    @log.info log_msg

    if watcher_needs_update
      # swap_state keeps a MemoryPositionEntry copy in @pe for this watcher
      # and returns the original entry, which is handed to the callback.
      @update_watcher.call(@path, swap_state(@pe))
    else
      @io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
    end
  end
end

#swap_state(pe) ⇒ Object



561
562
563
564
565
566
567
# File 'lib/fluent/plugin/in_tail.rb', line 561

# Detaches this watcher from the given position entry.
#
# @pe is replaced by a temporary MemoryPositionEntry holding the inode and
# position currently recorded in +pe+, so the rotated file can continue to be
# tracked without touching +pe+.
#
# @param pe position entry responding to #read_inode and #read_pos
# @return the +pe+ that was passed in; it will be updated in #on_rotate after
#   the new TailWatcher is initialized
def swap_state(pe)
  @pe = MemoryPositionEntry.new.tap do |snapshot|
    snapshot.update(pe.read_inode, pe.read_pos)
  end
  pe
end

#tag ⇒ Object



460
461
462
# File 'lib/fluent/plugin/in_tail.rb', line 460

# Builds a dotted tag from the watched path and memoizes it in @parsed_tag:
# every "/" becomes ".", runs of "." collapse into one, and a single leading
# "." is dropped, e.g. "/var/log/app.log" => "var.log.app.log".
#
# The leading dot is matched with \A (start of string) rather than ^ (start of
# any line), so a "." that merely follows a newline inside the path is kept.
def tag
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').sub(/\A\./, '')
end

#wrap_receive_lines(lines) ⇒ Object



464
465
466
# File 'lib/fluent/plugin/in_tail.rb', line 464

# Adapter used as the IOHandler callback (see #on_rotate): forwards the lines
# to the block given to #initialize, adding this watcher as second argument.
#
# @param lines lines handed over by the IO handler
# @return whatever the receive_lines block returns
def wrap_receive_lines(lines)
  receiver = @receive_lines
  receiver.call(lines, self)
end