Class: Fluent::EngineClass
Constant Summary
collapse
- MAINLOOP_SLEEP_INTERVAL =
0.3
- MATCH_CACHE_SIZE =
1024
- LOG_EMIT_INTERVAL =
0.1
Instance Attribute Summary collapse
Instance Method Summary
collapse
#msgpack_factory, #msgpack_packer, #msgpack_unpacker
Constructor Details
Returns a new instance of EngineClass.
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/fluent/engine.rb', line 34
def initialize
@root_agent = nil
@event_router = nil
@default_loop = nil
@engine_stopped = false
@log_emit_thread = nil
@log_event_loop_stop = false
@log_event_queue = []
@suppress_config_dump = false
@system_config = SystemConfig.new
end
|
Instance Attribute Details
#matches ⇒ Object
Returns the value of attribute matches.
55
56
57
|
# File 'lib/fluent/engine.rb', line 55
def matches
@matches
end
|
#root_agent ⇒ Object
Returns the value of attribute root_agent.
54
55
56
|
# File 'lib/fluent/engine.rb', line 54
def root_agent
@root_agent
end
|
#sources ⇒ Object
Returns the value of attribute sources.
55
56
57
|
# File 'lib/fluent/engine.rb', line 55
def sources
@sources
end
|
#system_config ⇒ Object
Returns the value of attribute system_config.
56
57
58
|
# File 'lib/fluent/engine.rb', line 56
def system_config
@system_config
end
|
Instance Method Details
#add_plugin_dir(dir) ⇒ Object
127
128
129
|
# File 'lib/fluent/engine.rb', line 127
def add_plugin_dir(dir)
Plugin.add_plugin_dir(dir)
end
|
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/fluent/engine.rb', line 113
def configure(conf)
Gem::Specification.find_all.select{|x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/}.each do |spec|
$log.info "gem '#{spec.name}' version '#{spec.version}'"
end
@root_agent.configure(conf)
@event_router = @root_agent.event_router
unless @suppress_config_dump
$log.info "using configuration file: #{conf.to_s.rstrip}"
end
end
|
#emit(tag, time, record) ⇒ Object
131
132
133
|
# File 'lib/fluent/engine.rb', line 131
def emit(tag, time, record)
raise "BUG: use router.emit instead of Engine.emit"
end
|
#emit_array(tag, array) ⇒ Object
135
136
137
|
# File 'lib/fluent/engine.rb', line 135
def emit_array(tag, array)
raise "BUG: use router.emit_array instead of Engine.emit_array"
end
|
#emit_stream(tag, es) ⇒ Object
139
140
141
|
# File 'lib/fluent/engine.rb', line 139
def emit_stream(tag, es)
raise "BUG: use router.emit_stream instead of Engine.emit_stream"
end
|
#flush! ⇒ Object
143
144
145
|
# File 'lib/fluent/engine.rb', line 143
def flush!
@root_agent.flush!
end
|
#init(system_config) ⇒ Object
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/fluent/engine.rb', line 58
def init(system_config)
@system_config = system_config
BasicSocket.do_not_reverse_lookup = true
suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
@root_agent = RootAgent.new(log: log, system_config: @system_config)
MessagePackFactory.init
self
end
|
#log ⇒ Object
74
75
76
|
# File 'lib/fluent/engine.rb', line 74
def log
$log
end
|
#log_event_loop ⇒ Object
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
# File 'lib/fluent/engine.rb', line 152
def log_event_loop
$log.disable_events(Thread.current)
while sleep(LOG_EMIT_INTERVAL)
break if @log_event_loop_stop
next if @log_event_queue.empty?
events = @log_event_queue.slice!(0..-1)
next if events.empty?
events.each {|tag,time,record|
begin
@event_router.emit(tag, time, record)
rescue => e
$log.error "failed to emit fluentd's log event", tag: tag, event: record, error: e
end
}
end
end
|
#now ⇒ Object
147
148
149
150
|
# File 'lib/fluent/engine.rb', line 147
def now
Fluent::EventTime.now
end
|
#parse_config(io, fname, basepath = Dir.pwd, v1_config = false) ⇒ Object
83
84
85
86
87
88
89
90
|
# File 'lib/fluent/engine.rb', line 83
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
if fname =~ /\.rb$/
require 'fluent/config/dsl'
Config::DSL::Parser.parse(io, File.join(basepath, fname))
else
Config.parse(io, fname, basepath, v1_config)
end
end
|
#push_log_event(tag, time, record) ⇒ Object
203
204
205
206
|
# File 'lib/fluent/engine.rb', line 203
def push_log_event(tag, time, record)
return if @log_emit_thread.nil?
@log_event_queue.push([tag, time, record])
end
|
#run ⇒ Object
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/fluent/engine.rb', line 173
def run
begin
start
if @event_router.match?($log.tag)
$log.enable_event
@log_emit_thread = Thread.new(&method(:log_event_loop))
end
sleep MAINLOOP_SLEEP_INTERVAL until @engine_stopped
rescue Exception => e
$log.error "unexpected error", error: e
$log.error_backtrace
raise
end
$log.info "shutting down fluentd"
shutdown
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
end
end
|
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/fluent/engine.rb', line 92
def run_configure(conf)
configure(conf)
conf.check_not_fetched { |key, e|
parent_name, plugin_name = e.unused_in
if parent_name
message = if plugin_name
"section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
else
"section <#{e.name}> is not used in <#{parent_name}>"
end
$log.warn message
next
end
unless e.name == 'system'
unless @without_source && e.name == 'source'
$log.warn "parameter '#{key}' in #{e.to_s.strip} is not used."
end
end
}
end
|
#stop ⇒ Object
198
199
200
201
|
# File 'lib/fluent/engine.rb', line 198
def stop
@engine_stopped = true
nil
end
|
#suppress_interval(interval_time) ⇒ Object
78
79
80
81
|
# File 'lib/fluent/engine.rb', line 78
def suppress_interval(interval_time)
@suppress_emit_error_log_interval = interval_time
@next_emit_error_log_time = Time.now.to_i
end
|