Class: Fluent::EngineClass
- Inherits:
-
Object
- Object
- Fluent::EngineClass
show all
- Defined in:
- lib/fluent/engine.rb
Defined Under Namespace
Classes: DummyMessagePackFactory
Constant Summary
collapse
- MATCH_CACHE_SIZE =
1024
- LOG_EMIT_INTERVAL =
0.1
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of EngineClass.
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/fluent/engine.rb', line 42
# Builds an engine in its idle state: no root agent, no event router, no
# running loop, and an empty queue of pending log events.
def initialize
  # Agent and router; nil here, assigned later by #init and #configure.
  @root_agent = nil
  @event_router = nil

  # Main-loop state, read and written by #run and #stop.
  @default_loop = nil
  @engine_stopped = false

  # State of the background thread that re-emits fluentd's own log events.
  @log_emit_thread = nil
  @log_event_loop_stop = false
  @log_event_queue = []

  # When true, #configure does not log the configuration text.
  @suppress_config_dump = false

  # Object exposed through #msgpack_factory.
  @msgpack_factory = DummyMessagePackFactory.new
end
|
Instance Attribute Details
#matches ⇒ Object
Returns the value of attribute matches.
61
62
63
|
# File 'lib/fluent/engine.rb', line 61
# Reader for @matches.
# NOTE(review): nothing in the visible code assigns @matches (#initialize does
# not set it), so this returns nil unless it is assigned elsewhere — confirm.
def matches
@matches
end
|
#msgpack_factory ⇒ Object
Returns the value of attribute msgpack_factory.
62
63
64
|
# File 'lib/fluent/engine.rb', line 62
# Reader for @msgpack_factory, which #initialize sets to a
# DummyMessagePackFactory instance.
def msgpack_factory
@msgpack_factory
end
|
#root_agent ⇒ Object
Returns the value of attribute root_agent.
60
61
62
|
# File 'lib/fluent/engine.rb', line 60
# Reader for @root_agent: nil after #initialize, and the RootAgent built by
# #init once that has been called.
def root_agent
@root_agent
end
|
#sources ⇒ Object
Returns the value of attribute sources.
61
62
63
|
# File 'lib/fluent/engine.rb', line 61
# Reader for @sources.
# NOTE(review): nothing in the visible code assigns @sources (#initialize does
# not set it), so this returns nil unless it is assigned elsewhere — confirm.
def sources
@sources
end
|
#system_config ⇒ Object
Returns the value of attribute system_config.
63
64
65
|
# File 'lib/fluent/engine.rb', line 63
# Reader for @system_config, the object handed to #init. #initialize does not
# set it, so this returns nil until #init has been called.
def system_config
@system_config
end
|
Instance Method Details
#configure(conf) ⇒ Object
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/fluent/engine.rb', line 123
# Configures the root agent from +conf+ and adopts the agent's event router.
#
# First logs the name and version of every installed gem named "fluentd" or
# starting with "fluent-plugin-" / "fluent-mixin-". After configuring, logs
# the configuration text (trailing whitespace stripped) unless
# @suppress_config_dump is set.
#
# @param conf configuration object; handed to the root agent and rendered
#   with #to_s for the log line
# @return nil when the dump is suppressed, otherwise whatever $log.info returns
def configure(conf)
  fluent_gems = Gem::Specification.find_all.select { |spec| spec.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }
  fluent_gems.each { |spec| $log.info "gem '#{spec.name}' version '#{spec.version}'" }

  @root_agent.configure(conf)
  @event_router = @root_agent.event_router

  return if @suppress_config_dump

  $log.info "using configuration file: #{conf.to_s.rstrip}"
end
|
#emit(tag, time, record) ⇒ Object
141
142
143
144
145
|
# File 'lib/fluent/engine.rb', line 141
# Emits one event under +tag+ by wrapping +time+ and +record+ in a
# OneEventStream and passing it to #emit_stream.
#
# A nil record is dropped: nothing is emitted and nil is returned.
def emit(tag, time, record)
  return if record.nil?

  emit_stream(tag, OneEventStream.new(time, record))
end
|
#emit_array(tag, array) ⇒ Object
147
148
149
|
# File 'lib/fluent/engine.rb', line 147
# Emits a batch of events under +tag+: wraps +array+ in an ArrayEventStream
# and passes it to #emit_stream.
def emit_array(tag, array)
  stream = ArrayEventStream.new(array)
  emit_stream(tag, stream)
end
|
#emit_stream(tag, es) ⇒ Object
151
152
153
|
# File 'lib/fluent/engine.rb', line 151
# Passes the event stream +es+ for +tag+ to the current event router and
# returns the router's result.
# @event_router is nil until #configure assigns it, so calling this before
# #configure raises NoMethodError.
def emit_stream(tag, es)
@event_router.emit_stream(tag, es)
end
|
#flush! ⇒ Object
155
156
157
|
# File 'lib/fluent/engine.rb', line 155
# Delegates to the root agent's #flush! and returns its result.
# @root_agent is nil until #init builds it.
def flush!
@root_agent.flush!
end
|
#init(system_config) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/fluent/engine.rb', line 65
# Bootstraps the engine from +system_config+ and builds the root agent.
#
# Steps, in order:
# * stores +system_config+ in @system_config;
# * turns off reverse lookup on BasicSocket and loads plugins;
# * sets Encoding.default_internal / default_external to ASCII-8BIT where the
#   runtime provides them;
# * applies emit_error_log_interval, suppress_config_dump and without_source
#   from +system_config+, each only when its value is not nil;
# * creates the RootAgent.
#
# @param system_config object responding to emit_error_log_interval,
#   suppress_config_dump and without_source
# @return [self]
def init(system_config)
  @system_config = system_config

  BasicSocket.do_not_reverse_lookup = true
  Plugin.load_plugins

  if defined?(Encoding)
    if Encoding.respond_to?(:default_internal)
      Encoding.default_internal = 'ASCII-8BIT'
    end
    if Encoding.respond_to?(:default_external)
      Encoding.default_external = 'ASCII-8BIT'
    end
  end

  # A nil option means "not specified": the current setting is left alone.
  unless system_config.emit_error_log_interval.nil?
    suppress_interval(system_config.emit_error_log_interval)
  end
  unless system_config.suppress_config_dump.nil?
    @suppress_config_dump = system_config.suppress_config_dump
  end
  unless system_config.without_source.nil?
    @without_source = system_config.without_source
  end

  @root_agent = RootAgent.new(@system_config)

  self
end
|
#load_plugin_dir(dir) ⇒ Object
137
138
139
|
# File 'lib/fluent/engine.rb', line 137
# Delegates to Plugin.load_plugin_dir with +dir+ and returns its result.
def load_plugin_dir(dir)
Plugin.load_plugin_dir(dir)
end
|
#log ⇒ Object
84
85
86
|
# File 'lib/fluent/engine.rb', line 84
# Returns the global logger held in $log.
def log
$log
end
|
#log_event_loop ⇒ Object
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
# File 'lib/fluent/engine.rb', line 164
# Body of the log-emit thread: every LOG_EMIT_INTERVAL seconds, drains
# @log_event_queue and emits each queued [tag, time, record] through the
# event router.
#
# The loop ends as soon as @log_event_loop_stop is seen set after a sleep;
# events still queued at that moment are not emitted. A failure while
# emitting one event is logged and does not stop the remaining events.
def log_event_loop
  # Turn off log events for this thread.
  # NOTE(review): presumably so that logging done here is not queued back
  # into @log_event_queue — confirm against the logger.
  $log.disable_events(Thread.current)

  while sleep(LOG_EMIT_INTERVAL)
    break if @log_event_loop_stop
    next if @log_event_queue.empty?

    # Take everything queued so far in one call; an empty batch makes the
    # iteration below a no-op.
    pending = @log_event_queue.slice!(0..-1)

    pending.each do |tag, time, record|
      begin
        @event_router.emit(tag, time, record)
      rescue => err
        $log.error "failed to emit fluentd's log event", tag: tag, event: record, error_class: err.class, error: err
      end
    end
  end
end
|
#now ⇒ Object
159
160
161
162
|
# File 'lib/fluent/engine.rb', line 159
# Current wall-clock time as whole seconds since the Unix epoch (Integer).
def now
Time.now.to_i
end
|
#parse_config(io, fname, basepath = Dir.pwd, v1_config = false) ⇒ Object
93
94
95
96
97
98
99
100
|
# File 'lib/fluent/engine.rb', line 93
# Parses a configuration read from +io+.
#
# File names ending in ".rb" are parsed by Config::DSL::Parser, using the
# path built from +basepath+ and +fname+; everything else goes to
# Config.parse.
#
# @param io        source the configuration is read from
# @param fname     configuration file name; its suffix selects the parser
# @param basepath  base directory (defaults to the current working directory)
# @param v1_config flag passed through to Config.parse only
# @return the result of whichever parser handled the file
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  return Config.parse(io, fname, basepath, v1_config) unless fname =~ /\.rb$/

  # The DSL parser is required only when a Ruby configuration is used.
  require 'fluent/config/dsl'
  Config::DSL::Parser.parse(io, File.join(basepath, fname))
end
|
#push_log_event(tag, time, record) ⇒ Object
229
230
231
232
|
# File 'lib/fluent/engine.rb', line 229
# Queues one of fluentd's own log events for the log-emit thread.
#
# Does nothing and returns nil while no log-emit thread exists (#run creates
# it); otherwise appends [tag, time, record] to @log_event_queue and returns
# the queue.
def push_log_event(tag, time, record)
  @log_event_queue << [tag, time, record] unless @log_emit_thread.nil?
end
|
#run ⇒ Object
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
|
# File 'lib/fluent/engine.rb', line 185
# Runs the engine until it is stopped, then shuts it down.
#
# Sequence:
# 1. #start is called.
# 2. If the event router has a match for the logger's tag, log events are
#    enabled and the log-emit thread (running #log_event_loop) is started.
# 3. Unless #stop was already called, the default Cool.io loop is fetched,
#    given a repeating 1-second timer, and run.
#    NOTE(review): the timer has no callback here; presumably it only keeps
#    the loop alive and waking up — confirm.
# 4. If the engine was stopped while a loop is still referenced, that loop is
#    stopped and the reference cleared.
#
# StandardError raised by any of these steps is logged, not re-raised.
# Whatever happens, #shutdown runs, and afterwards the log-emit thread (if
# any) is told to stop and joined.
def run
  start

  if @event_router.match?($log.tag)
    $log.enable_event
    @log_emit_thread = Thread.new(&method(:log_event_loop))
  end

  unless @engine_stopped
    @default_loop = Coolio::Loop.default
    @default_loop.attach Coolio::TimerWatcher.new(1, true)
    @default_loop.run
  end

  if @engine_stopped && @default_loop
    @default_loop.stop
    @default_loop = nil
  end
rescue => e
  $log.error "unexpected error", error_class: e.class, error: e
  $log.error_backtrace
ensure
  $log.info "shutting down fluentd"
  shutdown

  # The log thread is stopped only after shutdown has run.
  if @log_emit_thread
    @log_event_loop_stop = true
    @log_emit_thread.join
  end
end
|
#run_configure(conf) ⇒ Object
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/fluent/engine.rb', line 102
# Configures the engine with +conf+ (via #configure) and then warns about
# every part of the configuration that was never used.
#
# For each unused item reported by conf.check_not_fetched:
# * a section that its parent did not use gets a "section ... is not used"
#   warning, naming the plugin when one is known;
# * otherwise the item gets a "parameter ... is not used" warning, except
#   inside <system> elements, and inside <source> elements when
#   @without_source is set.
#
# @param conf configuration object responding to #check_not_fetched
# @return whatever conf.check_not_fetched returns
def run_configure(conf)
  configure(conf)

  conf.check_not_fetched do |key, e|
    parent_name, plugin_name = e.unused_in

    if parent_name
      if plugin_name
        $log.warn "section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
      else
        $log.warn "section <#{e.name}> is not used in <#{parent_name}>"
      end
      next
    end

    next if e.name == 'system'
    next if @without_source && e.name == 'source'

    $log.warn "parameter '#{key}' in #{e.to_s.strip} is not used."
  end
end
|
#stop ⇒ Object
220
221
222
223
224
225
226
227
|
# File 'lib/fluent/engine.rb', line 220
# Asks the engine to stop: marks it stopped and, if a main loop is currently
# referenced, stops that loop and drops the reference.
#
# Safe to call when no loop is running. Always returns nil.
def stop
  @engine_stopped = true
  return nil unless @default_loop

  @default_loop.stop
  @default_loop = nil
  nil
end
|
#suppress_interval(interval_time) ⇒ Object
88
89
90
91
|
# File 'lib/fluent/engine.rb', line 88
# Stores +interval_time+ in @suppress_emit_error_log_interval and sets
# @next_emit_error_log_time to the current time in whole epoch seconds.
# Returns that integer timestamp (the value of the last assignment).
# NOTE(review): nothing in the visible code reads these two variables;
# presumably they throttle emit-error logging elsewhere — confirm.
def suppress_interval(interval_time)
@suppress_emit_error_log_interval = interval_time
@next_emit_error_log_time = Time.now.to_i
end
|