Class: Fluent::Compat::ObjectBufferedOutput
- Inherits:
-
Plugin::Output
- Object
- Plugin::Base
- Plugin::Output
- Fluent::Compat::ObjectBufferedOutput
- Includes:
- PropagateDefault
- Defined in:
- lib/fluent/compat/output.rb
Constant Summary collapse
- BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS
Constants inherited from Plugin::Output
Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT
Constants included from Fluent::Configurable
Fluent::Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Plugin::Output
#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #emit_count, #emit_records, #in_tests, #num_errors, #output_enqueue_thread_waiting, #retry, #rollback_count, #secondary, #write_count
Attributes included from PluginLoggerMixin
Class Method Summary collapse
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #extract_placeholders(str, metadata) ⇒ Object
-
#format_stream(tag, es) ⇒ Object
for BufferedOutputTestDriver.
-
#initialize ⇒ ObjectBufferedOutput
constructor
A new instance of ObjectBufferedOutput.
- #start ⇒ Object
-
#support_in_v12_style?(feature) ⇒ Boolean
This plugin cannot inherit BufferedOutput because #configure sets chunk_key ‘tag’ to flush chunks per tags, but BufferedOutput#configure doesn’t allow setting chunk_key in v1 style configuration.
- #write(chunk) ⇒ Object
Methods included from PropagateDefault
Methods inherited from Plugin::Output
#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #next_flush_time, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from PluginHelper::Mixin
Methods included from PluginLoggerMixin
Methods included from PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?
Methods inherited from Plugin::Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #has_router?, #inspect, #shutdown, #shutdown?, #started?, #stop, #stopped?, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Fluent::Configurable
#config, included, lookup_type, register_type
Constructor Details
#initialize ⇒ ObjectBufferedOutput
Returns a new instance of ObjectBufferedOutput.
502 503 504 505 506 507 |
# File 'lib/fluent/compat/output.rb', line 502 def initialize super unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin) self.class.prepend Fluent::Compat::CallSuperMixin end end |
Class Method Details
.propagate_default_params ⇒ Object
449 450 451 |
# File 'lib/fluent/compat/output.rb', line 449 def self.propagate_default_params BUFFER_PARAMS end |
Instance Method Details
#configure(conf) ⇒ Object
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 |
# File 'lib/fluent/compat/output.rb', line 454 def configure(conf) bufconf = CompatOutputUtils.buffer_section(conf) config_style = (bufconf ? :v1 : :v0) if config_style == :v0 buf_params = { "flush_mode" => "interval", "retry_type" => "exponential_backoff", } BUFFER_PARAMS.each do |older, newer| next unless newer if conf.has_key?(older) if older == 'buffer_queue_full_action' && conf[older] == 'exception' buf_params[newer] = 'throw_exception' else buf_params[newer] = conf[older] end end end conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, []) end ParserUtils.convert_parser_conf(conf) FormatterUtils.convert_formatter_conf(conf) super if config_style == :v1 if @buffer_config.chunk_keys == ['tag'] raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only" end end self.extend BufferedChunkMixin end |
#extract_placeholders(str, metadata) ⇒ Object
498 499 500 |
# File 'lib/fluent/compat/output.rb', line 498 def extract_placeholders(str, ) raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API" end |
#format_stream(tag, es) ⇒ Object
for BufferedOutputTestDriver
490 491 492 |
# File 'lib/fluent/compat/output.rb', line 490 def format_stream(tag, es) # for BufferedOutputTestDriver es.to_msgpack_stream(time_int: @time_as_integer) end |
#start ⇒ Object
509 510 511 512 513 514 515 516 517 518 519 520 |
# File 'lib/fluent/compat/output.rb', line 509 def start super if instance_variable_defined?(:@formatter) && @inject_config unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin) if @formatter.respond_to?(:owner) && !@formatter.owner @formatter.owner = self @formatter.singleton_class.prepend FormatterUtils::InjectMixin end end end end |
#support_in_v12_style?(feature) ⇒ Boolean
This plugin cannot inherit BufferedOutput because #configure sets chunk_key ‘tag’ to flush chunks per tags, but BufferedOutput#configure doesn’t allow setting chunk_key in v1 style configuration
409 410 411 412 413 414 415 416 |
# File 'lib/fluent/compat/output.rb', line 409 def support_in_v12_style?(feature) case feature when :synchronous then false when :buffered then true when :delayed_commit then false when :custom_format then false end end |
#write(chunk) ⇒ Object
494 495 496 |
# File 'lib/fluent/compat/output.rb', line 494 def write(chunk) write_objects(chunk..tag, chunk) end |