Class: Fluent::Agent
- Inherits:
-
Object
- Object
- Fluent::Agent
- Includes:
- Configurable
- Defined in:
- lib/fluent/agent.rb
Overview
Agent is a resource unit who manages emittable plugins
Next step: ‘fluentd/root_agent.rb` Next step: `fluentd/label.rb`
Defined Under Namespace
Classes: NoMatchMatch
Constant Summary
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#error_collector ⇒ Object
readonly
Returns the value of attribute error_collector.
-
#event_router ⇒ Object
readonly
Returns the value of attribute event_router.
-
#filters ⇒ Object
readonly
Returns the value of attribute filters.
-
#log ⇒ Object
readonly
Returns the value of attribute log.
-
#outputs ⇒ Object
readonly
Returns the value of attribute outputs.
Instance Method Summary collapse
- #add_filter(type, pattern, conf) ⇒ Object
- #add_match(type, pattern, conf) ⇒ Object
- #configure(conf) ⇒ Object
-
#emit_error_event(tag, time, record, error) ⇒ Object
For handling invalid record.
- #flush! ⇒ Object
- #flush_recursive(array) ⇒ Object
- #handle_emits_error(tag, es, error) ⇒ Object
-
#initialize(opts = {}) ⇒ Agent
constructor
A new instance of Agent.
- #shutdown ⇒ Object
- #start ⇒ Object
Methods included from Configurable
#config, included, lookup_type, register_type
Constructor Details
#initialize(opts = {}) ⇒ Agent
Returns a new instance of Agent.
32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/agent.rb', line 32 def initialize(opts = {}) super() @context = nil @outputs = [] @filters = [] @started_outputs = [] @started_filters = [] @log = Engine.log @event_router = EventRouter.new(NoMatchMatch.new(log), self) @error_collector = nil end |
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
49 50 51 |
# File 'lib/fluent/agent.rb', line 49 def context @context end |
#error_collector ⇒ Object (readonly)
Returns the value of attribute error_collector.
51 52 53 |
# File 'lib/fluent/agent.rb', line 51 def error_collector @error_collector end |
#event_router ⇒ Object (readonly)
Returns the value of attribute event_router.
50 51 52 |
# File 'lib/fluent/agent.rb', line 50 def event_router @event_router end |
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
48 49 50 |
# File 'lib/fluent/agent.rb', line 48 def filters @filters end |
#log ⇒ Object (readonly)
Returns the value of attribute log.
46 47 48 |
# File 'lib/fluent/agent.rb', line 46 def log @log end |
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
47 48 49 |
# File 'lib/fluent/agent.rb', line 47 def outputs @outputs end |
Instance Method Details
#add_filter(type, pattern, conf) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/fluent/agent.rb', line 140 def add_filter(type, pattern, conf) log.info "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type filter = Plugin.new_filter(type) filter.router = @event_router filter.configure(conf) @filters << filter @event_router.add_rule(pattern, filter) filter end |
#add_match(type, pattern, conf) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/fluent/agent.rb', line 128 def add_match(type, pattern, conf) log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type output = Plugin.new_output(type) output.router = @event_router output.configure(conf) @outputs << output @event_router.add_rule(pattern, output) output end |
#configure(conf) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/fluent/agent.rb', line 53 def configure(conf) super # initialize <match> and <filter> elements conf.elements.select { |e| e.name == 'filter' || e.name == 'match' }.each { |e| pattern = e.arg.empty? ? '**' : e.arg type = e['@type'] || e['type'] raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type if e.name == 'filter' add_filter(type, pattern, e) else add_match(type, pattern, e) end } end |
#emit_error_event(tag, time, record, error) ⇒ Object
For handling invalid record
153 154 |
# File 'lib/fluent/agent.rb', line 153 def emit_error_event(tag, time, record, error) end |
#flush! ⇒ Object
109 110 111 |
# File 'lib/fluent/agent.rb', line 109 def flush! flush_recursive(@outputs) end |
#flush_recursive(array) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/fluent/agent.rb', line 113 def flush_recursive(array) array.each { |o| begin if o.is_a?(BufferedOutput) o.force_flush elsif o.is_a?(MultiOutput) flush_recursive(o.outputs) end rescue => e log.debug "error while force flushing", error_class: e.class, error: e log.debug_backtrace end } end |
#handle_emits_error(tag, es, error) ⇒ Object
156 157 |
# File 'lib/fluent/agent.rb', line 156 def handle_emits_error(tag, es, error) end |
#shutdown ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/fluent/agent.rb', line 81 def shutdown @started_filters.map { |f| Thread.new do begin log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id f.shutdown rescue => e log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e log.warn_backtrace end end }.each { |t| t.join } # Output plugin as filter emits records at shutdown so emit problem still exist. # This problem will be resolved after actual filter mechanizm. @started_outputs.map { |o| Thread.new do begin log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id o.shutdown rescue => e log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e log.warn_backtrace end end }.each { |t| t.join } end |
#start ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/agent.rb', line 69 def start @outputs.each { |o| o.start @started_outputs << o } @filters.each { |f| f.start @started_filters << f } end |