Class: Fluent::Supervisor

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/supervisor.rb

Defined Under Namespace

Classes: LoggerInitializer

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opt) ⇒ Supervisor

Returns a new instance of Supervisor.



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
# File 'lib/fluent/supervisor.rb', line 378

def initialize(opt)
  @daemonize = opt[:daemonize]
  @supervise = opt[:supervise]
  @standalone_worker= opt[:standalone_worker]
  @config_path = opt[:config_path]
  @inline_config = opt[:inline_config]
  @use_v1_config = opt[:use_v1_config]
  @log_path = opt[:log_path]
  @dry_run = opt[:dry_run]
  @show_plugin_config = opt[:show_plugin_config]
  @libs = opt[:libs]
  @plugin_dirs = opt[:plugin_dirs]
  @chgroup = opt[:chgroup]
  @chuser = opt[:chuser]
  @rpc_server = nil
  @process_name = nil

  @log_level = opt[:log_level]
  @log_rotate_age = opt[:log_rotate_age]
  @log_rotate_size = opt[:log_rotate_size]
  @suppress_interval = opt[:suppress_interval]
  @suppress_config_dump = opt[:suppress_config_dump]
  @without_source = opt[:without_source]
  @signame = opt[:signame]

  @suppress_repeated_stacktrace = opt[:suppress_repeated_stacktrace]
  log_opts = {suppress_repeated_stacktrace: @suppress_repeated_stacktrace}
  @log = LoggerInitializer.new(
    @log_path, @log_level, @chuser, @chgroup, log_opts,
    log_rotate_age: @log_rotate_age,
    log_rotate_size: @log_rotate_size
  )
  @finished = false
end

Class Method Details

.default_optionsObject



356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/fluent/supervisor.rb', line 356

def self.default_options
  {
    config_path: Fluent::DEFAULT_CONFIG_PATH,
    plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR],
    log_level: Fluent::Log::LEVEL_INFO,
    log_path: nil,
    daemonize: nil,
    libs: [],
    setup_path: nil,
    chuser: nil,
    chgroup: nil,
    suppress_interval: 0,
    suppress_repeated_stacktrace: true,
    without_source: false,
    use_v1_config: true,
    supervise: true,
    standalone_worker: false,
    signame: nil,
    winsvcreg: nil,
  }
end

.load_config(path, params = {}) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/fluent/supervisor.rb', line 203

def self.load_config(path, params = {})

  pre_loadtime = 0
  pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
  pre_config_mtime = nil
  pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
  config_mtime = File.mtime(path)

  # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
  if Time.now - Time.at(pre_loadtime) < 5 and config_mtime == pre_config_mtime
    return params['pre_conf']
  end

  config_fname = File.basename(path)
  config_basedir = File.dirname(path)
  config_data = File.read(path)
  inline_config = params['inline_config']
  if inline_config == '-'
    config_data << "\n" << STDIN.read
  elsif inline_config
    config_data << "\n" << inline_config.gsub("\\n","\n")
  end
  fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config'])
  system_config = SystemConfig.create(fluentd_conf)

  log_level = system_config.log_level || params['log_level']
  suppress_repeated_stacktrace = system_config.suppress_repeated_stacktrace || params['suppress_repeated_stacktrace']
  log_path = params['log_path']
  chuser = params['chuser']
  chgroup = params['chgroup']
  log_rotate_age = params['log_rotate_age']
  log_rotate_size = params['log_rotate_size']
  rpc_endpoint = system_config.rpc_endpoint
  enable_get_dump = system_config.enable_get_dump

  log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace}
  logger_initializer = Supervisor::LoggerInitializer.new(
    log_path, log_level, chuser, chgroup, log_opts,
    log_rotate_age: log_rotate_age,
    log_rotate_size: log_rotate_size
  )
  # this #init sets initialized logger to $log
  logger_initializer.init
  logger = $log

  command_sender = Fluent.windows? ? "pipe" : "signal"

  # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
  pid_path = params['daemonize']
  daemonize = !!params['daemonize']
  main_cmd = params['main_cmd']
  signame = params['signame']

  se_config = {
      worker_type: 'spawn',
      workers: 1,
      log_stdin: false,
      log_stdout: false,
      log_stderr: false,
      enable_heartbeat: true,
      auto_heartbeat: false,
      unrecoverable_exit_codes: [2],
      stop_immediately_at_unrecoverable_exit: true,
      logger: logger,
      log: logger.out,
      log_path: log_path,
      log_level: log_level,
      logger_initializer: logger_initializer,
      chuser: chuser,
      chgroup: chgroup,
      chumask: 0,
      suppress_repeated_stacktrace: suppress_repeated_stacktrace,
      daemonize: daemonize,
      rpc_endpoint: rpc_endpoint,
      enable_get_dump: enable_get_dump,
      windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
                               File.join(File.dirname(__FILE__), 'daemon.rb'),
                               ServerModule.name,
                               WorkerModule.name,
                               path,
                               JSON.dump(params)],
      command_sender: command_sender,
      fluentd_conf: fluentd_conf,
      main_cmd: main_cmd,
      signame: signame,
  }
  if daemonize
    se_config[:pid_path] = pid_path
  end
  pre_params = params.dup
  params['pre_loadtime'] = Time.now.to_i
  params['pre_config_mtime'] = config_mtime
  params['pre_conf'] = se_config
  # prevent pre_conf from being too big by reloading many times.
  pre_params['pre_conf'] = nil
  params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)

  return se_config
end

Instance Method Details

#optionsObject



423
424
425
426
427
428
429
430
# File 'lib/fluent/supervisor.rb', line 423

def options
  {
    'config_path' => @config_path,
    'pid_file' => @daemonize,
    'plugin_dirs' => @plugin_dirs,
    'log_path' => @log_path
  }
end

#run_supervisorObject



413
414
415
416
417
418
419
420
421
# File 'lib/fluent/supervisor.rb', line 413

def run_supervisor
  @log.init
  show_plugin_config if @show_plugin_config
  read_config
  set_system_config

  dry_run if @dry_run
  supervise
end

#run_workerObject



432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
# File 'lib/fluent/supervisor.rb', line 432

def run_worker
  begin
    require 'sigdump/setup'
  rescue Exception
    # ignore LoadError and others (related with signals): it may raise these errors in Windows
  end
  @log.init
  Process.setproctitle("worker:#{@process_name}") if @process_name

  show_plugin_config if @show_plugin_config
  read_config
  set_system_config

  install_main_process_signal_handlers

  $log.info "starting fluentd-#{Fluent::VERSION} without supervision"

  main_process do
    create_socket_manager if @standalone_worker
    change_privilege
    init_engine
    run_configure
    run_engine
    exit 0
  end
end