Class: Fluent::Plugin::TailInput::TailWatcher
- Inherits:
-
Object
- Object
- Fluent::Plugin::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler, StatWatcher
Instance Attribute Summary collapse
-
#enable_watch_timer ⇒ Object
readonly
Returns the value of attribute enable_watch_timer.
-
#encoding ⇒ Object
readonly
Returns the value of attribute encoding.
-
#from_encoding ⇒ Object
readonly
Returns the value of attribute from_encoding.
-
#line_buffer ⇒ Object
Returns the value of attribute line_buffer.
-
#line_buffer_timer_flusher ⇒ Object
Returns the value of attribute line_buffer_timer_flusher.
-
#log ⇒ Object
readonly
Returns the value of attribute log.
-
#open_on_every_update ⇒ Object
readonly
Returns the value of attribute open_on_every_update.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#pe ⇒ Object
readonly
Returns the value of attribute pe.
-
#read_lines_limit ⇒ Object
readonly
Returns the value of attribute read_lines_limit.
-
#stat_trigger ⇒ Object
readonly
Returns the value of attribute stat_trigger.
-
#timer_trigger ⇒ Object
Returns the value of attribute timer_trigger.
-
#unwatched ⇒ Object
This is used for removing position entry from PositionFile.
Instance Method Summary collapse
- #attach {|_self| ... } ⇒ Object
- #close ⇒ Object
- #detach ⇒ Object
-
#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #on_notify ⇒ Object
- #on_rotate(stat) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
- #wrap_receive_lines(lines) ⇒ Object
Constructor Details
#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) ⇒ TailWatcher
Returns a new instance of TailWatcher.
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 |
# File 'lib/fluent/plugin/in_tail.rb', line 420 def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) @path = path @rotate_wait = rotate_wait @pe = pe || MemoryPositionEntry.new @read_from_head = read_from_head @enable_watch_timer = enable_watch_timer @read_lines_limit = read_lines_limit @receive_lines = receive_lines @update_watcher = update_watcher @stat_trigger = StatWatcher.new(self, &method(:on_notify)) @timer_trigger = nil @rotate_handler = RotateHandler.new(self, &method(:on_rotate)) @io_handler = nil @log = log @line_buffer_timer_flusher = line_buffer_timer_flusher @from_encoding = from_encoding @encoding = encoding @open_on_every_update = open_on_every_update end |
Instance Attribute Details
#enable_watch_timer ⇒ Object (readonly)
Returns the value of attribute enable_watch_timer.
446 447 448 |
# File 'lib/fluent/plugin/in_tail.rb', line 446 def enable_watch_timer @enable_watch_timer end |
#encoding ⇒ Object (readonly)
Returns the value of attribute encoding.
445 446 447 |
# File 'lib/fluent/plugin/in_tail.rb', line 445 def encoding @encoding end |
#from_encoding ⇒ Object (readonly)
Returns the value of attribute from_encoding.
445 446 447 |
# File 'lib/fluent/plugin/in_tail.rb', line 445 def from_encoding @from_encoding end |
#line_buffer ⇒ Object
Returns the value of attribute line_buffer.
448 449 450 |
# File 'lib/fluent/plugin/in_tail.rb', line 448 def line_buffer @line_buffer end |
#line_buffer_timer_flusher ⇒ Object
Returns the value of attribute line_buffer_timer_flusher.
448 449 450 |
# File 'lib/fluent/plugin/in_tail.rb', line 448 def line_buffer_timer_flusher @line_buffer_timer_flusher end |
#log ⇒ Object (readonly)
Returns the value of attribute log.
444 445 446 |
# File 'lib/fluent/plugin/in_tail.rb', line 444 def log @log end |
#open_on_every_update ⇒ Object (readonly)
Returns the value of attribute open_on_every_update.
444 445 446 |
# File 'lib/fluent/plugin/in_tail.rb', line 444 def open_on_every_update @open_on_every_update end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
443 444 445 |
# File 'lib/fluent/plugin/in_tail.rb', line 443 def path @path end |
#pe ⇒ Object (readonly)
Returns the value of attribute pe.
444 445 446 |
# File 'lib/fluent/plugin/in_tail.rb', line 444 def pe @pe end |
#read_lines_limit ⇒ Object (readonly)
Returns the value of attribute read_lines_limit.
444 445 446 |
# File 'lib/fluent/plugin/in_tail.rb', line 444 def read_lines_limit @read_lines_limit end |
#stat_trigger ⇒ Object (readonly)
Returns the value of attribute stat_trigger.
446 447 448 |
# File 'lib/fluent/plugin/in_tail.rb', line 446 def stat_trigger @stat_trigger end |
#timer_trigger ⇒ Object
Returns the value of attribute timer_trigger.
447 448 449 |
# File 'lib/fluent/plugin/in_tail.rb', line 447 def timer_trigger @timer_trigger end |
#unwatched ⇒ Object
This is used for removing position entry from PositionFile
449 450 451 |
# File 'lib/fluent/plugin/in_tail.rb', line 449 def unwatched @unwatched end |
Instance Method Details
#attach {|_self| ... } ⇒ Object
459 460 461 462 |
# File 'lib/fluent/plugin/in_tail.rb', line 459 def attach on_notify yield self end |
#close ⇒ Object
470 471 472 473 474 475 |
# File 'lib/fluent/plugin/in_tail.rb', line 470 def close if @io_handler @io_handler.close @io_handler = nil end end |
#detach ⇒ Object
464 465 466 467 468 |
# File 'lib/fluent/plugin/in_tail.rb', line 464 def detach @timer_trigger.detach if @enable_watch_timer && @timer_trigger.attached? @stat_trigger.detach if @stat_trigger.attached? @io_handler.on_notify if @io_handler end |
#on_notify ⇒ Object
477 478 479 480 481 482 483 484 485 486 487 488 |
# File 'lib/fluent/plugin/in_tail.rb', line 477 def on_notify begin stat = Fluent::FileWrapper.stat(@path) rescue Errno::ENOENT # moved or deleted stat = nil end @rotate_handler.on_notify(stat) if @rotate_handler @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher @io_handler.on_notify if @io_handler end |
#on_rotate(stat) ⇒ Object
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 |
# File 'lib/fluent/plugin/in_tail.rb', line 490 def on_rotate(stat) if @io_handler.nil? if stat # first time fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case, seek to the saved position elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) else @io_handler = NullIOHandler.new end else watcher_needs_update = false if stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(0) @io_handler.close elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher @pe.update(inode, 0) else # file is rotated and new file found watcher_needs_update = true end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil watcher_needs_update = true end log_msg = "detected rotation of #{@path}" log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists @log.info log_msg if watcher_needs_update @update_watcher.call(@path, swap_state(@pe)) else @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) end end end |
#swap_state(pe) ⇒ Object
552 553 554 555 556 557 558 |
# File 'lib/fluent/plugin/in_tail.rb', line 552 def swap_state(pe) # Use MemoryPositionEntry for rotated file temporary mpe = MemoryPositionEntry.new mpe.update(pe.read_inode, pe.read_pos) @pe = mpe pe # This pe will be updated in on_rotate after TailWatcher is initialized end |
#tag ⇒ Object
451 452 453 |
# File 'lib/fluent/plugin/in_tail.rb', line 451 def tag @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '') end |
#wrap_receive_lines(lines) ⇒ Object
455 456 457 |
# File 'lib/fluent/plugin/in_tail.rb', line 455 def wrap_receive_lines(lines) @receive_lines.call(lines, self) end |