Class: Fluent::EngineClass

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/engine.rb

Defined Under Namespace

Classes: DummyMessagePackFactory

Constant Summary collapse

MATCH_CACHE_SIZE =
1024
LOG_EMIT_INTERVAL =
0.1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeEngineClass

Returns a new instance of EngineClass.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/engine.rb', line 42

def initialize
  @root_agent = nil
  @event_router = nil
  @default_loop = nil
  @engine_stopped = false

  @log_emit_thread = nil
  @log_event_loop_stop = false
  @log_event_queue = []

  @suppress_config_dump = false

  @msgpack_factory = DummyMessagePackFactory.new
end

Instance Attribute Details

#matchesObject (readonly)

Returns the value of attribute matches.



61
62
63
# File 'lib/fluent/engine.rb', line 61

def matches
  @matches
end

#msgpack_factoryObject (readonly)

Returns the value of attribute msgpack_factory.



62
63
64
# File 'lib/fluent/engine.rb', line 62

def msgpack_factory
  @msgpack_factory
end

#root_agentObject (readonly)

Returns the value of attribute root_agent.



60
61
62
# File 'lib/fluent/engine.rb', line 60

def root_agent
  @root_agent
end

#sourcesObject (readonly)

Returns the value of attribute sources.



61
62
63
# File 'lib/fluent/engine.rb', line 61

def sources
  @sources
end

#system_configObject (readonly)

Returns the value of attribute system_config.



63
64
65
# File 'lib/fluent/engine.rb', line 63

def system_config
  @system_config
end

Instance Method Details

#configure(conf) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/fluent/engine.rb', line 123

def configure(conf)
  # plugins / configuration dumps
  Gem::Specification.find_all.select{|x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/}.each do |spec|
    $log.info "gem '#{spec.name}' version '#{spec.version}'"
  end

  @root_agent.configure(conf)
  @event_router = @root_agent.event_router

  unless @suppress_config_dump
    $log.info "using configuration file: #{conf.to_s.rstrip}"
  end
end

#emit(tag, time, record) ⇒ Object



141
142
143
144
145
# File 'lib/fluent/engine.rb', line 141

def emit(tag, time, record)
  unless record.nil?
    emit_stream tag, OneEventStream.new(time, record)
  end
end

#emit_array(tag, array) ⇒ Object



147
148
149
# File 'lib/fluent/engine.rb', line 147

def emit_array(tag, array)
  emit_stream tag, ArrayEventStream.new(array)
end

#emit_stream(tag, es) ⇒ Object



151
152
153
# File 'lib/fluent/engine.rb', line 151

def emit_stream(tag, es)
  @event_router.emit_stream(tag, es)
end

#flush!Object



155
156
157
# File 'lib/fluent/engine.rb', line 155

def flush!
  @root_agent.flush!
end

#init(system_config) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fluent/engine.rb', line 65

def init(system_config)
  @system_config = system_config

  BasicSocket.do_not_reverse_lookup = true
  Plugin.load_plugins
  if defined?(Encoding)
    Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal)
    Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external)
  end

  suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
  @suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil?
  @without_source = system_config.without_source unless system_config.without_source.nil?

  @root_agent = RootAgent.new(@system_config)

  self
end

#load_plugin_dir(dir) ⇒ Object



137
138
139
# File 'lib/fluent/engine.rb', line 137

def load_plugin_dir(dir)
  Plugin.load_plugin_dir(dir)
end

#logObject



84
85
86
# File 'lib/fluent/engine.rb', line 84

def log
  $log
end

#log_event_loopObject



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/fluent/engine.rb', line 164

def log_event_loop
  $log.disable_events(Thread.current)

  while sleep(LOG_EMIT_INTERVAL)
    break if @log_event_loop_stop
    next if @log_event_queue.empty?

    # NOTE: thead-safe of slice! depends on GVL
    events = @log_event_queue.slice!(0..-1)
    next if events.empty?

    events.each {|tag,time,record|
      begin
        @event_router.emit(tag, time, record)
      rescue => e
        $log.error "failed to emit fluentd's log event", tag: tag, event: record, error_class: e.class, error: e
      end
    }
  end
end

#nowObject



159
160
161
162
# File 'lib/fluent/engine.rb', line 159

def now
  # TODO thread update
  Time.now.to_i
end

#parse_config(io, fname, basepath = Dir.pwd, v1_config = false) ⇒ Object



93
94
95
96
97
98
99
100
# File 'lib/fluent/engine.rb', line 93

def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  if fname =~ /\.rb$/
    require 'fluent/config/dsl'
    Config::DSL::Parser.parse(io, File.join(basepath, fname))
  else
    Config.parse(io, fname, basepath, v1_config)
  end
end

#push_log_event(tag, time, record) ⇒ Object



229
230
231
232
# File 'lib/fluent/engine.rb', line 229

def push_log_event(tag, time, record)
  return if @log_emit_thread.nil?
  @log_event_queue.push([tag, time, record])
end

#runObject



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/fluent/engine.rb', line 185

def run
  begin
    start

    if @event_router.match?($log.tag)
      $log.enable_event
      @log_emit_thread = Thread.new(&method(:log_event_loop))
    end

    unless @engine_stopped
      # for empty loop
      @default_loop = Coolio::Loop.default
      @default_loop.attach Coolio::TimerWatcher.new(1, true)
      # TODO attach async watch for thread pool
      @default_loop.run
    end

    if @engine_stopped and @default_loop
      @default_loop.stop
      @default_loop = nil
    end

  rescue => e
    $log.error "unexpected error", error_class: e.class, error: e
    $log.error_backtrace
  ensure
    $log.info "shutting down fluentd"
    shutdown
    if @log_emit_thread
      @log_event_loop_stop = true
      @log_emit_thread.join
    end
  end
end

#run_configure(conf) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/engine.rb', line 102

def run_configure(conf)
  configure(conf)
  conf.check_not_fetched { |key, e|
    parent_name, plugin_name = e.unused_in
    if parent_name
      message = if plugin_name
                  "section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
                else
                  "section <#{e.name}> is not used in <#{parent_name}>"
                end
      $log.warn message
      next
    end
    unless e.name == 'system'
      unless @without_source && e.name == 'source'
        $log.warn "parameter '#{key}' in #{e.to_s.strip} is not used."
      end
    end
  }
end

#stopObject



220
221
222
223
224
225
226
227
# File 'lib/fluent/engine.rb', line 220

def stop
  @engine_stopped = true
  if @default_loop
    @default_loop.stop
    @default_loop = nil
  end
  nil
end

#suppress_interval(interval_time) ⇒ Object



88
89
90
91
# File 'lib/fluent/engine.rb', line 88

def suppress_interval(interval_time)
  @suppress_emit_error_log_interval = interval_time
  @next_emit_error_log_time = Time.now.to_i
end