Class: Fluent::Compat::TimeSlicedOutput

Inherits:
Plugin::Output show all
Includes:
PropagateDefault
Defined in:
lib/fluent/compat/output.rb

Constant Summary collapse

BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.merge(Fluent::PluginHelper::CompatParameters::BUFFER_TIME_SLICED_PARAMS)

Constants inherited from Plugin::Output

Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS

Constants included from Fluent::Configurable

Fluent::Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Plugin::Output

#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #emit_count, #emit_records, #in_tests, #num_errors, #output_enqueue_thread_waiting, #retry, #rollback_count, #secondary, #timekey_zone, #write_count

Attributes included from PluginLoggerMixin

#log

Attributes inherited from Plugin::Base

#under_plugin_development

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PropagateDefault

included

Methods inherited from Plugin::Output

#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #metadata_for_test, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #write, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from PluginHelper::Mixin

included

Methods included from PluginLoggerMixin

included, #terminate

Methods included from PluginId

#plugin_id, #plugin_id_configured?, #plugin_id_for_test?

Methods inherited from Plugin::Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #has_router?, #inspect, #shutdown, #shutdown?, #started?, #stop, #stopped?, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Fluent::Configurable

#config, included, lookup_type, register_type

Constructor Details

#initializeTimeSlicedOutput

Returns a new instance of TimeSlicedOutput.



586
587
588
589
590
591
592
593
# File 'lib/fluent/compat/output.rb', line 586

def initialize
  super
  @localtime = true

  unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
    self.class.prepend Fluent::Compat::CallSuperMixin
  end
end

Instance Attribute Details

#localtimeObject

Returns the value of attribute localtime.



578
579
580
# File 'lib/fluent/compat/output.rb', line 578

def localtime
  @localtime
end

Class Method Details

.propagate_default_paramsObject



595
596
597
# File 'lib/fluent/compat/output.rb', line 595

def self.propagate_default_params
  BUFFER_PARAMS
end

Instance Method Details

#configure(conf) ⇒ Object



600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
# File 'lib/fluent/compat/output.rb', line 600

def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    buf_params = {
      "flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"),
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    if conf['timezone']
      @timezone = conf['timezone']
      Fluent::Timezone.validate!(@timezone)
    elsif conf['utc']
      @timezone = "+0000"
      @localtime = false
    elsif conf['localtime']
      @timezone = Time.now.strftime('%z')
      @localtime = true
    else
      @timezone = "+0000" # v0.12 assumes UTC without any configuration
      @localtime = false
    end

    @_timekey = case conf['time_slice_format']
                when /\%S/ then 1
                when /\%M/ then 60
                when /\%H/ then 3600
                when /\%d/ then 86400
                when nil   then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d'
                else
                  raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
                end
    buf_params["timekey"] = @_timekey

    conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if config_style == :v1
    if @buffer_config.chunk_keys == ['tag']
      raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
    end
  end

  self.extend TimeSliceChunkMixin
end

#extract_placeholders(str, metadata) ⇒ Object

#format MUST be implemented in plugin #write is also



679
680
681
# File 'lib/fluent/compat/output.rb', line 679

def extract_placeholders(str, )
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end

#startObject



661
662
663
664
665
666
667
668
669
670
671
672
# File 'lib/fluent/compat/output.rb', line 661

def start
  super

  if instance_variable_defined?(:@formatter) && @inject_config
    unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
      if @formatter.respond_to?(:owner) && !@formatter.owner
        @formatter.owner = self
        @formatter.singleton_class.prepend FormatterUtils::InjectMixin
      end
    end
  end
end

#support_in_v12_style?(feature) ⇒ Boolean

Returns:

  • (Boolean)


535
536
537
538
539
540
541
542
# File 'lib/fluent/compat/output.rb', line 535

def support_in_v12_style?(feature)
  case feature
  when :synchronous    then false
  when :buffered       then true
  when :delayed_commit then false
  when :custom_format  then true
  end
end