Class: Fluent::Agent
- Inherits:
-
Object
- Object
- Fluent::Agent
- Includes:
- Configurable
- Defined in:
- lib/fluent/agent.rb
Overview
Agent is a resource unit that manages emittable plugins.
Next steps: fluentd/root_agent.rb and fluentd/label.rb
Defined Under Namespace
Classes: NoMatchMatch
Constant Summary
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#error_collector ⇒ Object
readonly
Returns the value of attribute error_collector.
-
#event_router ⇒ Object
readonly
Returns the value of attribute event_router.
-
#filters ⇒ Object
readonly
Returns the value of attribute filters.
-
#log ⇒ Object
readonly
Returns the value of attribute log.
-
#outputs ⇒ Object
readonly
Returns the value of attribute outputs.
Instance Method Summary collapse
- #add_filter(type, pattern, conf) ⇒ Object
- #add_match(type, pattern, conf) ⇒ Object
- #configure(conf) ⇒ Object
-
#emit_error_event(tag, time, record, error) ⇒ Object
For handling invalid record.
- #handle_emits_error(tag, es, error) ⇒ Object
-
#initialize(log:) ⇒ Agent
constructor
A new instance of Agent.
- #lifecycle(desc: false) ⇒ Object
- #lifecycle_control_list ⇒ Object
Methods included from Configurable
#config, included, lookup_type, register_type
Constructor Details
#initialize(log:) ⇒ Agent
Returns a new instance of Agent.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/agent.rb', line 31
#
# Creates an agent with empty output/filter lists, no context, no error
# collector, and a new EventRouter.
#
# @param log [Object] logger; stored in @log and also passed to
#   NoMatchMatch.new, whose result is the first argument of EventRouter.new
def initialize(log:)
  super()

  @context = nil
  @outputs = []
  @filters = []

  @lifecycle_control_list = nil
  # lifecycle_control_list is the list of plugins in this agent, ordered
  # from plugins which DO emit to plugins which DON'T emit
  # (input -> output w/ router -> filter -> output w/o router)
  # for start: use this order DESC
  #   (because plugins which appear later in configurations will receive
  #    events from plugins which appear earlier)
  # for stop/before_shutdown/shutdown/after_shutdown/close/terminate: use this order ASC
  @lifecycle_cache = nil

  @log = log
  # The router receives this agent itself as its second argument.
  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
55 56 57 |
# File 'lib/fluent/agent.rb', line 55
#
# Read-only accessor for the agent's context.
#
# @return [Object, nil] the value of @context (nil right after construction)
def context
  @context
end
#error_collector ⇒ Object (readonly)
Returns the value of attribute error_collector.
57 58 59 |
# File 'lib/fluent/agent.rb', line 57
#
# Read-only accessor for the agent's error collector.
#
# @return [Object, nil] the value of @error_collector (nil right after
#   construction)
def error_collector
  @error_collector
end
#event_router ⇒ Object (readonly)
Returns the value of attribute event_router.
56 57 58 |
# File 'lib/fluent/agent.rb', line 56
#
# Read-only accessor for the router created in the constructor.
#
# @return [Object] the value of @event_router
def event_router
  @event_router
end
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
54 55 56 |
# File 'lib/fluent/agent.rb', line 54
#
# Read-only accessor for the filter plugins registered through #add_filter.
#
# @return [Array] the value of @filters (the live array, not a copy)
def filters
  @filters
end
#log ⇒ Object (readonly)
Returns the value of attribute log.
52 53 54 |
# File 'lib/fluent/agent.rb', line 52
#
# Read-only accessor for the logger given to the constructor.
#
# @return [Object] the value of @log
def log
  @log
end
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
53 54 55 |
# File 'lib/fluent/agent.rb', line 53
#
# Read-only accessor for the output plugins registered through #add_match.
#
# @return [Array] the value of @outputs (the live array, not a copy)
def outputs
  @outputs
end
Instance Method Details
#add_filter(type, pattern, conf) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/fluent/agent.rb', line 145
#
# Instantiates a filter plugin, wires it to this agent's event router,
# configures it, and registers it under the given pattern.
#
# @param type [Object] plugin type handed to Plugin.new_filter
# @param pattern [Object] pattern passed to the event router's add_rule
# @param conf [Object] configuration passed to the filter's #configure
# @return [Object] the newly created filter
def add_filter(type, pattern, conf)
  # The " in <context>" suffix is only added when a context is set.
  log.info "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  filter = Plugin.new_filter(type)
  filter.router = @event_router
  filter.configure(conf)
  @filters << filter
  @event_router.add_rule(pattern, filter)

  filter
end
#add_match(type, pattern, conf) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/fluent/agent.rb', line 130
#
# Instantiates an output plugin, configures it, and registers it under the
# given pattern.
#
# @param type [Object] plugin type handed to Plugin.new_output
# @param pattern [Object] pattern passed to the event router's add_rule
# @param conf [Object] configuration passed to the output's #configure
# @return [Object] the newly created output
def add_match(type, pattern, conf)
  # The " in <context>" suffix is only added when a context is set.
  log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  output = Plugin.new_output(type)
  # Only outputs that expose a router= writer receive the event router.
  output.router = @event_router if output.respond_to?(:router=)
  output.configure(conf)
  @outputs << output
  # For multi outputs, the nested outputs are appended to @outputs as well,
  # right after the parent.
  if output.respond_to?(:outputs) && (output.is_a?(Fluent::Plugin::MultiOutput) || output.is_a?(Fluent::MultiOutput))
    @outputs.push(*output.outputs)
  end
  @event_router.add_rule(pattern, output)

  output
end
#configure(conf) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fluent/agent.rb', line 59
#
# Configures the agent: delegates to the inherited #configure, then registers
# every <filter> and <match> element of the configuration.
#
# @param conf [Object] configuration; must respond to
#   elements('filter', 'match'), each element responding to #arg, #name and
#   #[] (read with the key '@type')
def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements('filter', 'match').each { |e|
    # An element without an argument matches everything ('**').
    pattern = e.arg.empty? ? '**' : e.arg
    type = e['@type']
    if e.name == 'filter'
      add_filter(type, pattern, e)
    else
      add_match(type, pattern, e)
    end
  }
end
#emit_error_event(tag, time, record, error) ⇒ Object
For handling invalid record
158 159 |
# File 'lib/fluent/agent.rb', line 158
#
# For handling invalid record.
#
# The body is empty in this class, so the call does nothing and returns nil.
# NOTE(review): presumably meant to be overridden elsewhere - confirm.
#
# @param tag [Object] unused here
# @param time [Object] unused here
# @param record [Object] unused here
# @param error [Object] unused here
# @return [nil]
def emit_error_event(tag, time, record, error)
end
#handle_emits_error(tag, es, error) ⇒ Object
161 162 |
# File 'lib/fluent/agent.rb', line 161
#
# Hook taking a tag, an `es` argument and an error.
#
# The body is empty in this class, so the call does nothing and returns nil.
# NOTE(review): presumably meant to be overridden elsewhere - confirm.
#
# @param tag [Object] unused here
# @param es [Object] unused here
# @param error [Object] unused here
# @return [nil]
def handle_emits_error(tag, es, error)
end
#lifecycle(desc: false) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/fluent/agent.rb', line 111
#
# Yields every output and filter plugin held in #lifecycle_control_list,
# one kind at a time.
#
# Ascending order visits outputs with a router, then filters, then the other
# outputs. With desc: true both the kind order and the order inside each kind
# are reversed. The :input entry of the control list is not iterated by this
# method.
#
# @param desc [Boolean] iterate in reverse order when true
# @yieldparam instance [Object] the plugin instance
# @yieldparam display_kind [Symbol] :output or :filter; :output_with_router
#   is reported as :output
def lifecycle(desc: false)
  kind_list = if desc
                [:output, :filter, :output_with_router]
              else
                [:output_with_router, :filter, :output]
              end
  kind_list.each do |kind|
    list = if desc
             lifecycle_control_list[kind].reverse
           else
             lifecycle_control_list[kind]
           end
    display_kind = (kind == :output_with_router ? :output : kind)
    list.each do |instance|
      yield instance, display_kind
    end
  end
end
#lifecycle_control_list ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/fluent/agent.rb', line 74
#
# Builds, and caches in @lifecycle_control_list, a hash that groups this
# agent's plugins by kind.
#
# @return [Hash{Symbol => Array}] arrays under the keys :input,
#   :output_with_router, :filter and :output
def lifecycle_control_list
  # Memoized: the hash is built once and reused afterwards.
  return @lifecycle_control_list if @lifecycle_control_list

  lifecycle_control_list = {
    input: [],
    output_with_router: [],
    filter: [],
    output: [],
  }
  # Inputs are collected only when this object exposes an #inputs method.
  if self.respond_to?(:inputs)
    inputs.each do |i|
      lifecycle_control_list[:input] << i
    end
  end
  # Classifies an output by #has_router?, then descends into its nested
  # outputs when it exposes #outputs.
  # NOTE(review): add_match already appends a MultiOutput's children to
  # @outputs, so such children look like they are listed twice here - confirm
  # whether that is intended.
  recursive_output_traverse = ->(o) {
    if o.has_router?
      lifecycle_control_list[:output_with_router] << o
    else
      lifecycle_control_list[:output] << o
    end

    if o.respond_to?(:outputs)
      o.outputs.each do |store|
        recursive_output_traverse.call(store)
      end
    end
  }
  outputs.each do |o|
    recursive_output_traverse.call(o)
  end
  filters.each do |f|
    lifecycle_control_list[:filter] << f
  end

  @lifecycle_control_list = lifecycle_control_list
end