Class: Fluent::Compat::TimeSlicedOutput

Inherits:
Plugin::Output show all
Includes:
PropagateDefault
Defined in:
lib/fluent/compat/output.rb

Constant Summary collapse

BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.merge(Fluent::PluginHelper::CompatParameters::BUFFER_TIME_SLICED_PARAMS)

Constants inherited from Plugin::Output

Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS

Constants included from Fluent::Configurable

Fluent::Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Plugin::Output

#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count

Attributes included from PluginLoggerMixin

#log

Attributes inherited from Plugin::Base

#under_plugin_development

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PropagateDefault

included

Methods inherited from Plugin::Output

#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #check_slow_flush, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #metadata_for_test, #multi_workers_ready?, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from PluginHelper::Mixin

included

Methods included from PluginLoggerMixin

included, #terminate

Methods included from PluginId

#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir

Methods inherited from Plugin::Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Fluent::Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initialize ⇒ TimeSlicedOutput

Returns a new instance of TimeSlicedOutput.



618
619
620
621
622
623
624
625
# File 'lib/fluent/compat/output.rb', line 618

# Builds the output, defaults @localtime to true, and makes sure the concrete
# plugin class has Fluent::Compat::CallSuperMixin prepended exactly once.
def initialize
  super
  @localtime = true

  plugin_class = self.class
  mixin = Fluent::Compat::CallSuperMixin
  # Prepend only when the mixin is not already somewhere in the ancestor chain.
  plugin_class.prepend(mixin) unless plugin_class.ancestors.include?(mixin)
end

Instance Attribute Details

#localtime ⇒ Object

Returns the value of attribute localtime.



610
611
612
# File 'lib/fluent/compat/output.rb', line 610

# Reader for the @localtime instance variable.
#
# @return [Object] the current value of @localtime (nil if it was never assigned)
def localtime
  @localtime
end

Class Method Details

.propagate_default_params ⇒ Object



627
628
629
# File 'lib/fluent/compat/output.rb', line 627

# Exposes this class's BUFFER_PARAMS constant.
#
# @return [Object] the BUFFER_PARAMS constant as resolved from this class
def self.propagate_default_params
  BUFFER_PARAMS
end

Instance Method Details

#configure(conf) ⇒ Object



632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
# File 'lib/fluent/compat/output.rb', line 632

# Configures the plugin, translating a configuration without a buffer section
# ("v0" style) into one that carries a synthesized <buffer time> element
# before delegating to the superclass.
#
# The configuration is treated as "v1" style when
# CompatOutputUtils.buffer_section(conf) returns a truthy value, and as "v0"
# style otherwise. Note that +conf+ is mutated in place in the v0 path.
#
# @param conf [Object] plugin configuration; it is read with #[] / #has_key?,
#   written with #[]= / #delete, and its #elements collection is appended to
# @raise [Fluent::ConfigError] when a v0 time_slice_format contains none of
#   %S, %M, %H or %d, or when the v1 chunk-key check below fails
# @return [Object] the result of extending self with TimeSliceChunkMixin
def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    # Seed the new-style buffer parameters: interval flushing only when a
    # flush_interval was given, lazy flushing otherwise.
    buf_params = {
      "flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"),
      "retry_type" => "exponential_backoff",
    }
    # Copy every old-style key present in conf to its new-style name.
    # Entries whose new name is nil/false are skipped.
    BUFFER_PARAMS.each do |older, newer|
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          # The old value 'exception' is spelled 'throw_exception' in the new style.
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    # Normalize the time zone settings. Precedence: timezone, utc, localtime.
    if conf['timezone']
      Fluent::Timezone.validate!(conf['timezone'])
    elsif conf['utc']
      # v0.12 assumes UTC without any configuration
      # 'localtime=false && no timezone key' means UTC
      conf['localtime'] = "false"
      conf.delete('utc')
    elsif conf['localtime']
      # NOTE(review): any non-nil, non-false value takes this branch; if conf
      # values are strings, "false" would be treated as local time too - confirm.
      # The time zone is pinned to the local UTC offset at configure time.
      conf['timezone'] = Time.now.strftime('%z')
      conf['localtime'] = "true"
    else
      # v0.12 assumes UTC without any configuration
      # 'localtime=false && no timezone key' means UTC
      conf['localtime'] = "false"
    end

    # Derive the timekey (in seconds) from the finest-grained directive found
    # in time_slice_format; the `when` clauses are ordered finest first.
    @_timekey = case conf['time_slice_format']
                when /\%S/ then 1
                when /\%M/ then 60
                when /\%H/ then 3600
                when /\%d/ then 86400
                when nil   then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d'
                else
                  raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
                end
    buf_params["timekey"] = @_timekey

    # Append the synthesized <buffer time> section carrying the translated parameters.
    conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if config_style == :v1
    # NOTE(review): this raises when the chunk keys are exactly ['tag'], yet the
    # message says only <buffer tag> is allowed. The condition and the message
    # look inconsistent - confirm which one reflects the intended rule.
    if @buffer_config.chunk_keys == ['tag']
      raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
    end
  end

  self.extend TimeSliceChunkMixin
end

#detach_multi_process(&block) ⇒ Object



713
714
715
716
# File 'lib/fluent/compat/output.rb', line 713

# Compatibility shim: logs a warning that process detaching is unsupported
# and then runs the given block in the current process.
#
# @param block [Proc] the work to run; it is invoked with no arguments
# @return [Object] whatever the block returns
def detach_multi_process(&block)
  notice = "detach_process is not supported in this version. ignored."
  log.warn(notice)
  block.call
end

#detach_process(&block) ⇒ Object



708
709
710
711
# File 'lib/fluent/compat/output.rb', line 708

# Compatibility shim: logs a warning that process detaching is unsupported
# and then runs the given block in the current process.
#
# @param block [Proc] the work to run; it is invoked with no arguments
# @return [Object] whatever the block returns
def detach_process(&block)
  notice = "detach_process is not supported in this version. ignored."
  log.warn(notice)
  block.call
end

#extract_placeholders(str, metadata) ⇒ Object

#format MUST be implemented in the plugin; #write MUST be implemented as well.



723
724
725
# File 'lib/fluent/compat/output.rb', line 723

# Placeholder extraction is not available to compat-layer plugins; this
# method always fails so that callers are pointed at the newer plugin API.
#
# @param str [Object] unused
# @param metadata [Object] unused
# @raise [RuntimeError] always
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end

#start ⇒ Object



695
696
697
698
699
700
701
702
703
704
705
706
# File 'lib/fluent/compat/output.rb', line 695

# Starts the plugin through the superclass and then, when an inject
# configuration is present, wires an ownerless formatter to this plugin.
#
# The formatter is left untouched when @formatter was never assigned, when
# @inject_config is falsy, when the formatter's class already includes
# Fluent::Compat::HandleTagAndTimeMixin, when it has no #owner reader, or
# when it already has an owner.
def start
  super

  return unless instance_variable_defined?(:@formatter) && @inject_config

  formatter = @formatter
  return if formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
  return unless formatter.respond_to?(:owner) && !formatter.owner

  # Claim the formatter and prepend the inject behaviour to this one instance only.
  formatter.owner = self
  formatter.singleton_class.prepend FormatterUtils::InjectMixin
end

#support_in_v12_style?(feature) ⇒ Boolean

Returns:

  • (Boolean)


567
568
569
570
571
572
573
574
# File 'lib/fluent/compat/output.rb', line 567

# Reports whether the given v0.12-style feature is supported by this class.
#
# @param feature [Symbol] one of :synchronous, :buffered, :delayed_commit, :custom_format
# @return [Boolean, nil] true for :buffered and :custom_format, false for
#   :synchronous and :delayed_commit, nil for anything else
def support_in_v12_style?(feature)
  {
    synchronous: false,
    buffered: true,
    delayed_commit: false,
    custom_format: true,
  }[feature]
end