Class: Fluent::Agent

Inherits:
Object
  • Object
show all
Includes:
Configurable
Defined in:
lib/fluent/agent.rb

Overview

Agent is a resource unit that manages emittable plugins.

Next step: `fluentd/root_agent.rb`. Next step: `fluentd/label.rb`.

Direct Known Subclasses

Label, RootAgent

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initialize(log:) ⇒ Agent

Returns a new instance of Agent.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/agent.rb', line 32

# Sets up an agent with no plugins registered yet and a fresh event router.
#
# @param log [Object] logger; stored in @log and also handed to the
#   NoMatchMatch instance that the event router receives as its first argument
def initialize(log:)
  super()

  @log = log
  @context = nil

  # Plugin instances collected by add_filter / add_match.
  @filters = []
  @outputs = []

  # The lifecycle control list holds this agent's plugins, ordered so that
  # plugins which DO emit come before plugins which DON'T:
  #   input -> output w/ router -> filter -> output w/o router
  # * start: walk the list in DESC order, because plugins that appear later
  #   in the configuration receive events from plugins that appear earlier.
  # * stop/before_shutdown/shutdown/after_shutdown/close/terminate: walk it
  #   in ASC order.
  @lifecycle_control_list = nil
  @lifecycle_cache = nil

  # The agent passes itself as the router's second argument.
  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



56
57
58
# File 'lib/fluent/agent.rb', line 56

# Returns @context. #initialize sets it to nil; add_filter and add_match
# append " in #{@context}" to their log message whenever it is non-nil.
# Nothing in this view assigns a non-nil value.
def context
  @context
end

#error_collector ⇒ Object (readonly)

Returns the value of attribute error_collector.



58
59
60
# File 'lib/fluent/agent.rb', line 58

# Returns @error_collector. #initialize sets it to nil and no other method
# visible here assigns it.
def error_collector
  @error_collector
end

#event_router ⇒ Object (readonly)

Returns the value of attribute event_router.



57
58
59
# File 'lib/fluent/agent.rb', line 57

# Returns the EventRouter created in #initialize. add_filter and add_match
# register their pattern rules on it and assign it to each new plugin as
# context_router.
def event_router
  @event_router
end

#filters ⇒ Object (readonly)

Returns the value of attribute filters.



55
56
57
# File 'lib/fluent/agent.rb', line 55

# Returns the array of filter instances that add_filter has appended, in the
# order they were added. This is the underlying array itself, not a copy.
def filters
  @filters
end

#log ⇒ Object (readonly)

Returns the value of attribute log.



53
54
55
# File 'lib/fluent/agent.rb', line 53

# Returns the logger that was passed to #initialize as the log: keyword.
def log
  @log
end

#outputs ⇒ Object (readonly)

Returns the value of attribute outputs.



54
55
56
# File 'lib/fluent/agent.rb', line 54

# Returns the array of output instances that add_match has appended. For a
# multi-output plugin, its child outputs follow the parent in the array.
# This is the underlying array itself, not a copy.
def outputs
  @outputs
end

Instance Method Details

#add_filter(type, pattern, conf) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/fluent/agent.rb', line 148

# Instantiates a filter plugin, configures it, and routes +pattern+ to it.
#
# @param type [Object] plugin type handed to Plugin.new_filter
# @param pattern [Object] pattern registered on the event router
# @param conf [Object] config element for the filter; must respond to
#   for_this_worker? and is passed unchanged to the plugin's configure
# @return [Object] the newly configured filter instance
def add_filter(type, pattern, conf)
  # Elements that are not for this worker are logged under :worker0.
  log_type = if conf.for_this_worker?
               :default
             else
               :worker0
             end
  scope = @context.nil? ? '' : " in #{@context}"
  log.info log_type, "adding filter#{scope}", pattern: pattern, type: type

  Plugin.new_filter(type).tap do |plugin|
    plugin.context_router = @event_router
    plugin.configure(conf)
    # Recorded and routed only after configure has returned without raising.
    @filters << plugin
    @event_router.add_rule(pattern, plugin)
  end
end

#add_match(type, pattern, conf) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/agent.rb', line 126

# Instantiates an output plugin, configures it, and routes +pattern+ to it.
#
# @param type [Object] plugin type handed to Plugin.new_output
# @param pattern [Object] pattern registered on the event router
# @param conf [Object] config element for the match; must respond to
#   for_this_worker? and is passed unchanged to the plugin's configure
# @return [Object] the newly configured output instance
def add_match(type, pattern, conf)
  # Elements that are not for this worker are logged under :worker0.
  log_type = if conf.for_this_worker?
               :default
             else
               :worker0
             end
  scope = @context.nil? ? '' : " in #{@context}"
  log.info log_type, "adding match#{scope}", pattern: pattern, type: type

  Plugin.new_output(type).tap do |plugin|
    plugin.context_router = @event_router
    plugin.configure(conf)
    @outputs << plugin

    # A plugin counts as multi-output only if it exposes both #outputs and
    # #multi_output? and the latter answers truthy. The respond_to? checks
    # guard against plugins that do not define these methods at all.
    multi = plugin.respond_to?(:outputs) &&
            plugin.respond_to?(:multi_output?) &&
            plugin.multi_output?
    if multi
      # Prefer #static_outputs when the plugin offers it; fall back to #outputs.
      children = plugin.respond_to?(:static_outputs) ? plugin.static_outputs : plugin.outputs
      @outputs.push(*children)
    end

    @event_router.add_rule(pattern, plugin)
  end
end

#configure(conf) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/agent.rb', line 60

# Configures the agent, then registers every <filter> and <match> element
# found in +conf+ through add_filter / add_match.
#
# @param conf [Object] configuration element; must respond to
#   elements('filter', 'match')
# @return [Object] whatever conf.elements('filter', 'match') returned
# @raise [ConfigError] when an element has no '@type' value
def configure(conf)
  super

  conf.elements('filter', 'match').each do |element|
    # Outside supervisor mode, elements that belong to another worker are
    # skipped entirely (they are not even validated).
    next if !Fluent::Engine.supervisor_mode && element.for_another_worker?

    # An element written without an argument matches every tag.
    arg = element.arg
    pattern = arg.empty? ? '**' : arg

    type = element['@type']
    unless type
      raise ConfigError, "Missing '@type' parameter on <#{element.name}> directive"
    end

    if element.name == 'filter'
      add_filter(type, pattern, element)
    else
      add_match(type, pattern, element)
    end
  end
end

#emit_error_event(tag, time, record, error) ⇒ Object

For handling invalid record



162
163
# File 'lib/fluent/agent.rb', line 162

# No-op in this class: the body is empty, so all four arguments are ignored
# and nil is returned.
# NOTE(review): presumably a hook that subclasses (Label, RootAgent)
# override to handle an invalid record — confirm against those classes.
def emit_error_event(tag, time, record, error)
end

#handle_emits_error(tag, es, error) ⇒ Object



165
166
# File 'lib/fluent/agent.rb', line 165

# No-op in this class: the body is empty, so all three arguments are ignored
# and nil is returned.
# NOTE(review): presumably a hook that subclasses (Label, RootAgent)
# override to handle a failed emit of an event stream — confirm.
def handle_emits_error(tag, es, error)
end

#lifecycle(desc: false) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fluent/agent.rb', line 107

# Yields every output and filter plugin of this agent together with its kind.
#
# Ascending order is: outputs with a router, then filters, then outputs
# without a router, each group in list order. With desc: true both the group
# order and the order inside each group are reversed. The :input group of
# lifecycle_control_list is never visited here.
#
# @param desc [Boolean] walk the plugins in descending order when truthy
# @yield [instance, kind] kind is :output or :filter; outputs with a router
#   are reported as plain :output
# @return [Array<Symbol>] the list of kinds that was iterated
def lifecycle(desc: false)
  kinds = [:output_with_router, :filter, :output]
  kinds = kinds.reverse if desc

  kinds.each do |kind|
    instances = lifecycle_control_list[kind]
    instances = instances.reverse if desc

    # Callers do not distinguish the two output groups.
    label = kind == :output_with_router ? :output : kind
    instances.each { |instance| yield instance, label }
  end
end

#lifecycle_control_list ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/agent.rb', line 79

# Returns the plugins of this agent grouped by kind, building the grouping on
# the first call and reusing the same Hash afterwards.
#
# Keys, in insertion order: :input, :output_with_router, :filter, :output.
# :input is filled only when the receiver publicly responds to #inputs.
# Outputs are split by has_router?. Every value is a new array, so the
# arrays behind #outputs and #filters are not shared with the result.
#
# @return [Hash{Symbol => Array}] the memoized grouping
def lifecycle_control_list
  @lifecycle_control_list ||= begin
    sources = respond_to?(:inputs) ? inputs.map { |input| input } : []
    routed, terminal = outputs.partition { |output| output.has_router? }

    {
      input: sources,
      output_with_router: routed,
      filter: filters.map { |filter| filter },
      output: terminal,
    }
  end
end