Class: Fluent::Supervisor

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/supervisor.rb

Defined Under Namespace

Classes: LoggerInitializer

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opt) ⇒ Supervisor

Returns a new instance of Supervisor.



366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/fluent/supervisor.rb', line 366

def initialize(opt)
  @daemonize = opt[:daemonize]
  @supervise = opt[:supervise]
  @standalone_worker= opt[:standalone_worker]
  @config_path = opt[:config_path]
  @inline_config = opt[:inline_config]
  @use_v1_config = opt[:use_v1_config]
  @log_path = opt[:log_path]
  @dry_run = opt[:dry_run]
  @show_plugin_config = opt[:show_plugin_config]
  @libs = opt[:libs]
  @plugin_dirs = opt[:plugin_dirs]
  @chgroup = opt[:chgroup]
  @chuser = opt[:chuser]
  @rpc_server = nil
  @process_name = nil

  @log_level = opt[:log_level]
  @suppress_interval = opt[:suppress_interval]
  @suppress_config_dump = opt[:suppress_config_dump]
  @without_source = opt[:without_source]
  @signame = opt[:signame]

  @suppress_repeated_stacktrace = opt[:suppress_repeated_stacktrace]
  log_opts = {suppress_repeated_stacktrace: @suppress_repeated_stacktrace}
  @log = LoggerInitializer.new(@log_path, @log_level, @chuser, @chgroup, log_opts)
  @finished = false
end

Class Method Details

.default_optionsObject



344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/fluent/supervisor.rb', line 344

def self.default_options
  {
    config_path: Fluent::DEFAULT_CONFIG_PATH,
    plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR],
    log_level: Fluent::Log::LEVEL_INFO,
    log_path: nil,
    daemonize: nil,
    libs: [],
    setup_path: nil,
    chuser: nil,
    chgroup: nil,
    suppress_interval: 0,
    suppress_repeated_stacktrace: true,
    without_source: false,
    use_v1_config: true,
    supervise: true,
    standalone_worker: false,
    signame: nil,
    winsvcreg: nil,
  }
end

.load_config(path, params = {}) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/fluent/supervisor.rb', line 203

def self.load_config(path, params = {})

  pre_loadtime = 0
  pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
  pre_config_mtime = nil
  pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
  config_mtime = File.mtime(path)

  # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
  if Time.now - Time.at(pre_loadtime) < 5 and config_mtime == pre_config_mtime
    return params['pre_conf']
  end

  config_fname = File.basename(path)
  config_basedir = File.dirname(path)
  config_data = File.read(path)
  inline_config = params['inline_config']
  if inline_config == '-'
    config_data << "\n" << STDIN.read
  elsif inline_config
    config_data << "\n" << inline_config.gsub("\\n","\n")
  end
  fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config'])
  system_config = SystemConfig.create(fluentd_conf)

  log_level = system_config.log_level || params['log_level']
  suppress_repeated_stacktrace = system_config.suppress_repeated_stacktrace || params['suppress_repeated_stacktrace']
  log_path = params['log_path']
  chuser = params['chuser']
  chgroup = params['chgroup']
  rpc_endpoint = system_config.rpc_endpoint
  enable_get_dump = system_config.enable_get_dump

  log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace}
  logger_initializer = Supervisor::LoggerInitializer.new(log_path, log_level, chuser, chgroup, log_opts)
  # this #init sets initialized logger to $log
  logger_initializer.init
  logger = $log

  command_sender = Fluent.windows? ? "pipe" : "signal"

  # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
  pid_path = params['daemonize']
  daemonize = !!params['daemonize']
  main_cmd = params['main_cmd']
  signame = params['signame']

  se_config = {
      worker_type: 'spawn',
      workers: 1,
      log_stdin: false,
      log_stdout: false,
      log_stderr: false,
      enable_heartbeat: true,
      auto_heartbeat: false,
      unrecoverable_exit_codes: [2],
      stop_immediately_at_unrecoverable_exit: true,
      logger: logger,
      log: logger.out,
      log_path: log_path,
      log_level: log_level,
      logger_initializer: logger_initializer,
      chuser: chuser,
      chgroup: chgroup,
      chumask: 0,
      suppress_repeated_stacktrace: suppress_repeated_stacktrace,
      daemonize: daemonize,
      rpc_endpoint: rpc_endpoint,
      enable_get_dump: enable_get_dump,
      windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
                               File.join(File.dirname(__FILE__), 'daemon.rb'),
                               ServerModule.name,
                               WorkerModule.name,
                               path,
                               JSON.dump(params)],
      command_sender: command_sender,
      fluentd_conf: fluentd_conf,
      main_cmd: main_cmd,
      signame: signame,
  }
  if daemonize
    se_config[:pid_path] = pid_path
  end
  pre_params = params.dup
  params['pre_loadtime'] = Time.now.to_i
  params['pre_config_mtime'] = config_mtime
  params['pre_conf'] = se_config
  # prevent pre_conf from being too big by reloading many times.
  pre_params['pre_conf'] = nil
  params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)

  return se_config
end

Instance Method Details

#optionsObject



405
406
407
408
409
410
411
412
# File 'lib/fluent/supervisor.rb', line 405

def options
  {
    'config_path' => @config_path,
    'pid_file' => @daemonize,
    'plugin_dirs' => @plugin_dirs,
    'log_path' => @log_path
  }
end

#run_supervisorObject



395
396
397
398
399
400
401
402
403
# File 'lib/fluent/supervisor.rb', line 395

def run_supervisor
  @log.init
  show_plugin_config if @show_plugin_config
  read_config
  set_system_config

  dry_run if @dry_run
  supervise
end

#run_workerObject



414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
# File 'lib/fluent/supervisor.rb', line 414

def run_worker
  begin
    require 'sigdump/setup'
  rescue Exception
    # ignore LoadError and others (related with signals): it may raise these errors in Windows
  end
  @log.init
  Process.setproctitle("worker:#{@process_name}") if @process_name

  show_plugin_config if @show_plugin_config
  read_config
  set_system_config

  install_main_process_signal_handlers

  $log.info "starting fluentd-#{Fluent::VERSION} without supervision"

  main_process do
    create_socket_manager if @standalone_worker
    change_privilege
    init_engine
    run_configure
    run_engine
    exit 0
  end
end