Class: Fluent::Agent

Inherits:
Object
  • Object
show all
Includes:
Configurable
Defined in:
lib/fluent/agent.rb

Overview

Agent is a resource unit who manages emittable plugins

Next step: ‘fluentd/root_agent.rb` Next step: `fluentd/label.rb`

Direct Known Subclasses

Label, RootAgent

Defined Under Namespace

Classes: NoMatchMatch

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Configurable

#config, included, lookup_type, register_type

Constructor Details

#initialize(opts = {}) ⇒ Agent

Returns a new instance of Agent.



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/agent.rb', line 32

def initialize(opts = {})
  super()

  @context = nil
  @outputs = []
  @filters = []
  @started_outputs = []
  @started_filters = []

  @log = Engine.log
  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end

Instance Attribute Details

#contextObject (readonly)

Returns the value of attribute context.



49
50
51
# File 'lib/fluent/agent.rb', line 49

def context
  @context
end

#error_collectorObject (readonly)

Returns the value of attribute error_collector.



51
52
53
# File 'lib/fluent/agent.rb', line 51

def error_collector
  @error_collector
end

#event_routerObject (readonly)

Returns the value of attribute event_router.



50
51
52
# File 'lib/fluent/agent.rb', line 50

def event_router
  @event_router
end

#filtersObject (readonly)

Returns the value of attribute filters.



48
49
50
# File 'lib/fluent/agent.rb', line 48

def filters
  @filters
end

#logObject (readonly)

Returns the value of attribute log.



46
47
48
# File 'lib/fluent/agent.rb', line 46

def log
  @log
end

#outputsObject (readonly)

Returns the value of attribute outputs.



47
48
49
# File 'lib/fluent/agent.rb', line 47

def outputs
  @outputs
end

Instance Method Details

#add_filter(type, pattern, conf) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
# File 'lib/fluent/agent.rb', line 140

def add_filter(type, pattern, conf)
  log.info "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  filter = Plugin.new_filter(type)
  filter.router = @event_router
  filter.configure(conf)
  @filters << filter
  @event_router.add_rule(pattern, filter)

  filter
end

#add_match(type, pattern, conf) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/agent.rb', line 128

def add_match(type, pattern, conf)
  log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  output = Plugin.new_output(type)
  output.router = @event_router
  output.configure(conf)
  @outputs << output
  @event_router.add_rule(pattern, output)

  output
end

#configure(conf) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/agent.rb', line 53

def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements.select { |e| e.name == 'filter' || e.name == 'match' }.each { |e|
    pattern = e.arg.empty? ? '**' : e.arg
    type = e['@type'] || e['type']
    raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
    if e.name == 'filter'
      add_filter(type, pattern, e)
    else
      add_match(type, pattern, e)
    end
  }
end

#emit_error_event(tag, time, record, error) ⇒ Object

For handling invalid record



153
154
# File 'lib/fluent/agent.rb', line 153

def emit_error_event(tag, time, record, error)
end

#flush!Object



109
110
111
# File 'lib/fluent/agent.rb', line 109

def flush!
  flush_recursive(@outputs)
end

#flush_recursive(array) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/fluent/agent.rb', line 113

def flush_recursive(array)
  array.each { |o|
    begin
      if o.is_a?(BufferedOutput)
        o.force_flush
      elsif o.is_a?(MultiOutput)
        flush_recursive(o.outputs)
      end
    rescue => e
      log.debug "error while force flushing", error_class: e.class, error: e
      log.debug_backtrace
    end
  }
end

#handle_emits_error(tag, es, error) ⇒ Object



156
157
# File 'lib/fluent/agent.rb', line 156

def handle_emits_error(tag, es, error)
end

#shutdownObject



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/agent.rb', line 81

def shutdown
  @started_filters.map { |f|
    Thread.new do
      begin
        log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
        f.shutdown
      rescue => e
        log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e
        log.warn_backtrace
      end
    end
  }.each { |t| t.join }

  # Output plugin as filter emits records at shutdown so emit problem still exist.
  # This problem will be resolved after actual filter mechanizm.
  @started_outputs.map { |o|
    Thread.new do
      begin
        log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
        o.shutdown
      rescue => e
        log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e
        log.warn_backtrace
      end
    end
  }.each { |t| t.join }
end

#startObject



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/agent.rb', line 69

def start
  @outputs.each { |o|
    o.start
    @started_outputs << o
  }

  @filters.each { |f|
    f.start
    @started_filters << f
  }
end