Class: Fluent::TailInput::TailWatcher::IOHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, pe, log, read_lines_limit, first = true, &receive_lines) ⇒ IOHandler

Returns a new instance of IOHandler.



577
578
579
580
581
582
583
584
585
586
587
# File 'lib/fluent/plugin/in_tail.rb', line 577

# Sets up a handler around an already-opened IO-like object.
#
# @param io [#path] object being tailed; stored in @io, and its +path+ is
#   interpolated into the log message when +first+ is truthy
# @param pe stored in @pe as-is
# @param log [#info] logger, stored in @log
# @param read_lines_limit stored in @read_lines_limit as-is
# @param first when truthy, an info-level "following tail of ..." message
#   is logged
# @param receive_lines block stored in @receive_lines
def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines)
  # The logger is assigned first so the announcement below can use it.
  @log = log
  @log.info("following tail of #{io.path}") if first

  @io, @pe = io, pe
  @read_lines_limit, @receive_lines = read_lines_limit, receive_lines

  # Both scratch buffers start out as empty binary (ASCII-8BIT) strings.
  @buffer = ''.force_encoding(Encoding::ASCII_8BIT)
  @iobuf = ''.force_encoding(Encoding::ASCII_8BIT)
  @lines = []
end

Instance Attribute Details

#io ⇒ Object (readonly)

Returns the value of attribute io.



589
590
591
# File 'lib/fluent/plugin/in_tail.rb', line 589

# Reader for @io, the IO-like object the constructor stored from its +io+
# argument.
def io
  @io
end

#pe ⇒ Object

Returns the value of attribute pe.



590
591
592
# File 'lib/fluent/plugin/in_tail.rb', line 590

# Reader for @pe, the object the constructor stored from its +pe+ argument
# (on_notify calls +update_pos+ on it).
def pe
  @pe
end

Instance Method Details

#close ⇒ Object



633
634
635
# File 'lib/fluent/plugin/in_tail.rb', line 633

# Closes @io if it is still open; does nothing when it is already closed.
def close
  return if @io.closed?

  @io.close
end

#on_notify ⇒ Object



592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
# File 'lib/fluent/plugin/in_tail.rb', line 592

# Reads whatever is currently available from @io, cuts it into
# newline-terminated lines and passes them to the @receive_lines callback.
#
# * When @lines still holds lines from an earlier call, nothing is read and
#   those lines are offered to the callback again.
# * When the callback returns a truthy value, the position handed to
#   @pe.update_pos is the IO position minus the bytes still waiting in
#   @buffer (an unterminated trailing fragment), and @lines is emptied.
# * When the callback returns a falsy value, @lines is kept and the method
#   stops for this round.
# * Any StandardError is logged (message, then backtrace) and the handler
#   is closed.
def on_notify
  # Deliberately `while true` rather than Kernel#loop: loop would quietly
  # absorb a StopIteration raised by the callback instead of letting it
  # reach the rescue clause at the bottom.
  while true
    keep_reading = false

    if @lines.empty?
      begin
        while true
          # Read straight into @buffer when it holds nothing; otherwise read
          # into the scratch buffer and append, so the pending partial line
          # is preserved.
          if @buffer.empty?
            @io.readpartial(2048, @buffer)
          else
            @buffer << @io.readpartial(2048, @iobuf)
          end

          # Peel complete lines off the front of the buffer.
          while (line = @buffer.slice!(/.*?\n/m))
            @lines << line
          end

          next if @lines.size < @read_lines_limit

          # Enough lines gathered: deliver this batch before reading on, so
          # a very large file does not pile up in memory.
          keep_reading = true
          break
        end
      rescue EOFError
        # Nothing more to read right now; deliver what has been collected.
      end
    end

    unless @lines.empty?
      if @receive_lines.call(@lines)
        @pe.update_pos(@io.pos - @buffer.bytesize)
        @lines.clear
      else
        # Keep @lines so the same batch is offered again on the next call.
        keep_reading = false
      end
    end

    break unless keep_reading
  end
rescue => e
  @log.error e.to_s
  @log.error_backtrace
  close
end