Class: Fluent::EngineClass

Inherits:
Object
  • Object
show all
Includes:
MessagePackFactory::Mixin
Defined in:
lib/fluent/engine.rb

Constant Summary collapse

# Seconds #run sleeps between checks of the @engine_stopped flag.
MAINLOOP_SLEEP_INTERVAL =
0.3
# NOTE(review): not referenced by any method shown on this page —
# presumably used elsewhere or a leftover; confirm before relying on it.
MATCH_CACHE_SIZE =
1024
# Seconds #log_event_loop sleeps between drains of the log event queue.
LOG_EMIT_INTERVAL =
0.1

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from MessagePackFactory::Mixin

#msgpack_factory, #msgpack_packer, #msgpack_unpacker

Constructor Details

#initialize ⇒ EngineClass

Returns a new instance of EngineClass.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/fluent/engine.rb', line 34

# Builds an engine that has no root agent, event router or log-emit thread
# attached yet; #init and #configure fill those in later.
#
# Flags that other methods read are given explicit defaults here so the
# object is in a defined state even if #init is never called, or is called
# with nil-valued settings.
def initialize
  @root_agent = nil
  @event_router = nil
  @default_loop = nil
  @engine_stopped = false

  # State used by #run, #push_log_event and #log_event_loop.
  @log_emit_thread = nil
  @log_event_loop_stop = false
  @log_event_queue = []

  @suppress_config_dump = false
  # Read by #run_configure; #init only assigns it when the system config
  # carries a non-nil value, so it needs a default of its own.
  @without_source = false

  @system_config = SystemConfig.new
end

Instance Attribute Details

#matches ⇒ Object (readonly)

Returns the value of attribute matches.



55
56
57
# File 'lib/fluent/engine.rb', line 55

# Reader for the @matches instance variable.
# NOTE(review): no method shown on this page assigns @matches, so this looks
# like it always returns nil — confirm against the rest of the class.
def matches
  @matches
end

#root_agent ⇒ Object (readonly)

Returns the value of attribute root_agent.



54
55
56
# File 'lib/fluent/engine.rb', line 54

# Reader for @root_agent: nil right after #initialize, and the RootAgent
# instance built by #init once that has been called.
def root_agent
  @root_agent
end

#sources ⇒ Object (readonly)

Returns the value of attribute sources.



55
56
57
# File 'lib/fluent/engine.rb', line 55

# Reader for the @sources instance variable.
# NOTE(review): no method shown on this page assigns @sources, so this looks
# like it always returns nil — confirm against the rest of the class.
def sources
  @sources
end

#system_config ⇒ Object (readonly)

Returns the value of attribute system_config.



56
57
58
# File 'lib/fluent/engine.rb', line 56

# Reader for @system_config: a fresh SystemConfig after #initialize, replaced
# by whatever object is handed to #init.
def system_config
  @system_config
end

Instance Method Details

#add_plugin_dir(dir) ⇒ Object



127
128
129
# File 'lib/fluent/engine.rb', line 127

# Forwards +dir+ to Plugin.add_plugin_dir and returns whatever that call
# returns.
def add_plugin_dir(dir)
  Plugin.add_plugin_dir(dir)
end

#configure(conf) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/fluent/engine.rb', line 113

# Applies +conf+ to the root agent and caches the agent's event router.
#
# Before configuring, logs name and version of every installed gem whose
# name is "fluentd" or starts with "fluent-plugin-" / "fluent-mixin-".
# Afterwards logs the configuration text unless @suppress_config_dump is set.
#
# Requires #init to have run first, since it calls into @root_agent.
def configure(conf)
  fluent_gems = Gem::Specification.find_all.select do |candidate|
    candidate.name =~ /^fluent(d|-(plugin|mixin)-.*)$/
  end
  fluent_gems.each { |spec| $log.info "gem '#{spec.name}' version '#{spec.version}'" }

  @root_agent.configure(conf)
  @event_router = @root_agent.event_router

  $log.info "using configuration file: #{conf.to_s.rstrip}" unless @suppress_config_dump
end

#emit(tag, time, record) ⇒ Object



131
132
133
# File 'lib/fluent/engine.rb', line 131

# Deliberately unusable: always raises RuntimeError telling the caller to go
# through router.emit instead. The arguments are ignored.
def emit(tag, time, record)
  raise RuntimeError, "BUG: use router.emit instead of Engine.emit"
end

#emit_array(tag, array) ⇒ Object



135
136
137
# File 'lib/fluent/engine.rb', line 135

# Deliberately unusable: always raises RuntimeError telling the caller to go
# through router.emit_array instead. The arguments are ignored.
def emit_array(tag, array)
  raise RuntimeError, "BUG: use router.emit_array instead of Engine.emit_array"
end

#emit_stream(tag, es) ⇒ Object



139
140
141
# File 'lib/fluent/engine.rb', line 139

# Deliberately unusable: always raises RuntimeError telling the caller to go
# through router.emit_stream instead. The arguments are ignored.
def emit_stream(tag, es)
  raise RuntimeError, "BUG: use router.emit_stream instead of Engine.emit_stream"
end

#flush! ⇒ Object



143
144
145
# File 'lib/fluent/engine.rb', line 143

# Delegates to @root_agent.flush! and returns its result.
# @root_agent is nil until #init has run, so calling this earlier raises
# NoMethodError.
def flush!
  @root_agent.flush!
end

#init(system_config) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/engine.rb', line 58

# Wires the engine to +system_config+ and creates the root agent.
#
# Settings on +system_config+ that are nil leave the engine's current value
# alone; any non-nil value (including false) is applied. Also turns on
# BasicSocket.do_not_reverse_lookup and calls MessagePackFactory.init.
#
# Returns self so the call can be chained.
def init(system_config)
  @system_config = system_config

  BasicSocket.do_not_reverse_lookup = true

  error_log_interval = system_config.emit_error_log_interval
  suppress_interval(error_log_interval) unless error_log_interval.nil?

  config_dump_flag = system_config.suppress_config_dump
  @suppress_config_dump = config_dump_flag unless config_dump_flag.nil?

  without_source_flag = system_config.without_source
  @without_source = without_source_flag unless without_source_flag.nil?

  @root_agent = RootAgent.new(log: log, system_config: @system_config)
  MessagePackFactory.init

  self
end

#log ⇒ Object



74
75
76
# File 'lib/fluent/engine.rb', line 74

# Returns the logger stored in the $log global.
def log
  $log
end

#log_event_loop ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/fluent/engine.rb', line 152

# Body of the log-emit thread started by #run.
#
# Every LOG_EMIT_INTERVAL seconds it takes everything currently queued by
# #push_log_event and emits each entry through @event_router. The loop ends
# once @log_event_loop_stop is set; the flag is checked before the queue, so
# entries still queued at that point are not emitted.
#
# Log events are disabled for the current thread up front via
# $log.disable_events. A failing emit is logged and the remaining entries
# are still processed.
def log_event_loop
  $log.disable_events(Thread.current)

  while sleep(LOG_EMIT_INTERVAL)
    break if @log_event_loop_stop
    next if @log_event_queue.empty?

    # NOTE: thread-safety of slice! depends on the GVL
    drained = @log_event_queue.slice!(0..-1)
    drained.each do |tag, time, record|
      begin
        @event_router.emit(tag, time, record)
      rescue => emit_error
        $log.error "failed to emit fluentd's log event", tag: tag, event: record, error: emit_error
      end
    end
  end
end

#now ⇒ Object



147
148
149
150
# File 'lib/fluent/engine.rb', line 147

# Returns the current time as produced by Fluent::EventTime.now.
def now
  # TODO thread update
  Fluent::EventTime.now
end

#parse_config(io, fname, basepath = Dir.pwd, v1_config = false) ⇒ Object



83
84
85
86
87
88
89
90
# File 'lib/fluent/engine.rb', line 83

# Parses a configuration read from +io+.
#
# A file name ending in ".rb" is treated as a Ruby DSL config: the DSL parser
# is loaded on demand and given the path built from +basepath+ and +fname+.
# Anything else goes through Config.parse with all four arguments.
#
# +basepath+ defaults to the current working directory; +v1_config+ is only
# passed along on the non-DSL path.
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  return Config.parse(io, fname, basepath, v1_config) unless fname =~ /\.rb$/

  # Lazy-loaded on purpose: only DSL configs need this file.
  require 'fluent/config/dsl'
  Config::DSL::Parser.parse(io, File.join(basepath, fname))
end

#push_log_event(tag, time, record) ⇒ Object



203
204
205
206
# File 'lib/fluent/engine.rb', line 203

# Queues one log event for the log-emit thread.
#
# Does nothing (and returns nil) while @log_emit_thread is nil, i.e. when #run
# has not started the thread. Otherwise appends [tag, time, record] to
# @log_event_queue and returns the queue.
def push_log_event(tag, time, record)
  @log_event_queue << [tag, time, record] unless @log_emit_thread.nil?
end

#run ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/fluent/engine.rb', line 173

# Runs the engine until #stop is called, then shuts it down.
#
# Sequence: call start; start the log-emit thread if the event router reports
# a match for the logger's tag; poll @engine_stopped every
# MAINLOOP_SLEEP_INTERVAL seconds; log the shutdown message; call shutdown;
# finally tell the log-emit thread to stop and wait for it.
#
# Anything raised during startup or the main loop is logged with its
# backtrace and re-raised unchanged; shutdown is not attempted in that case.
def run
  begin
    start

    # The emitter thread only exists when the router matches the log tag.
    if @event_router.match?($log.tag)
      $log.enable_event
      @log_emit_thread = Thread.new { log_event_loop }
    end

    until @engine_stopped
      sleep MAINLOOP_SLEEP_INTERVAL
    end
  rescue Exception => fatal
    # Log-and-re-raise only: nothing is swallowed here.
    $log.error "unexpected error", error: fatal
    $log.error_backtrace
    raise
  end

  $log.info "shutting down fluentd"
  shutdown
  return unless @log_emit_thread

  @log_event_loop_stop = true
  @log_emit_thread.join
end

#run_configure(conf) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/engine.rb', line 92

# Configures the engine from +conf+, then warns about every config entry that
# conf.check_not_fetched reports as unused.
#
# For each reported (key, element) pair:
# * if element.unused_in names a parent section, warn that the section is
#   unused there (mentioning the plugin when one is given);
# * otherwise stay silent for <system> elements, and for <source> elements
#   when @without_source is set;
# * otherwise warn that the parameter is unused.
def run_configure(conf)
  configure(conf)
  conf.check_not_fetched do |key, element|
    parent_name, plugin_name = element.unused_in

    if parent_name
      warning = "section <#{element.name}> is not used in <#{parent_name}>"
      warning += " of #{plugin_name} plugin" if plugin_name
      $log.warn warning
      next
    end

    next if element.name == 'system'
    next if @without_source && element.name == 'source'

    $log.warn "parameter '#{key}' in #{element.to_s.strip} is not used."
  end
end

#stop ⇒ Object



198
199
200
201
# File 'lib/fluent/engine.rb', line 198

# Requests engine shutdown by setting @engine_stopped; the main loop in #run
# polls that flag and proceeds to shutdown once it is true.
# Always returns nil.
def stop
  @engine_stopped = true
  nil
end

#suppress_interval(interval_time) ⇒ Object



78
79
80
81
# File 'lib/fluent/engine.rb', line 78

# Records +interval_time+ as the emit-error-log suppression interval and sets
# the next emit-error-log time to the current time in whole epoch seconds.
#
# Returns that epoch-seconds Integer.
def suppress_interval(interval_time)
  current_epoch = Time.now.to_i
  @suppress_emit_error_log_interval = interval_time
  @next_emit_error_log_time = current_epoch
end