Class: Fluent::TailPathInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_tailpath.rb

Defined Under Namespace

Classes: FilePositionEntry, MemoryPositionEntry, PositionFile, TailWatcher

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TailPathInput

Returns a new instance of TailPathInput.



22
23
24
25
# File 'lib/fluent/plugin/in_tailpath.rb', line 22

# Builds a new TailPathInput.
#
# Runs the parent initializer first, then starts the instance off with an
# empty path list in @paths.
def initialize
  super()
  @paths = Array.new
end

Instance Attribute Details

#paths ⇒ Object (readonly)

Returns the value of attribute paths.



32
33
34
# File 'lib/fluent/plugin/in_tailpath.rb', line 32

# Reader for the path list held in @paths.
#
# @return [Array] the current value of @paths; presumably the path strings
#   built by #configure (an empty array before that) -- NOTE(review):
#   confirm nothing else assigns @paths.
def paths
  @paths
end

Instance Method Details

#configure(conf) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/in_tailpath.rb', line 34

# Validates the plugin configuration and prepares the line parser.
#
# The comma-separated `path` setting (@path) is split into individual
# entries; each entry is stripped of surrounding whitespace and blank
# entries are discarded. A warning is logged when no `pos_file` is set, and
# finally +conf+ is handed to #configure_parser.
#
# @param conf the configuration object; passed on to the parent
#   implementation (via bare `super`) and to #configure_parser
# @return whatever #configure_parser returns
# @raise [ConfigError] when `path` contains no usable entry
def configure(conf)
  super

  # Blank entries (e.g. from "a, ,b" or a whitespace-only value) are dropped
  # so that an empty string is never treated as a file to tail. #to_s lets a
  # missing value reach the ConfigError below instead of failing on nil.
  @paths = @path.to_s.split(',').map {|path| path.strip }.reject {|path| path.empty? }
  if @paths.empty?
    raise ConfigError, "tailpath: 'path' parameter is required on tail input"
  end

  unless @pos_file
    $log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    $log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_parser(conf)
end

#configure_parser(conf) ⇒ Object



50
51
52
53
# File 'lib/fluent/plugin/in_tailpath.rb', line 50

# Creates the text parser used for incoming lines and configures it.
#
# The new TextParser is stored in @parser before it is configured.
#
# @param conf configuration handed to TextParser#configure
# @return whatever TextParser#configure returns
def configure_parser(conf)
  parser = TextParser.new
  @parser = parser
  parser.configure(conf)
end

#parse_line(line) ⇒ Object



115
116
117
# File 'lib/fluent/plugin/in_tailpath.rb', line 115

# Parses one log line with the parser held in @parser.
#
# @param line [String] the line to parse
# @return the result of @parser.parse(line); #receive_lines destructures it
#   into a (time, record) pair
def parse_line(line)
  @parser.parse(line)
end

#receive_lines(path, lines) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_tailpath.rb', line 89

# Converts a batch of raw lines read from +path+ into events and emits them
# under @tag.
#
# Each line has its trailing newline removed in place and is parsed with
# #parse_line. When parsing yields both a time and a record, the record is
# tagged with the source path ('log_path') and the chomped line
# ('raw_message') and added to the stream. A line that raises is logged as
# a warning and skipped, so one bad line does not drop the whole batch.
#
# @param path the path the lines were read from
# @param lines [Array<String>] raw lines; they are modified in place
# @return the result of Engine.emit_stream, or nil when nothing was emitted
#   or emitting raised
def receive_lines(path, lines)
  stream = MultiEventStream.new

  lines.each do |line|
    begin
      line.chomp!  # strip the trailing newline in place
      time, record = parse_line(line)
      next unless time && record

      record['log_path'] = path
      record['raw_message'] = line
      stream.add(time, record)
    rescue => e
      $log.warn line.dump, :error=>e.to_s
      $log.debug_backtrace
    end
  end

  return if stream.empty?

  begin
    Engine.emit_stream(@tag, stream)
  rescue
    # ignore errors. Engine shows logs and backtraces.
  end
end

#run ⇒ Object



82
83
84
85
86
87
# File 'lib/fluent/plugin/in_tailpath.rb', line 82

# Runs the event loop held in @loop; #start uses this method as the body of
# its thread.
#
# Any StandardError raised by the loop is logged as an error, followed by
# its backtrace, instead of propagating to the caller.
#
# @return the result of @loop.run, or of $log.error_backtrace after a failure
def run
  begin
    @loop.run
  rescue => e
    $log.error "unexpected error", :error=>e.to_s
    $log.error_backtrace
  end
end

#shutdown ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/in_tailpath.rb', line 73

# Stops tailing and releases what #start set up.
#
# Order: close every watcher in @tails, stop the event loop, wait for the
# thread in @thread to finish, then close the position file if one was
# opened.
#
# @return the result of closing the position file, or nil when there is none
def shutdown
  @tails.each(&:close)
  @loop.stop
  @thread.join

  pos_io = @pf_file
  pos_io.close if pos_io
end

#start ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/in_tailpath.rb', line 55

# Opens the position file (when configured), creates one watcher per path
# and starts the event loop in a new thread.
#
# When @pos_file is set, the file is opened read/write (created if missing),
# switched to sync mode, and parsed into @pf. Each path in @paths gets a
# TailWatcher whose position entry is @pf[path] when a position file is in
# use, or a fresh MemoryPositionEntry otherwise. Watchers deliver lines
# through #receive_lines and are attached to a new Coolio::Loop, which #run
# then drives on @thread.
#
# @return [Thread] the thread stored in @thread
def start
  if @pos_file
    open_flags = File::RDWR | File::CREAT
    @pf_file = File.open(@pos_file, open_flags, DEFAULT_FILE_PERMISSION)
    @pf_file.sync = true
    @pf = PositionFile.parse(@pf_file)
  end

  @loop = Coolio::Loop.new

  @tails = @paths.map do |path|
    entry = if @pf
              @pf[path]
            else
              MemoryPositionEntry.new
            end
    TailWatcher.new(path, @rotate_wait, entry, &method(:receive_lines))
  end

  @tails.each do |watcher|
    watcher.attach(@loop)
  end

  @thread = Thread.new(&method(:run))
end