Class: Fluent::Plugin::TailInput::TailWatcher
- Inherits:
-
Object
- Object
- Fluent::Plugin::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler, StatWatcher
Instance Attribute Summary collapse
-
#enable_watch_timer ⇒ Object
readonly
Returns the value of attribute enable_watch_timer.
-
#encoding ⇒ Object
readonly
Returns the value of attribute encoding.
-
#from_encoding ⇒ Object
readonly
Returns the value of attribute from_encoding.
-
#line_buffer ⇒ Object
Returns the value of attribute line_buffer.
-
#line_buffer_timer_flusher ⇒ Object
Returns the value of attribute line_buffer_timer_flusher.
-
#log ⇒ Object
readonly
Returns the value of attribute log.
-
#open_on_every_update ⇒ Object
readonly
Returns the value of attribute open_on_every_update.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#pe ⇒ Object
readonly
Returns the value of attribute pe.
-
#read_lines_limit ⇒ Object
readonly
Returns the value of attribute read_lines_limit.
-
#stat_trigger ⇒ Object
readonly
Returns the value of attribute stat_trigger.
-
#timer_trigger ⇒ Object
Returns the value of attribute timer_trigger.
-
#unwatched ⇒ Object
This is used for removing the position entry from the PositionFile.
Instance Method Summary collapse
- #attach {|_self| ... } ⇒ Object
- #close ⇒ Object
- #detach ⇒ Object
-
#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #on_notify ⇒ Object
- #on_rotate(stat) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
- #wrap_receive_lines(lines) ⇒ Object
Constructor Details
#initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) ⇒ TailWatcher
Returns a new instance of TailWatcher.
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 |
# File 'lib/fluent/plugin/in_tail.rb', line 429
# Sets up a watcher for a single path.
#
# Every argument is stored in the instance variable of the same name, with
# two exceptions: +pe+ falls back to a fresh MemoryPositionEntry when it is
# nil/false, and the block is kept as @receive_lines.
# A StatWatcher bound to #on_notify and a RotateHandler bound to #on_rotate
# are created here; the timer trigger and the IO handler start out as nil.
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
  # What to watch and where to report.
  @path = path
  @log = log
  @pe = pe || MemoryPositionEntry.new

  # Reading behaviour.
  @rotate_wait = rotate_wait
  @read_from_head = read_from_head
  @read_lines_limit = read_lines_limit
  @enable_watch_timer = enable_watch_timer
  @open_on_every_update = open_on_every_update
  @from_encoding = from_encoding
  @encoding = encoding

  # Collaborators and callbacks supplied by the caller.
  @receive_lines = receive_lines
  @update_watcher = update_watcher
  @line_buffer_timer_flusher = line_buffer_timer_flusher

  # State that is filled in later.
  @timer_trigger = nil
  @io_handler = nil

  # Handlers that call back into this watcher.
  @stat_trigger = StatWatcher.new(self, &method(:on_notify))
  @rotate_handler = RotateHandler.new(self, &method(:on_rotate))
end
Instance Attribute Details
#enable_watch_timer ⇒ Object (readonly)
Returns the value of attribute enable_watch_timer.
455 456 457 |
# File 'lib/fluent/plugin/in_tail.rb', line 455 def enable_watch_timer @enable_watch_timer end |
#encoding ⇒ Object (readonly)
Returns the value of attribute encoding.
454 455 456 |
# File 'lib/fluent/plugin/in_tail.rb', line 454 def encoding @encoding end |
#from_encoding ⇒ Object (readonly)
Returns the value of attribute from_encoding.
454 455 456 |
# File 'lib/fluent/plugin/in_tail.rb', line 454 def from_encoding @from_encoding end |
#line_buffer ⇒ Object
Returns the value of attribute line_buffer.
457 458 459 |
# File 'lib/fluent/plugin/in_tail.rb', line 457 def line_buffer @line_buffer end |
#line_buffer_timer_flusher ⇒ Object
Returns the value of attribute line_buffer_timer_flusher.
457 458 459 |
# File 'lib/fluent/plugin/in_tail.rb', line 457 def line_buffer_timer_flusher @line_buffer_timer_flusher end |
#log ⇒ Object (readonly)
Returns the value of attribute log.
453 454 455 |
# File 'lib/fluent/plugin/in_tail.rb', line 453 def log @log end |
#open_on_every_update ⇒ Object (readonly)
Returns the value of attribute open_on_every_update.
453 454 455 |
# File 'lib/fluent/plugin/in_tail.rb', line 453 def open_on_every_update @open_on_every_update end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
452 453 454 |
# File 'lib/fluent/plugin/in_tail.rb', line 452 def path @path end |
#pe ⇒ Object (readonly)
Returns the value of attribute pe.
453 454 455 |
# File 'lib/fluent/plugin/in_tail.rb', line 453 def pe @pe end |
#read_lines_limit ⇒ Object (readonly)
Returns the value of attribute read_lines_limit.
453 454 455 |
# File 'lib/fluent/plugin/in_tail.rb', line 453 def read_lines_limit @read_lines_limit end |
#stat_trigger ⇒ Object (readonly)
Returns the value of attribute stat_trigger.
455 456 457 |
# File 'lib/fluent/plugin/in_tail.rb', line 455 def stat_trigger @stat_trigger end |
#timer_trigger ⇒ Object
Returns the value of attribute timer_trigger.
456 457 458 |
# File 'lib/fluent/plugin/in_tail.rb', line 456 def timer_trigger @timer_trigger end |
#unwatched ⇒ Object
This is used for removing the position entry from the PositionFile.
458 459 460 |
# File 'lib/fluent/plugin/in_tail.rb', line 458 def unwatched @unwatched end |
Instance Method Details
#attach {|_self| ... } ⇒ Object
468 469 470 471 |
# File 'lib/fluent/plugin/in_tail.rb', line 468 def attach on_notify yield self end |
#close ⇒ Object
479 480 481 482 483 484 |
# File 'lib/fluent/plugin/in_tail.rb', line 479
# Closes the current IO handler, if any, and forgets it.
# Calling it again without a handler is a no-op. Always returns nil.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end
#detach ⇒ Object
473 474 475 476 477 |
# File 'lib/fluent/plugin/in_tail.rb', line 473
# Detaches the timer and stat triggers and gives the IO handler one last
# notification.
#
# The timer trigger is only touched when the watch timer is enabled.
# @timer_trigger is nil after #initialize and is only populated through its
# writer, so it is checked for presence before `attached?` is called on it;
# otherwise detaching a watcher that never received a timer would raise
# NoMethodError.
def detach
  if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
    @timer_trigger.detach
  end
  @stat_trigger.detach if @stat_trigger.attached?
  # Final notification to the IO handler, when one exists.
  @io_handler.on_notify if @io_handler
end
#on_notify ⇒ Object
486 487 488 489 490 491 492 493 494 495 496 497 |
# File 'lib/fluent/plugin/in_tail.rb', line 486
# Stats the watched path and fans the result out.
#
# The stat (or nil when the path does not exist) goes to the rotate handler;
# then the line-buffer flusher and the IO handler are notified. Each
# collaborator is skipped when it is not set. Returns whatever the last
# executed notification returns (nil when there is no IO handler).
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT
      # moved or deleted
      nil
    end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end
#on_rotate(stat) ⇒ Object
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 |
# File 'lib/fluent/plugin/in_tail.rb', line 499 def on_rotate(stat) if @io_handler.nil? if stat # first time fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case, seek to the saved position elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) else @io_handler = NullIOHandler.new end else watcher_needs_update = false if stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(0) @io_handler.close elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher @pe.update(inode, 0) else # file is rotated and new file found watcher_needs_update = true end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil watcher_needs_update = true end log_msg = "detected rotation of #{@path}" log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists @log.info log_msg if watcher_needs_update @update_watcher.call(@path, swap_state(@pe)) else @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) end end end |
#swap_state(pe) ⇒ Object
561 562 563 564 565 566 567 |
# File 'lib/fluent/plugin/in_tail.rb', line 561
# Replaces this watcher's position entry with an in-memory copy of +pe+
# (same inode and position) and returns the original +pe+ untouched.
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  @pe = MemoryPositionEntry.new.tap do |snapshot|
    snapshot.update(pe.read_inode, pe.read_pos)
  end

  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end
#tag ⇒ Object
460 461 462 |
# File 'lib/fluent/plugin/in_tail.rb', line 460
# Derives a dot-separated tag from the watched path and memoizes it.
#
# Every '/' becomes '.', runs of dots collapse to a single dot, and one
# leading dot (left behind by a leading '/') is dropped, so
# "/var/log/app.log" yields "var.log.app.log".
#
# The leading dot is matched with \A rather than ^: ^ matches after every
# newline, which would also strip dots in the middle of a path that happens
# to contain a newline.
def tag
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').sub(/\A\./, '')
end
#wrap_receive_lines(lines) ⇒ Object
464 465 466 |
# File 'lib/fluent/plugin/in_tail.rb', line 464 def wrap_receive_lines(lines) @receive_lines.call(lines, self) end |