Class: Fluent::Agent

Inherits:
Object
  • Object
show all
Includes:
Configurable
Defined in:
lib/fluent/agent.rb

Overview

Agent is a resource unit that manages emittable plugins.

Next steps: fluentd/root_agent.rb and fluentd/label.rb

Direct Known Subclasses

Label, RootAgent

Defined Under Namespace

Classes: NoMatchMatch

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Configurable

#config, included, lookup_type, register_type

Constructor Details

#initialize(log:) ⇒ Agent

Returns a new instance of Agent.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/agent.rb', line 31

# Creates an agent that owns no plugins yet.
#
# @param log [Object] logger kept for the #log reader and also handed to the
#   NoMatchMatch instance that is passed to the event router
def initialize(log:)
  super()

  @log = log
  @context = nil
  @filters = []
  @outputs = []
  @error_collector = nil

  # Memo for #lifecycle_control_list. That list holds this agent's plugins,
  # ordered from plugins which DO emit to plugins which DON'T emit:
  #   input -> output w/ router -> filter -> output w/o router
  # * start: walk the list in DESCENDING order, because plugins which appear
  #   later in the configuration receive events from plugins which appear
  #   earlier.
  # * stop/before_shutdown/shutdown/after_shutdown/close/terminate: walk the
  #   list in ASCENDING order.
  @lifecycle_control_list = nil
  # NOTE(review): not read by any method in this section; presumably used
  # elsewhere (e.g. by subclasses) - confirm.
  @lifecycle_cache = nil

  no_match = NoMatchMatch.new(log)
  @event_router = EventRouter.new(no_match, self)
end

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



55
56
57
# File 'lib/fluent/agent.rb', line 55

# Reader for the agent's context.
#
# The constructor sets it to nil; when non-nil it is interpolated into the
# "adding filter"/"adding match" log messages.
#
# @return [Object, nil] the current value of @context
def context
  @context
end

#error_collector ⇒ Object (readonly)

Returns the value of attribute error_collector.



57
58
59
# File 'lib/fluent/agent.rb', line 57

# Reader for the agent's error collector.
#
# The constructor sets it to nil.
# NOTE(review): presumably assigned later by a subclass - confirm.
#
# @return [Object, nil] the current value of @error_collector
def error_collector
  @error_collector
end

#event_router ⇒ Object (readonly)

Returns the value of attribute event_router.



56
57
58
# File 'lib/fluent/agent.rb', line 56

# Reader for the event router created in the constructor.
#
# Filters and matches added through #add_filter / #add_match are registered
# on this router as rules.
#
# @return [EventRouter] the current value of @event_router
def event_router
  @event_router
end

#filters ⇒ Object (readonly)

Returns the value of attribute filters.



54
55
56
# File 'lib/fluent/agent.rb', line 54

# Reader for the filter plugins registered on this agent.
#
# Starts as an empty array; #add_filter appends each configured filter.
#
# @return [Array] the current value of @filters
def filters
  @filters
end

#log ⇒ Object (readonly)

Returns the value of attribute log.



52
53
54
# File 'lib/fluent/agent.rb', line 52

# Reader for the logger passed to the constructor as `log:`.
#
# @return [Object] the current value of @log
def log
  @log
end

#outputs ⇒ Object (readonly)

Returns the value of attribute outputs.



53
54
55
# File 'lib/fluent/agent.rb', line 53

# Reader for the output plugins registered on this agent.
#
# Starts as an empty array; #add_match appends each configured output and,
# for multi-output plugins, that plugin's own outputs as well.
#
# @return [Array] the current value of @outputs
def outputs
  @outputs
end

Instance Method Details

#add_filter(type, pattern, conf) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
# File 'lib/fluent/agent.rb', line 145

# Instantiates, wires up and registers a filter plugin.
#
# The new filter gets this agent's event router, is configured with +conf+,
# is appended to @filters and is added to the router as a rule for +pattern+.
#
# @param type [Object] plugin type handed to Plugin.new_filter
# @param pattern [Object] pattern the router rule is registered under
# @param conf [Object] configuration passed to the filter's #configure
# @return [Object] the newly created filter
def add_filter(type, pattern, conf)
  location = @context.nil? ? '' : " in #{@context}"
  log.info "adding filter#{location}", pattern: pattern, type: type

  Plugin.new_filter(type).tap do |filter|
    filter.router = @event_router
    filter.configure(conf)
    @filters << filter
    @event_router.add_rule(pattern, filter)
  end
end

#add_match(type, pattern, conf) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/fluent/agent.rb', line 130

# Instantiates, wires up and registers an output plugin.
#
# The output receives this agent's event router when it exposes `router=`,
# is configured with +conf+, is appended to @outputs and is added to the
# router as a rule for +pattern+. When the output is a multi-output plugin
# (either MultiOutput class) its own outputs are appended to @outputs too.
#
# @param type [Object] plugin type handed to Plugin.new_output
# @param pattern [Object] pattern the router rule is registered under
# @param conf [Object] configuration passed to the output's #configure
# @return [Object] the newly created output
def add_match(type, pattern, conf)
  location = @context.nil? ? '' : " in #{@context}"
  log.info "adding match#{location}", pattern: pattern, type: type

  output = Plugin.new_output(type)
  output.router = @event_router if output.respond_to?(:router=)
  output.configure(conf)

  @outputs << output
  if output.respond_to?(:outputs)
    # Short-circuit so the legacy constant is only resolved when needed.
    multi = output.is_a?(Fluent::Plugin::MultiOutput) || output.is_a?(Fluent::MultiOutput)
    @outputs.push(*output.outputs) if multi
  end

  @event_router.add_rule(pattern, output)
  output
end

#configure(conf) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/agent.rb', line 59

# Configures the agent and registers every <filter> and <match> element.
#
# Calls the inherited #configure first, then walks the 'filter' and 'match'
# elements of +conf+ in the order +conf.elements+ returns them. An element
# whose argument is empty is registered under the catch-all pattern '**'.
#
# @param conf [Object] configuration responding to #elements
# @return [Object] whatever +conf.elements(...).each+ returns
def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements('filter', 'match').each do |element|
    pattern = element.arg
    pattern = '**' if pattern.empty?
    plugin_type = element['@type']

    # Anything that is not a 'filter' element is treated as a match.
    element.name == 'filter' ? add_filter(plugin_type, pattern, element) : add_match(plugin_type, pattern, element)
  end
end

#emit_error_event(tag, time, record, error) ⇒ Object

For handling invalid record



158
159
# File 'lib/fluent/agent.rb', line 158

# Hook for handling an invalid record.
#
# This implementation ignores all of its arguments and does nothing.
# NOTE(review): presumably overridden by subclasses - confirm.
#
# @param tag [Object] unused here
# @param time [Object] unused here
# @param record [Object] unused here
# @param error [Object] unused here
# @return [nil]
def emit_error_event(tag, time, record, error)
  nil
end

#handle_emits_error(tag, es, error) ⇒ Object



161
162
# File 'lib/fluent/agent.rb', line 161

# Hook taking a tag, an +es+ argument and an error.
#
# This implementation ignores all of its arguments and does nothing.
# NOTE(review): presumably overridden by subclasses - confirm.
#
# @param tag [Object] unused here
# @param es [Object] unused here
# @param error [Object] unused here
# @return [nil]
def handle_emits_error(tag, es, error)
  nil
end

#lifecycle(desc: false) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/fluent/agent.rb', line 111

# Yields each output and filter plugin of this agent together with its kind.
#
# Ascending order is: outputs with a router, then filters, then outputs
# without a router. With +desc: true+ both the order of the groups and the
# order inside each group are reversed. Outputs with a router are reported
# with the kind :output. Inputs in #lifecycle_control_list are not visited.
#
# @param desc [Boolean] walk in descending order when true
# @yieldparam instance [Object] a plugin from #lifecycle_control_list
# @yieldparam kind [Symbol] :output or :filter
# @return [Array<Symbol>] the list of internal kinds that was walked
def lifecycle(desc: false)
  kinds = [:output_with_router, :filter, :output]
  kinds.reverse! if desc

  kinds.each do |kind|
    instances = lifecycle_control_list[kind]
    instances = instances.reverse if desc
    # Callers only distinguish :output from :filter.
    reported_kind = kind == :output_with_router ? :output : kind
    instances.each { |instance| yield instance, reported_kind }
  end
end

#lifecycle_control_list ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/agent.rb', line 74

# Returns this agent's plugins grouped by kind, building the grouping on the
# first call and returning the memoized hash afterwards.
#
# Outputs are split by #has_router?; any output that responds to #outputs
# has those nested outputs classified as well, parent first.
#
# @return [Hash{Symbol => Array}] keys :input, :output_with_router, :filter
#   and :output, in that order
def lifecycle_control_list
  return @lifecycle_control_list if @lifecycle_control_list

  groups = { input: [], output_with_router: [], filter: [], output: [] }

  # Only agents that publicly respond to #inputs contribute inputs.
  inputs.each { |input| groups[:input] << input } if respond_to?(:inputs)

  # Pre-order walk: classify the plugin itself, then descend into its
  # nested outputs (if it has any).
  classify = lambda do |plugin|
    bucket = plugin.has_router? ? :output_with_router : :output
    groups[bucket] << plugin
    plugin.outputs.each { |nested| classify.call(nested) } if plugin.respond_to?(:outputs)
  end
  outputs.each { |output| classify.call(output) }

  filters.each { |filter| groups[:filter] << filter }

  @lifecycle_control_list = groups
end