Class: Fluent::NewTailInput::TailWatcher::IOHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, pe, log, read_lines_limit, first = true, &receive_lines) ⇒ IOHandler

Returns a new instance of IOHandler.



555
556
557
558
559
560
561
562
563
564
565
# File 'lib/fluent/plugin/in_tail.rb', line 555

# Sets up a handler that tails +io+.
#
# @param io the object to read from; must respond to +path+ when +first+ is truthy
# @param pe position entry object, stored as-is in @pe
# @param log logger; receives an +info+ message when +first+ is truthy
# @param read_lines_limit threshold stored in @read_lines_limit
# @param first when truthy, logs that tailing of +io.path+ has started
# @param receive_lines block stored in @receive_lines (nil when no block is given)
def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines)
  @log = log
  if first
    @log.info("following tail of #{io.path}")
  end

  @io, @pe = io, pe
  @read_lines_limit = read_lines_limit
  @receive_lines = receive_lines

  # Two independent, mutable, binary-encoded buffers: one accumulates
  # unconsumed data, the other is a scratch area.
  @buffer = String.new.force_encoding('ASCII-8BIT')
  @iobuf = String.new.force_encoding('ASCII-8BIT')
  @lines = []
end

Instance Attribute Details

#io ⇒ Object (readonly)

Returns the value of attribute io.



567
568
569
# File 'lib/fluent/plugin/in_tail.rb', line 567

# Reader for @io, which #initialize sets from its +io+ argument.
def io
  @io
end

#pe ⇒ Object

Returns the value of attribute pe.



568
569
570
# File 'lib/fluent/plugin/in_tail.rb', line 568

# Reader for @pe, which #initialize sets from its +pe+ argument.
def pe
  @pe
end

Instance Method Details

#close ⇒ Object



611
612
613
# File 'lib/fluent/plugin/in_tail.rb', line 611

# Closes @io, doing nothing when it reports that it is already closed.
def close
  return if @io.closed?

  @io.close
end

#on_notify ⇒ Object



570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
# File 'lib/fluent/plugin/in_tail.rb', line 570

# Reads newly available data from @io, splits it into newline-terminated
# lines and hands them to the @receive_lines callback.
#
# Data is read in chunks of up to 2048 bytes until EOF, or until at least
# @read_lines_limit lines are queued; in the latter case the queued lines
# are delivered and reading resumes. When the callback returns a truthy
# value the position (@io.pos minus the bytes still sitting unconsumed in
# @buffer) is reported through @pe.update_pos and the queue is emptied.
# When it returns a falsy value the queued lines are kept, so the next
# call offers them again without reading any further.
#
# Any StandardError is logged through @log and the handler is closed.
def on_notify
  more_pending = true

  while more_pending
    more_pending = false

    if @lines.empty?
      begin
        until more_pending
          if @buffer.empty?
            @io.readpartial(2048, @buffer)
          else
            # readpartial replaces the contents of its output buffer, so a
            # scratch buffer is used while @buffer still holds a partial line.
            @buffer << @io.readpartial(2048, @iobuf)
          end

          while (line = @buffer.slice!(/.*?\n/m))
            @lines << line
          end

          # not to use too much memory in case the file is very large
          more_pending = @lines.size >= @read_lines_limit
        end
      rescue EOFError
        # Nothing more to read for now; deliver whatever has been queued.
      end
    end

    next if @lines.empty?

    if @receive_lines.call(@lines)
      @pe.update_pos(@io.pos - @buffer.bytesize)
      @lines.clear
    else
      more_pending = false
    end
  end
rescue => e
  @log.error e.to_s
  @log.error_backtrace
  close
end