Class: Fluent::TailInput::TailWatcher
- Inherits:
-
Object
- Object
- Fluent::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: Closer, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler, StatWatcher, TimerWatcher
Instance Attribute Summary collapse
-
#line_buffer ⇒ Object
Returns the value of attribute line_buffer.
-
#line_buffer_timer_flusher ⇒ Object
Returns the value of attribute line_buffer_timer_flusher.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#unwatched ⇒ Object
This is used for removing the position entry from the PositionFile.
Instance Method Summary collapse
- #attach(loop) ⇒ Object
- #close(close_io = true) ⇒ Object
- #detach ⇒ Object
-
#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #on_notify ⇒ Object
- #on_rotate(io) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
- #wrap_receive_lines(lines) ⇒ Object
Constructor Details
#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines) ⇒ TailWatcher
Returns a new instance of TailWatcher.
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 |
# File 'lib/fluent/plugin/in_tail.rb', line 393
#
# Sets up a watcher for one tailed path: stores the configuration, creates the
# stat watcher and rotate handler (plus an optional timer watcher), and leaves
# the IO handler unset until the first rotation callback provides one.
#
# @param path [Object] path to watch; passed to StatWatcher and RotateHandler
# @param rotate_wait [Object] value reported (as seconds) in the rotation log message
# @param pe [Object, nil] position entry; a new MemoryPositionEntry is used when falsy
# @param log [Object] logger handed to the watchers and handlers
# @param read_from_head [Boolean] consulted in #on_rotate to pick the initial position
# @param enable_watch_timer [Boolean] when truthy a TimerWatcher is created as well
# @param read_lines_limit [Object] forwarded to IOHandler when one is created
# @param update_watcher [#call] invoked with (path, position entry) on rotation
# @param line_buffer_timer_flusher [Object, nil] notified from #on_notify when present
# @yield [lines, watcher] block stored and invoked through #wrap_receive_lines
# @return [TailWatcher] a new instance of TailWatcher
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines)
  # Phase 1: plain state captured from the arguments.
  @path = path
  @rotate_wait = rotate_wait
  @read_from_head = read_from_head
  @read_lines_limit = read_lines_limit
  @enable_watch_timer = enable_watch_timer
  @update_watcher = update_watcher
  @receive_lines = receive_lines
  @pe = pe || MemoryPositionEntry.new

  # Phase 2: collaborators that call back into this watcher.
  if @enable_watch_timer
    # NOTE(review): (1, true) presumably means a repeating 1-second timer -
    # confirm against TimerWatcher.
    @timer_trigger = TimerWatcher.new(1, true, log, &method(:on_notify))
  end
  @stat_trigger = StatWatcher.new(path, log, &method(:on_notify))
  @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate))

  # Phase 3: remaining state, assigned after the collaborators exist.
  @io_handler = nil
  @log = log
  @line_buffer_timer_flusher = line_buffer_timer_flusher
end
Instance Attribute Details
#line_buffer ⇒ Object
Returns the value of attribute line_buffer.
415 416 417 |
# File 'lib/fluent/plugin/in_tail.rb', line 415
#
# Reader for the line_buffer attribute.
#
# @return [Object] the value currently held in @line_buffer (nil if never assigned)
def line_buffer
  @line_buffer
end
#line_buffer_timer_flusher ⇒ Object
Returns the value of attribute line_buffer_timer_flusher.
415 416 417 |
# File 'lib/fluent/plugin/in_tail.rb', line 415
#
# Reader for the line_buffer_timer_flusher attribute.
#
# @return [Object] the value currently held in @line_buffer_timer_flusher
#   (nil if never assigned)
def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end
#path ⇒ Object (readonly)
Returns the value of attribute path.
414 415 416 |
# File 'lib/fluent/plugin/in_tail.rb', line 414
#
# Read-only accessor for the path given to the constructor.
#
# @return [Object] the value currently held in @path
def path
  @path
end
#unwatched ⇒ Object
This is used for removing the position entry from the PositionFile.
416 417 418 |
# File 'lib/fluent/plugin/in_tail.rb', line 416
#
# Reader for the unwatched flag, which is used for removing the position
# entry from the PositionFile.
#
# @return [Object] the value currently held in @unwatched (nil if never assigned)
def unwatched
  @unwatched
end
Instance Method Details
#attach(loop) ⇒ Object
426 427 428 429 430 |
# File 'lib/fluent/plugin/in_tail.rb', line 426
#
# Registers this watcher's triggers with the given loop and then runs one
# notification pass immediately.
#
# @param loop [Object] passed unchanged to each trigger's #attach
# @return [Object] whatever #on_notify returns
def attach(loop)
  # The timer trigger only exists when the watch timer is enabled.
  if @enable_watch_timer
    @timer_trigger.attach(loop)
  end
  @stat_trigger.attach(loop)

  on_notify
end
#close(close_io = true) ⇒ Object
437 438 439 440 441 442 443 |
# File 'lib/fluent/plugin/in_tail.rb', line 437
#
# Shuts the watcher down: optionally notifies and closes the IO handler, then
# detaches the triggers.
#
# @param close_io [Boolean] when truthy (and an IO handler exists) the handler
#   is notified once more and closed before detaching
# @return [Object] whatever #detach returns
def close(close_io = true)
  should_close_handler = close_io && @io_handler
  if should_close_handler
    # One last notification before closing.
    # NOTE(review): presumably this drains lines that are still unread - confirm
    # against IOHandler#on_notify.
    @io_handler.on_notify
    @io_handler.close
  end

  detach
end
#detach ⇒ Object
432 433 434 435 |
# File 'lib/fluent/plugin/in_tail.rb', line 432
#
# Detaches whichever triggers are currently attached.
#
# @return [Object, nil] the result of detaching the stat trigger, or nil when
#   it was not attached
def detach
  # The timer trigger is only consulted when the watch timer is enabled.
  timer_attached = @enable_watch_timer && @timer_trigger.attached?
  @timer_trigger.detach if timer_attached

  if @stat_trigger.attached?
    @stat_trigger.detach
  end
end
#on_notify ⇒ Object
445 446 447 448 449 450 |
# File 'lib/fluent/plugin/in_tail.rb', line 445
#
# Fans a notification out to the collaborators that are present, in order:
# the rotate handler, the line-buffer timer flusher (which receives this
# watcher), and finally the IO handler.
#
# @return [Object, nil] the IO handler's #on_notify result, or nil when there
#   is no IO handler
def on_notify
  if @rotate_handler
    @rotate_handler.on_notify
  end

  if @line_buffer_timer_flusher
    @line_buffer_timer_flusher.on_notify(self)
  end

  # Checked last: the IO handler may only have been set by the steps above.
  @io_handler.on_notify if @io_handler
end
#on_rotate(io) ⇒ Object
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 |
# File 'lib/fluent/plugin/in_tail.rb', line 452
#
# Callback given to RotateHandler. It handles two situations:
#
# * No IO handler yet (first call): pick the starting position, seek, and
#   create the IO handler (or a NullIOHandler when there is no file).
# * An IO handler already exists (rotation detected): log the rotation, then
#   either replace the handler in place or hand the path over to
#   @update_watcher.
#
# @param io [Object, nil] the opened file for the path, or nil when none exists
# @return [Object] the newly assigned IO handler, or the result of
#   @update_watcher.call when the watcher is handed over
def on_rotate(io)
  if @io_handler.nil?
    @io_handler =
      if io
        # first time
        file_stat = io.stat
        size_now = file_stat.size
        current_ino = file_stat.ino
        saved_ino = @pe.read_inode
        same_file = current_ino == saved_ino

        start_at =
          if same_file
            # The file has the same inode number as the last file, which
            # suggests one of:
            #   a) the file was once renamed and put back, or
            #   b) a symlink or hardlink to the same file was recreated.
            # Either way, resume from the saved position.
            @pe.read_pos
          elsif saved_ino != 0
            # This is a FilePositionEntry and fluentd has run before.
            # Read from the head of the rotated file; logs never duplicate
            # because this is a newly rotated file.
            0
          else
            # This is a MemoryPositionEntry, or the first time fluentd started.
            # Seek to the end of the file unless read_from_head is set; without
            # this seek logs may duplicate, because it is not known whether the
            # file already existed or is a newly rotated one.
            @read_from_head ? 0 : size_now
          end

        # Only a different inode needs the position entry rewritten.
        @pe.update(current_ino, start_at) unless same_file
        io.seek(start_at)

        IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
      else
        NullIOHandler.new
      end
  else
    notice = "detected rotation of #{@path}"
    # Mention the rotate wait only if a previous file exists.
    notice << "; waiting #{@rotate_wait} seconds" if @io_handler.io
    @log.info(notice)

    unless io
      # The file was rotated and no new file was found.
      # Clear RotateHandler to avoid a duplicated file watch on the same path.
      @rotate_handler = nil
      return @update_watcher.call(@path, swap_state(@pe))
    end

    file_stat = io.stat
    current_ino = file_stat.ino
    if current_ino == @pe.read_inode
      # truncated
      @pe.update_pos(file_stat.size)
      # Build the replacement first so the old handler is closed only after
      # the new one was created successfully.
      replacement = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
      @io_handler.close
      @io_handler = replacement
    elsif @io_handler.io.nil?
      # There is no previous file. Reuse this TailWatcher.
      @pe.update(current_ino, io.pos)
      @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
    else
      # The file was rotated and a new file was found.
      @update_watcher.call(@path, swap_state(@pe))
    end
  end
end
#swap_state(pe) ⇒ Object
516 517 518 519 520 521 522 523 524 |
# File 'lib/fluent/plugin/in_tail.rb', line 516
#
# Moves this watcher (and its IO handler) onto a temporary in-memory position
# entry seeded from +pe+, and hands the original entry back to the caller.
#
# @param pe [Object] position entry whose inode and position are copied
# @return [Object] the same +pe+ that was passed in; it will be updated in
#   on_rotate after the new TailWatcher is initialized
def swap_state(pe)
  # Use a MemoryPositionEntry for the rotated file temporarily.
  @pe = MemoryPositionEntry.new.tap do |temporary|
    temporary.update(pe.read_inode, pe.read_pos)
  end

  # Repoint the existing IOHandler instead of re-creating it, because
  # IOHandler has an internal buffer.
  @io_handler.pe = @pe

  pe
end
#tag ⇒ Object
418 419 420 |
# File 'lib/fluent/plugin/in_tail.rb', line 418
#
# Derives a dot-separated tag from the watched path: every "/" becomes ".",
# runs of dots collapse to a single dot, and one leading dot is removed.
# The result is memoized in @parsed_tag, so later changes to @path do not
# affect it.
#
# @return [String] the tag derived from @path
def tag
  # \A anchors at the start of the string only; the former ^ anchor also
  # matched after every newline and would strip dots in the middle of a path
  # that contains one.
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').sub(/\A\./, '')
end
#wrap_receive_lines(lines) ⇒ Object
422 423 424 |
# File 'lib/fluent/plugin/in_tail.rb', line 422
#
# Adapter handed to IOHandler: forwards the lines to the block given to the
# constructor, adding this watcher as the second argument.
#
# @param lines [Object] lines to deliver, passed through unchanged
# @return [Object] whatever the stored block returns
def wrap_receive_lines(lines)
  callback = @receive_lines
  callback.call(lines, self)
end