Class: Fluent::Compat::ObjectBufferedOutput

Inherits:
Plugin::Output show all
Includes:
PropagateDefault
Defined in:
lib/fluent/compat/output.rb

Constant Summary collapse

BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS

Constants inherited from Plugin::Output

Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT

Constants included from Fluent::Configurable

Fluent::Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Plugin::Output

#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #emit_count, #emit_records, #in_tests, #num_errors, #output_enqueue_thread_waiting, #retry, #rollback_count, #secondary, #write_count

Attributes included from PluginLoggerMixin

#log

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PropagateDefault

included

Methods inherited from Plugin::Output

#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #next_flush_time, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from PluginHelper::Mixin

included

Methods included from PluginLoggerMixin

included, #terminate

Methods included from PluginId

#plugin_id, #plugin_id_configured?, #plugin_id_for_test?

Methods inherited from Plugin::Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #has_router?, #inspect, #shutdown, #shutdown?, #started?, #stop, #stopped?, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Fluent::Configurable

#config, included, lookup_type, register_type

Constructor Details

#initialize ⇒ ObjectBufferedOutput

Returns a new instance of ObjectBufferedOutput.



502
503
504
505
506
507
# File 'lib/fluent/compat/output.rb', line 502

# Builds the instance via the parent initializer, then makes sure the concrete
# plugin class has Fluent::Compat::CallSuperMixin prepended (only once per class).
def initialize
  super

  plugin_class = self.class
  # The mixin is prepended to the class itself, so a later instance finds it
  # already present in the ancestor chain and skips the prepend.
  return if plugin_class.ancestors.include?(Fluent::Compat::CallSuperMixin)

  plugin_class.prepend Fluent::Compat::CallSuperMixin
end

Class Method Details

.propagate_default_params ⇒ Object



449
450
451
# File 'lib/fluent/compat/output.rb', line 449

# Returns the BUFFER_PARAMS constant visible from this class.
#
# NOTE(review): presumably read by the PropagateDefault mixin to decide which
# parameter defaults to propagate — confirm against that module.
def self.propagate_default_params
  BUFFER_PARAMS
end

Instance Method Details

#configure(conf) ⇒ Object



454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
# File 'lib/fluent/compat/output.rb', line 454

# Configures the plugin, accepting either legacy flat buffer parameters (v0 style)
# or an explicit buffer section (v1 style).
#
# The style is v1 when CompatOutputUtils.buffer_section(conf) returns a truthy
# value and v0 otherwise. For v0, each legacy key of BUFFER_PARAMS that is present
# in conf (and has a newer name) is copied under that newer name into a generated
# 'buffer' element with argument 'tag', which is appended to conf.elements — conf
# is mutated in place.
#
# @param conf plugin configuration; must respond to has_key?, [] and elements
#   (presumably a Fluent::Config::Element — confirm against callers)
# @raise [Fluent::ConfigError] when a v1 buffer section declares chunk keys other
#   than exactly ['tag']
# @return [self] the result of extending this instance with BufferedChunkMixin
def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    buf_params = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          # The legacy value 'exception' is spelled 'throw_exception' under the newer name.
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if config_style == :v1
    # Only <buffer tag> is acceptable here, so reject every other chunk key set.
    # (The check used to be `==`, which rejected the single allowed form.)
    if @buffer_config.chunk_keys != ['tag']
      raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
    end
  end

  self.extend BufferedChunkMixin
end

#extract_placeholders(str, metadata) ⇒ Object



498
499
500
# File 'lib/fluent/compat/output.rb', line 498

# Always fails: placeholder extraction is not available through the compat layer.
#
# Both arguments are accepted only so the signature matches callers; neither is used.
#
# @param str [Object] ignored
# @param metadata [Object] ignored
# @raise [RuntimeError] unconditionally
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end

#format_stream(tag, es) ⇒ Object

for BufferedOutputTestDriver



490
491
492
# File 'lib/fluent/compat/output.rb', line 490

# Serializes an event stream by calling its to_msgpack_stream, passing the
# plugin's @time_as_integer setting as the time_int option.
# Kept for BufferedOutputTestDriver; the tag argument is not used.
#
# @param tag [Object] ignored
# @param es [#to_msgpack_stream] event stream to serialize
# @return [Object] whatever es.to_msgpack_stream returns
def format_stream(tag, es)
  packing_options = { time_int: @time_as_integer }
  es.to_msgpack_stream(**packing_options)
end

#start ⇒ Object



509
510
511
512
513
514
515
516
517
518
519
520
# File 'lib/fluent/compat/output.rb', line 509

# Starts the plugin via the parent implementation, then wires the formatter for
# injection when all of the following hold: @formatter is defined, @inject_config
# is truthy, the formatter's class does not include HandleTagAndTimeMixin, and
# the formatter exposes an owner that is not yet set.
def start
  super

  return unless instance_variable_defined?(:@formatter) && @inject_config

  formatter = @formatter
  return if formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
  return unless formatter.respond_to?(:owner)
  return if formatter.owner

  # Claim ownership first, then add the inject behaviour to this one formatter only.
  formatter.owner = self
  formatter.singleton_class.prepend FormatterUtils::InjectMixin
end

#support_in_v12_style?(feature) ⇒ Boolean

This plugin cannot inherit from BufferedOutput because #configure sets the chunk key ‘tag’ to flush chunks per tag, but BufferedOutput#configure doesn’t allow setting a chunk key in v1-style configuration.

Returns:

  • (Boolean)


409
410
411
412
413
414
415
416
# File 'lib/fluent/compat/output.rb', line 409

# Reports whether this compat class supports the given v0.12-style feature.
#
# @param feature [Symbol] one of :synchronous, :buffered, :delayed_commit, :custom_format
# @return [Boolean, nil] true only for :buffered, false for the other known
#   features, nil for anything unrecognized
def support_in_v12_style?(feature)
  {
    synchronous: false,
    buffered: true,
    delayed_commit: false,
    custom_format: false,
  }[feature]
end

#write(chunk) ⇒ Object



494
495
496
# File 'lib/fluent/compat/output.rb', line 494

# Passes a buffer chunk to write_objects together with the tag recorded in the
# chunk's metadata.
#
# @param chunk [#metadata] buffer chunk; its metadata must respond to #tag
# @return [Object] whatever write_objects returns
def write(chunk)
  write_objects(chunk.metadata.tag, chunk)
end