Class: Fluent::TailPathInput
- Inherits:
-
Input
- Object
- Input
- Fluent::TailPathInput
show all
- Defined in:
- lib/fluent/plugin/in_tailpath.rb
Defined Under Namespace
Classes: FilePositionEntry, MemoryPositionEntry, PositionFile, TailWatcher
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of TailPathInput.
22
23
24
25
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 22
def initialize
super
@paths = []
end
|
Instance Attribute Details
#paths ⇒ Object
Returns the value of attribute paths.
32
33
34
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 32
def paths
@paths
end
|
Instance Method Details
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 34
def configure(conf)
super
@paths = @path.split(',').map {|path| path.strip }
if @paths.empty?
raise ConfigError, "tailpath: 'path' parameter is required on tail input"
end
unless @pos_file
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
end
configure_parser(conf)
end
|
50
51
52
53
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 50
def configure_parser(conf)
@parser = TextParser.new
@parser.configure(conf)
end
|
#parse_line(line) ⇒ Object
115
116
117
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 115
def parse_line(line)
return @parser.parse(line)
end
|
#receive_lines(path, lines) ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 89
def receive_lines(path, lines)
es = MultiEventStream.new
lines.each {|line|
begin
line.chomp! time, record = parse_line(line)
if time && record
record['log_path'] = path
record['raw_message'] = line
es.add(time, record)
end
rescue
$log.warn line.dump, :error=>$!.to_s
$log.debug_backtrace
end
}
unless es.empty?
begin
Engine.emit_stream(@tag, es)
rescue
end
end
end
|
#run ⇒ Object
82
83
84
85
86
87
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 82
def run
@loop.run
rescue
$log.error "unexpected error", :error=>$!.to_s
$log.error_backtrace
end
|
#shutdown ⇒ Object
73
74
75
76
77
78
79
80
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 73
def shutdown
@tails.each {|tail|
tail.close
}
@loop.stop
@thread.join
@pf_file.close if @pf_file
end
|
#start ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/fluent/plugin/in_tailpath.rb', line 55
def start
if @pos_file
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION)
@pf_file.sync = true
@pf = PositionFile.parse(@pf_file)
end
@loop = Coolio::Loop.new
@tails = @paths.map {|path|
pe = @pf ? @pf[path] : MemoryPositionEntry.new
TailWatcher.new(path, @rotate_wait, pe, &method(:receive_lines))
}
@tails.each {|tail|
tail.attach(@loop)
}
@thread = Thread.new(&method(:run))
end
|