Class: Fluent::RootAgent
Overview
Fluentd forms a tree structure to manage plugins:
                 RootAgent
                     |
        +------------+-------------+-------------+
        |            |             |             |
     <label>      <source>      <filter>      <match>
        |
   +----+----+
   |         |
<filter>   <match>
Relation:
- RootAgent has many <label>, <source>, <filter> and <match>
- <label> has many <match> and <filter>
Next step: `fluentd/agent.rb`
Next step: `fluentd/label.rb`
Defined Under Namespace
Classes: RootAgentProxyWithoutErrorCollector
Constant Summary collapse
- ERROR_LABEL =
"@ERROR".freeze
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#inputs ⇒ Object
readonly
Returns the value of attribute inputs.
-
#labels ⇒ Object
readonly
Returns the value of attribute labels.
Attributes inherited from Agent
#context, #error_collector, #event_router, #filters, #log, #outputs
Instance Method Summary collapse
- #add_label(name) ⇒ Object
- #add_source(type, conf) ⇒ Object
- #configure(conf) ⇒ Object
- #emit_error_event(tag, time, record, error) ⇒ Object
- #find_label(label_name) ⇒ Object
- #handle_emits_error(tag, es, error) ⇒ Object
-
#initialize(system_config = SystemConfig.new) ⇒ RootAgent
constructor
A new instance of RootAgent.
- #setup_error_label(e) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #suppress_interval(interval_time) ⇒ Object
Methods inherited from Agent
#add_filter, #add_match, #flush!, #flush_recursive
Methods included from Configurable
#config, included, lookup_type, register_type
Constructor Details
#initialize(system_config = SystemConfig.new) ⇒ RootAgent
Returns a new instance of RootAgent.
50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/fluent/root_agent.rb', line 50

# Creates a root agent with no labels and no inputs, then applies the
# optional system-level settings.
#
# @param system_config [SystemConfig] supplies `emit_error_log_interval` and
#   `without_source`; each one is applied only when it is not nil
def initialize(system_config = SystemConfig.new)
  # Bare `super` forwards this method's own argument to the parent initializer.
  super

  @labels = {}
  @inputs = []
  @started_inputs = []

  # By default "emit transaction failed" warnings are never suppressed.
  @suppress_emit_error_log_interval = 0
  @next_emit_error_log_time = nil

  unless system_config.emit_error_log_interval.nil?
    suppress_interval(system_config.emit_error_log_interval)
  end

  # @without_source is left unset when the system config does not specify it.
  unless system_config.without_source.nil?
    @without_source = system_config.without_source
  end
end
Instance Attribute Details
#inputs ⇒ Object (readonly)
Returns the value of attribute inputs.
63 64 65 |
# File 'lib/fluent/root_agent.rb', line 63

# Read-only accessor for the input plugins collected by #add_source.
#
# @return [Array] the same array object the agent appends to (not a copy)
def inputs
  @inputs
end
#labels ⇒ Object (readonly)
Returns the value of attribute labels.
64 65 66 |
# File 'lib/fluent/root_agent.rb', line 64

# Read-only accessor for the labels registered by #add_label.
#
# @return [Hash] label name => label object; the same hash the agent
#   writes to (not a copy)
def labels
  @labels
end
Instance Method Details
#add_label(name) ⇒ Object
160 161 162 163 164 |
# File 'lib/fluent/root_agent.rb', line 160

# Creates a Label called +name+, points it back at this agent and registers
# it under that name. A label already stored under the same name is replaced.
#
# @param name [String] label name, used as the key in @labels
# @return [Label] the newly created label
def add_label(name)
  @labels[name] = Label.new(name).tap { |new_label| new_label.root_agent = self }
end
#add_source(type, conf) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/fluent/root_agent.rb', line 146

# Instantiates the input plugin named by +type+, wires it to this agent's
# event router, configures it and records it in @inputs.
#
# @param type [String] plugin type handed to Plugin.new_input
# @param conf [Object] the <source> configuration section passed to the
#   plugin's #configure
# @return [Object] the configured input plugin
def add_source(type, conf)
  log.info "adding source", type: type

  Plugin.new_input(type).tap do |plugin|
    # <source> emits events to the top-level event router (RootAgent#event_router).
    # Input#configure overwrites event_router to a label's event_router if it has `@label` parameter.
    # See also 'fluentd/plugin/input.rb'
    plugin.router = @event_router
    plugin.configure(conf)
    # Registered only after #configure returned without raising.
    @inputs << plugin
  end
end
#configure(conf) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fluent/root_agent.rb', line 66

# Configures the whole tree from +conf+ in three phases:
#   1. <label> sections: every label is created first, then configured.
#   2. the parent class's own configuration (bare `super`).
#   3. <source> sections, unless @without_source is set.
#
# @param conf [Object] root configuration element; must respond to #elements
# @raise [ConfigError] when a <label> has an empty argument or a <source>
#   has neither '@type' nor 'type'
def configure(conf)
  error_section = nil
  sections_by_label = {}

  # All labels are created up front so that plugins configured later can
  # resolve them (avoids 'label not found' in input, filter and output).
  label_sections = conf.elements.select { |element| element.name == 'label' }
  label_sections.each do |section|
    label_name = section.arg
    if label_name.empty?
      raise ConfigError, "Missing symbol argument on <label> directive"
    end

    if label_name == ERROR_LABEL
      # The error label is set up separately, after the ordinary labels.
      error_section = section
    else
      add_label(label_name)
      sections_by_label[label_name] = section
    end
  end

  # Labels are configured only once every one of them exists, again to
  # avoid 'label not found'.
  sections_by_label.each_pair do |label_name, section|
    @labels[label_name].configure(section)
  end
  setup_error_label(error_section) if error_section

  # Bare `super` forwards conf to the parent class's #configure.
  super

  if @without_source
    log.info "'--without-source' is applied. Ignore <source> sections"
  else
    source_sections = conf.elements.select { |element| element.name == 'source' }
    source_sections.each do |section|
      plugin_type = section['@type'] || section['type']
      raise ConfigError, "Missing '@type' parameter on <source> directive" unless plugin_type
      add_source(plugin_type, section)
    end
  end
end
#emit_error_event(tag, time, record, error) ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/fluent/root_agent.rb', line 174

# Reports a single failed event. When an @ERROR collector is configured the
# event is forwarded to it; otherwise the event, including its record, is
# written to the log as a warning.
#
# @param tag [Object] tag of the failed event
# @param time [Object] time of the failed event
# @param record [Object] the event record
# @param error [Exception] the error that occurred
# @return [Object] whatever the collector's #emit or the logger's #warn returns
def emit_error_event(tag, time, record, error)
  details = { error_class: error.class, error: error.to_s, tag: tag, time: time }

  unless @error_collector
    # Nobody else will handle the record, so include it in the log entry.
    details[:record] = record
    return log.warn("dump an error event:", details)
  end

  # A record is not included in the logs because <@ERROR> handles it.
  # This warning only serves as a notification.
  log.warn "send an error event to @ERROR:", details
  @error_collector.emit(tag, time, record)
end
#find_label(label_name) ⇒ Object
166 167 168 169 170 171 172 |
# File 'lib/fluent/root_agent.rb', line 166

# Looks up a previously registered label.
#
# @param label_name [String] name the label was registered under
# @return [Object] the label stored in @labels for +label_name+
# @raise [ArgumentError] when no label is registered under that name
def find_label(label_name)
  found = @labels[label_name]
  return found if found

  raise ArgumentError, "#{label_name} label not found"
end
#handle_emits_error(tag, es, error) ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/fluent/root_agent.rb', line 186

# Reports a failed emit of an event stream. With an @ERROR collector the
# stream is forwarded there. Without one the error is logged (subject to
# the suppression interval) and then always re-raised.
#
# @param tag [Object] tag of the failed stream
# @param es [Object] the event stream
# @param error [Exception] the error that occurred
# @return [Object] the collector's #emit_stream result when a collector exists
# @raise [Exception] +error+ itself, when there is no collector
def handle_emits_error(tag, es, error)
  details = { error_class: error.class, error: error.to_s, tag: tag }

  if @error_collector
    log.warn "send an error event stream to @ERROR:", details
    return @error_collector.emit_stream(tag, es)
  end

  current = Engine.now
  # An interval of zero disables suppression; the time comparison is then
  # skipped, so @next_emit_error_log_time may still be nil.
  should_log = @suppress_emit_error_log_interval.zero? || current > @next_emit_error_log_time
  if should_log
    log.warn "emit transaction failed:", details
    log.warn_backtrace
    @next_emit_error_log_time = current + @suppress_emit_error_log_interval
  end

  raise error
end
#setup_error_label(e) ⇒ Object
100 101 102 103 104 105 |
# File 'lib/fluent/root_agent.rb', line 100

# Registers and configures the error label (ERROR_LABEL) and makes its event
# router the destination for error events (@error_collector).
#
# @param e [Object] the error label's configuration section
# @return [Object] the error label's event router
def setup_error_label(e)
  error_label = add_label(ERROR_LABEL).tap do |label|
    label.configure(e)
    # add_label pointed the label at this agent; swap in the proxy afterwards.
    # NOTE(review): judging by its name, the proxy hides the error collector
    # from plugins inside the error label -- confirm against the proxy class.
    label.root_agent = RootAgentProxyWithoutErrorCollector.new(self)
  end

  @error_collector = error_label.event_router
end
#shutdown ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/fluent/root_agent.rb', line 120

# Shuts the tree down: started inputs first (each on its own thread, all
# joined before continuing), then every label, then the parent class's own
# shutdown via `super`.
#
# An error raised while shutting down one input is logged as a warning and
# does not stop the remaining shutdown steps.
def shutdown
  # Inputs go first to prevent emitting to an already terminated Output plugin.
  shutdown_threads = @started_inputs.map do |input|
    Thread.new do
      begin
        log.info "shutting down input", type: Plugin.lookup_name_from_class(input.class), plugin_id: input.plugin_id
        input.shutdown
      rescue => e
        log.warn "unexpected error while shutting down input plugin", plugin: input.class, plugin_id: input.plugin_id, error_class: e.class, error: e
        log.warn_backtrace
      end
    end
  end
  shutdown_threads.each(&:join)

  @labels.each_value(&:shutdown)

  super
end
#start ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/fluent/root_agent.rb', line 107

# Starts the tree: the parent class's own start via `super`, then every
# label, then every input.
#
# Each input is added to @started_inputs only after its #start returned, so
# #shutdown touches only inputs that actually started.
#
# @return [Array] the @inputs array
def start
  super

  @labels.each_value(&:start)

  @inputs.each do |input|
    input.start
    @started_inputs.push(input)
  end
end
#suppress_interval(interval_time) ⇒ Object
141 142 143 144 |
# File 'lib/fluent/root_agent.rb', line 141

# Sets the interval used to throttle "emit transaction failed" warnings and
# restarts the throttling window at the current time.
#
# @param interval_time [Numeric] interval stored in
#   @suppress_emit_error_log_interval
# @return [Integer] the current epoch time in seconds, as stored in
#   @next_emit_error_log_time
def suppress_interval(interval_time)
  current_epoch = Time.now.to_i
  @suppress_emit_error_log_interval = interval_time
  @next_emit_error_log_time = current_epoch
end