Class: Fluent::Compat::TimeSlicedOutput
- Inherits:
-
Plugin::Output
- Object
- Plugin::Base
- Plugin::Output
- Fluent::Compat::TimeSlicedOutput
- Includes:
- PropagateDefault
- Defined in:
- lib/fluent/compat/output.rb
Constant Summary collapse
- BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.merge(Fluent::PluginHelper::CompatParameters::BUFFER_TIME_SLICED_PARAMS)
Constants inherited from Plugin::Output
Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS
Constants included from Fluent::Configurable
Fluent::Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#localtime ⇒ Object
Returns the value of attribute localtime.
Attributes inherited from Plugin::Output
#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count
Attributes included from PluginLoggerMixin
Attributes inherited from Plugin::Base
Class Method Summary collapse
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #detach_multi_process(&block) ⇒ Object
- #detach_process(&block) ⇒ Object
-
#extract_placeholders(str, metadata) ⇒ Object
#format MUST be implemented in plugin #write is also.
-
#initialize ⇒ TimeSlicedOutput
constructor
A new instance of TimeSlicedOutput.
- #start ⇒ Object
- #support_in_v12_style?(feature) ⇒ Boolean
Methods included from PropagateDefault
Methods inherited from Plugin::Output
#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #check_slow_flush, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #metadata_for_test, #multi_workers_ready?, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from PluginHelper::Mixin
Methods included from PluginLoggerMixin
Methods included from PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir
Methods inherited from Plugin::Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Fluent::Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ TimeSlicedOutput
Returns a new instance of TimeSlicedOutput.
618 619 620 621 622 623 624 625 |
# File 'lib/fluent/compat/output.rb', line 618 def initialize super @localtime = true unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin) self.class.prepend Fluent::Compat::CallSuperMixin end end |
Instance Attribute Details
#localtime ⇒ Object
Returns the value of attribute localtime.
610 611 612 |
# File 'lib/fluent/compat/output.rb', line 610 def localtime @localtime end |
Class Method Details
.propagate_default_params ⇒ Object
627 628 629 |
# File 'lib/fluent/compat/output.rb', line 627 def self.propagate_default_params BUFFER_PARAMS end |
Instance Method Details
#configure(conf) ⇒ Object
632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 |
# File 'lib/fluent/compat/output.rb', line 632 def configure(conf) bufconf = CompatOutputUtils.buffer_section(conf) config_style = (bufconf ? :v1 : :v0) if config_style == :v0 buf_params = { "flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"), "retry_type" => "exponential_backoff", } BUFFER_PARAMS.each do |older, newer| next unless newer if conf.has_key?(older) if older == 'buffer_queue_full_action' && conf[older] == 'exception' buf_params[newer] = 'throw_exception' else buf_params[newer] = conf[older] end end end if conf['timezone'] Fluent::Timezone.validate!(conf['timezone']) elsif conf['utc'] # v0.12 assumes UTC without any configuration # 'localtime=false && no timezone key' means UTC conf['localtime'] = "false" conf.delete('utc') elsif conf['localtime'] conf['timezone'] = Time.now.strftime('%z') conf['localtime'] = "true" else # v0.12 assumes UTC without any configuration # 'localtime=false && no timezone key' means UTC conf['localtime'] = "false" end @_timekey = case conf['time_slice_format'] when /\%S/ then 1 when /\%M/ then 60 when /\%H/ then 3600 when /\%d/ then 86400 when nil then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d' else raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long" end buf_params["timekey"] = @_timekey conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, []) end ParserUtils.convert_parser_conf(conf) FormatterUtils.convert_formatter_conf(conf) super if config_style == :v1 if @buffer_config.chunk_keys == ['tag'] raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only" end end self.extend TimeSliceChunkMixin end |
#detach_multi_process(&block) ⇒ Object
713 714 715 716 |
# File 'lib/fluent/compat/output.rb', line 713 def detach_multi_process(&block) log.warn "detach_process is not supported in this version. ignored." block.call end |
#detach_process(&block) ⇒ Object
708 709 710 711 |
# File 'lib/fluent/compat/output.rb', line 708 def detach_process(&block) log.warn "detach_process is not supported in this version. ignored." block.call end |
#extract_placeholders(str, metadata) ⇒ Object
#format MUST be implemented in plugin #write is also
723 724 725 |
# File 'lib/fluent/compat/output.rb', line 723 def extract_placeholders(str, ) raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API" end |
#start ⇒ Object
695 696 697 698 699 700 701 702 703 704 705 706 |
# File 'lib/fluent/compat/output.rb', line 695 def start super if instance_variable_defined?(:@formatter) && @inject_config unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin) if @formatter.respond_to?(:owner) && !@formatter.owner @formatter.owner = self @formatter.singleton_class.prepend FormatterUtils::InjectMixin end end end end |
#support_in_v12_style?(feature) ⇒ Boolean
567 568 569 570 571 572 573 574 |
# File 'lib/fluent/compat/output.rb', line 567 def support_in_v12_style?(feature) case feature when :synchronous then false when :buffered then true when :delayed_commit then false when :custom_format then true end end |