Class: Fluent::Plugin::TailInput::TailWatcher
- Inherits:
- Object
- Inheritance chain: Object → Fluent::Plugin::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler, StatWatcher
Instance Attribute Summary collapse
-
#enable_watch_timer ⇒ Object
readonly
Returns the value of attribute enable_watch_timer.
-
#line_buffer ⇒ Object
Returns the value of attribute line_buffer.
-
#line_buffer_timer_flusher ⇒ Object
Returns the value of attribute line_buffer_timer_flusher.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#stat_trigger ⇒ Object
readonly
Returns the value of attribute stat_trigger.
-
#timer_trigger ⇒ Object
Returns the value of attribute timer_trigger.
-
#unwatched ⇒ Object
This is used for removing position entry from PositionFile.
Instance Method Summary collapse
- #attach {|_self| ... } ⇒ Object
- #close(close_io = true) ⇒ Object
- #detach ⇒ Object
-
#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #on_notify ⇒ Object
- #on_rotate(io) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
- #wrap_receive_lines(lines) ⇒ Object
Constructor Details
#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines) ⇒ TailWatcher
Returns a new instance of TailWatcher.
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 |
# File 'lib/fluent/plugin/in_tail.rb', line 396

# Builds a watcher for one tailed file. Nothing is read here: the constructor
# only records its arguments and creates the stat and rotate helpers, both of
# which call back into this object.
#
# @param path [String] path of the watched file (also the source of #tag)
# @param rotate_wait [Object] value interpolated into the "waiting ... seconds"
#   rotation log message
# @param pe [Object, nil] position entry; a fresh MemoryPositionEntry is used
#   when this is nil or false
# @param log [Object] logger handed to the helpers and used for rotation messages
# @param read_from_head [Boolean] when truthy and no inode has been recorded
#   yet, reading starts at offset 0 instead of at the current file size
# @param enable_watch_timer [Boolean] whether #detach should also detach the
#   timer trigger
# @param read_lines_limit [Object] passed through to every IOHandler
# @param update_watcher [#call] invoked with (path, position entry) when the
#   file is rotated away
# @param line_buffer_timer_flusher [#on_notify, nil] optional flusher that is
#   notified on every #on_notify
# @yield [lines, watcher] block stored and invoked by #wrap_receive_lines
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines)
  # Plain settings captured from the caller.
  @path = path
  @log = log
  @rotate_wait = rotate_wait
  @read_from_head = read_from_head
  @read_lines_limit = read_lines_limit
  @enable_watch_timer = enable_watch_timer

  # Callbacks into the owning plugin.
  @receive_lines = receive_lines
  @update_watcher = update_watcher

  # Position tracking falls back to an in-memory entry.
  @pe = pe || MemoryPositionEntry.new

  # Helpers that notify this watcher; the timer and IO handler are filled in later.
  notify_callback = method(:on_notify)
  @stat_trigger = StatWatcher.new(path, log, &notify_callback)
  @timer_trigger = nil

  rotate_callback = method(:on_rotate)
  @rotate_handler = RotateHandler.new(path, log, &rotate_callback)
  @io_handler = nil

  @line_buffer_timer_flusher = line_buffer_timer_flusher
end
Instance Attribute Details
#enable_watch_timer ⇒ Object (readonly)
Returns the value of attribute enable_watch_timer.
417 418 419 |
# File 'lib/fluent/plugin/in_tail.rb', line 417

# Reader for the enable_watch_timer flag stored by the constructor.
#
# @return [Object] the stored flag; #detach only touches the timer trigger
#   when it is truthy
def enable_watch_timer
  @enable_watch_timer
end
#line_buffer ⇒ Object
Returns the value of attribute line_buffer.
419 420 421 |
# File 'lib/fluent/plugin/in_tail.rb', line 419

# Reader for the line_buffer attribute. The constructor does not assign it,
# so it is nil until something sets it from outside.
#
# @return [Object, nil] the current line buffer
def line_buffer
  @line_buffer
end
#line_buffer_timer_flusher ⇒ Object
Returns the value of attribute line_buffer_timer_flusher.
419 420 421 |
# File 'lib/fluent/plugin/in_tail.rb', line 419

# Reader for the flusher passed to the constructor.
#
# @return [Object, nil] the flusher notified by #on_notify, or nil when none
#   was supplied
def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end
#path ⇒ Object (readonly)
Returns the value of attribute path.
416 417 418 |
# File 'lib/fluent/plugin/in_tail.rb', line 416

# Reader for the watched file's path as given to the constructor.
#
# @return [String] the path
def path
  @path
end
#stat_trigger ⇒ Object (readonly)
Returns the value of attribute stat_trigger.
417 418 419 |
# File 'lib/fluent/plugin/in_tail.rb', line 417

# Reader for the StatWatcher created in the constructor.
#
# @return [Object] the stat trigger whose callback is #on_notify
def stat_trigger
  @stat_trigger
end
#timer_trigger ⇒ Object
Returns the value of attribute timer_trigger.
418 419 420 |
# File 'lib/fluent/plugin/in_tail.rb', line 418

# Reader for the timer trigger. The constructor leaves it nil; it only has a
# value once one has been assigned from outside.
#
# @return [Object, nil] the timer trigger, if any
def timer_trigger
  @timer_trigger
end
#unwatched ⇒ Object
This is used for removing the position entry from the PositionFile.
420 421 422 |
# File 'lib/fluent/plugin/in_tail.rb', line 420

# Reader for the unwatched marker, which is used for removing the position
# entry from the PositionFile. The constructor does not assign it.
#
# @return [Object, nil] the current marker value
def unwatched
  @unwatched
end
Instance Method Details
#attach {|_self| ... } ⇒ Object
430 431 432 433 |
# File 'lib/fluent/plugin/in_tail.rb', line 430

# Runs one notification pass, then hands this watcher to the caller's block.
#
# @yieldparam _self [TailWatcher] this watcher
# @return [Object] whatever the block returns
def attach
  # Notify first so the block runs after the initial #on_notify pass.
  on_notify
  yield(self)
end
#close(close_io = true) ⇒ Object
440 441 442 443 444 445 446 |
# File 'lib/fluent/plugin/in_tail.rb', line 440

# Shuts the watcher down: optionally gives the IO handler one last
# notification and closes it, then detaches the triggers.
#
# @param close_io [Boolean] when truthy (the default) and an IO handler
#   exists, the handler is notified once more and closed
# @return [Object] the result of #detach
def close(close_io = true)
  finish_io = close_io && @io_handler
  if finish_io
    # Final notification before the handler is closed.
    @io_handler.on_notify
    @io_handler.close
  end

  detach
end
#detach ⇒ Object
435 436 437 438 |
# File 'lib/fluent/plugin/in_tail.rb', line 435

# Detaches the timer trigger (when the watch timer is enabled) and the stat
# trigger, skipping any trigger that is not currently attached.
#
# The constructor leaves the timer trigger nil, so it is checked for presence
# first. Without that check, detaching a watcher whose timer was never
# installed raised NoMethodError and left the stat trigger attached.
#
# @return [Object, nil] the result of detaching the stat trigger, or nil when
#   it was not attached
def detach
  timer = @timer_trigger
  timer.detach if @enable_watch_timer && timer && timer.attached?
  @stat_trigger.detach if @stat_trigger.attached?
end
#on_notify ⇒ Object
448 449 450 451 452 453 |
# File 'lib/fluent/plugin/in_tail.rb', line 448

# Fans a notification out to the collaborators that currently exist, in a
# fixed order: rotate handler, line-buffer flusher, then IO handler.
#
# @return [Object, nil] the IO handler's result, or nil when there is no IO
#   handler
def on_notify
  if @rotate_handler
    @rotate_handler.on_notify
  end

  if @line_buffer_timer_flusher
    # The flusher is the only collaborator that receives this watcher.
    @line_buffer_timer_flusher.on_notify(self)
  end

  # Checked last, after the rotate handler has had its turn.
  @io_handler.on_notify if @io_handler
end
#on_rotate(io) ⇒ Object
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 |
# File 'lib/fluent/plugin/in_tail.rb', line 455

# Callback handed to RotateHandler by the constructor (as method(:on_rotate)).
#
# Behaviour depends on whether an IO handler exists yet:
# * No handler yet (first call): pick the starting offset from the position
#   entry, seek there and create the first IOHandler. When io is nil a
#   NullIOHandler is installed instead.
# * Handler already present: log the rotation, then either swap in a new
#   IOHandler (same inode, or no previous file) or pass the old position entry
#   to the update_watcher callback.
#
# @param io [#stat, #seek, #pos, nil] handle for the file currently at @path,
#   or nil when there is none (presumably an opened File; confirm against
#   RotateHandler)
def on_rotate(io)
  if @io_handler == nil
    if io
      # first time
      stat = io.stat
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number as the last file.
        # assuming one of the following situations:
        #   a) file was once renamed and backed up, or
        #   b) symlink or hardlink to the same file was recreated
        # in either case, seek to the saved position
        pos = @pe.read_pos
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd has been started before.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a new, rotated file.
        pos = 0
        @pe.update(inode, pos)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of the file unless read_from_head is set.
        # logs may duplicate without this seek because it is not known whether
        # the file is an existing file or a new, rotated file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      io.seek(pos)

      @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
    else
      @io_handler = NullIOHandler.new
    end
  else
    log_msg = "detected rotation of #{@path}"
    log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io # only mention the wait when a previous file exists
    @log.info log_msg

    if io
      stat = io.stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(stat.size)
        # Build the replacement handler before closing the old one.
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler.close
        @io_handler = io_handler
      elsif @io_handler.io.nil? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, io.pos)
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler = io_handler
      else # file is rotated and new file found
        @update_watcher.call(@path, swap_state(@pe))
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      @update_watcher.call(@path, swap_state(@pe))
    end
  end
end
#swap_state(pe) ⇒ Object
519 520 521 522 523 524 525 526 527 |
# File 'lib/fluent/plugin/in_tail.rb', line 519

# Switches this watcher and its IO handler to a temporary in-memory position
# entry holding the current inode and position, and returns the entry that
# was passed in.
#
# @param pe [#read_inode, #read_pos] position entry to copy from
# @return [Object] the same pe that was passed in; it will be updated in
#   on_rotate after the new TailWatcher is initialized
def swap_state(pe)
  # Use a MemoryPositionEntry for the rotated file temporarily.
  temporary_entry = MemoryPositionEntry.new.tap do |entry|
    entry.update(pe.read_inode, pe.read_pos)
  end

  @pe = temporary_entry
  # Only the handler's entry is replaced; the IOHandler itself is kept because
  # it has an internal buffer.
  @io_handler.pe = temporary_entry

  pe
end
#tag ⇒ Object
422 423 424 |
# File 'lib/fluent/plugin/in_tail.rb', line 422

# Derives a dotted tag from the watched path and memoizes it: slashes become
# dots, runs of dots collapse to one, and a dot at the start of a line is
# dropped (e.g. "/var/log/app.log" becomes "var.log.app.log").
#
# @return [String] the tag computed on first call; later calls return the
#   cached value
def tag
  return @parsed_tag if @parsed_tag

  dotted = @path.tr('/', '.')
  @parsed_tag = dotted.squeeze('.').gsub(/^\./, '')
end
#wrap_receive_lines(lines) ⇒ Object
426 427 428 |
# File 'lib/fluent/plugin/in_tail.rb', line 426

# Forwards lines to the block given to the constructor, adding this watcher
# as the second argument.
#
# @param lines [Object] the lines to deliver
# @return [Object] whatever the stored callback returns
def wrap_receive_lines(lines)
  callback = @receive_lines
  callback.call(lines, self)
end