Class: Fluent::Plugin::TailInput::TailWatcher::IOHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, pe, log, read_lines_limit, first = true, &receive_lines) ⇒ IOHandler

Returns a new instance of IOHandler.



546
547
548
549
550
551
552
553
554
555
556
# File 'lib/fluent/plugin/in_tail.rb', line 546

# Builds a handler that reads newline-terminated lines from +io+.
#
# @param io the object to read from; +path+ is called on it only when
#   +first+ is truthy
# @param pe stored as-is and exposed through the +pe+ reader
# @param log logger; receives one +info+ message when +first+ is truthy
# @param read_lines_limit stored as-is as the line-count limit
# @param first whether to log the "following tail" message
# @param receive_lines block stored as the line callback
def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines)
  # The logger is stored before anything else so the announcement below
  # can go through it.
  @log = log
  @log.info("following tail of #{io.path}") if first

  @io, @pe = io, pe
  @read_lines_limit, @receive_lines = read_lines_limit, receive_lines

  # Two independent, mutable, binary-encoded scratch strings.
  @buffer, @iobuf = Array.new(2) { ''.force_encoding('ASCII-8BIT') }
  @lines = []
end

Instance Attribute Details

#io ⇒ Object (readonly)

Returns the value of attribute io.



558
559
560
# File 'lib/fluent/plugin/in_tail.rb', line 558

# Reader for the +@io+ instance variable, which the constructor sets from
# its +io+ argument.
def io
  @io
end

#pe ⇒ Object

Returns the value of attribute pe.



559
560
561
# File 'lib/fluent/plugin/in_tail.rb', line 559

# Reader for the +@pe+ instance variable, which the constructor sets from
# its +pe+ argument.
def pe
  @pe
end

Instance Method Details

#close ⇒ Object



602
603
604
# File 'lib/fluent/plugin/in_tail.rb', line 602

# Closes +@io+ if it is still open; does nothing when it is already closed,
# so it is safe to call more than once.
def close
  return if @io.closed?

  @io.close
end

#on_notify ⇒ Object



561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
# File 'lib/fluent/plugin/in_tail.rb', line 561

# Reads available data from +@io+, splits it into newline-terminated lines
# and passes them to the +@receive_lines+ callback.
#
# Reading happens only when no lines are already queued in +@lines+. Data is
# pulled in chunks of up to 2048 bytes until EOFError is raised or at least
# +@read_lines_limit+ lines are queued. When the callback returns a truthy
# value, +@pe.update_pos+ is called with the offset of the first byte not yet
# turned into a line and the queue is cleared; otherwise the queued lines are
# kept and the method stops. Any StandardError is logged through +@log+ and
# the handler is closed.
def on_notify
  read_more = true
  while read_more
    read_more = false

    if @lines.empty?
      begin
        until read_more
          if @buffer.empty?
            # Nothing carried over: read straight into the line buffer.
            @io.readpartial(2048, @buffer)
          else
            # Keep the unterminated tail and append the next chunk to it.
            @buffer << @io.readpartial(2048, @iobuf)
          end

          # Move every complete line (terminator included) into the queue.
          until (line = @buffer.slice!(/.*?\n/m)).nil?
            @lines << line
          end

          # Stop reading once enough lines are queued so that a very large
          # file does not use too much memory; the outer loop resumes later.
          read_more = @lines.size >= @read_lines_limit
        end
      rescue EOFError
        # No more data to read for now.
      end
    end

    next if @lines.empty?

    if @receive_lines.call(@lines)
      # Bytes still in @buffer have been read but not emitted as a line.
      @pe.update_pos(@io.pos - @buffer.bytesize)
      @lines.clear
    else
      # Callback declined the lines: keep them queued and stop.
      read_more = false
    end
  end
rescue => e
  @log.error e.to_s
  @log.error_backtrace
  close
end