Class: Fluent::Compat::BufferedOutput
- Inherits:
-
Plugin::Output
- Object
- Plugin::Base
- Plugin::Output
- Fluent::Compat::BufferedOutput
- Includes:
- PropagateDefault
- Defined in:
- lib/fluent/compat/output.rb
Constant Summary collapse
- BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS
Constants inherited from Plugin::Output
Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS
Constants included from Fluent::Configurable
Fluent::Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Plugin::Output
#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #emit_count, #emit_records, #in_tests, #num_errors, #output_enqueue_thread_waiting, #retry, #rollback_count, #secondary, #timekey_zone, #write_count
Attributes included from PluginLoggerMixin
Attributes inherited from Plugin::Base
Class Method Summary collapse
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#emit(tag, es, chain, key = "") ⇒ Object
original implementation of v0.12 BufferedOutput.
- #extract_placeholders(str, metadata) ⇒ Object
- #format_stream(tag, es) ⇒ Object
-
#handle_stream_simple(tag, es, enqueue: false) ⇒ Object
This method overrides Fluent::Plugin::Output#handle_stream_simple because a v0.12 BufferedOutput plugin may override #format_stream, and the original #handle_stream_simple method does not take that into account.
-
#initialize ⇒ BufferedOutput
constructor
A new instance of BufferedOutput.
- #start ⇒ Object
- #submit_flush ⇒ Object
- #support_in_v12_style?(feature) ⇒ Boolean
Methods included from PropagateDefault
Methods inherited from Plugin::Output
#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #metadata_for_test, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #write, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from PluginHelper::Mixin
Methods included from PluginLoggerMixin
Methods included from PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?
Methods inherited from Plugin::Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #has_router?, #inspect, #shutdown, #shutdown?, #started?, #stop, #stopped?, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Fluent::Configurable
#config, included, lookup_type, register_type
Constructor Details
#initialize ⇒ BufferedOutput
Returns a new instance of BufferedOutput.
382 383 384 385 386 387 |
# File 'lib/fluent/compat/output.rb', line 382

# Builds a new BufferedOutput and makes sure the concrete plugin class has
# Fluent::Compat::CallSuperMixin prepended.
#
# The mixin is prepended to the class (not to the instance), and the
# ancestors check skips classes that already carry it, so it is applied
# at most once per class.
#
# @return [BufferedOutput] a new instance of BufferedOutput
def initialize
  super
  unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
    self.class.prepend Fluent::Compat::CallSuperMixin
  end
end
Class Method Details
.propagate_default_params ⇒ Object
246 247 248 |
# File 'lib/fluent/compat/output.rb', line 246

# Parameter table used for default propagation.
#
# @return [Object] the BUFFER_PARAMS constant (iterated as
#   `|older, newer|` name pairs by #configure)
def self.propagate_default_params
  BUFFER_PARAMS
end
Instance Method Details
#configure(conf) ⇒ Object
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/fluent/compat/output.rb', line 251

# Configures a v0.12-style buffered output plugin.
#
# Steps, in order:
# 1. If +conf+ has no buffer section (v0 style), build one from the old flat
#    parameters listed in BUFFER_PARAMS and append it to +conf.elements+.
# 2. Record whether the plugin class itself overrides #emit / #format_stream
#    (or includes RecordFilterMixin) so that #handle_stream_simple can pick
#    the matching code path.
# 3. Convert parser / formatter configuration, then call +super+.
# 4. Reject v1-style buffer sections that specify chunk keys.
# 5. Extend this instance with BufferedChunkMixin and, when the plugin
#    overrides #emit, replace the buffer's #emit with a recorder that only
#    stores the [key, data] pair on this plugin.
#
# @param conf [Fluent::Config::Element] plugin configuration (mutated in
#   place when a buffer section has to be synthesized)
# @raise [Fluent::ConfigError] when a v1-style buffer section has chunk keys
def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    buf_params = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      # entries without a new-style name have nothing to map to
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          # the old value 'exception' is spelled 'throw_exception' in the new parameter
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    conf.elements << Fluent::Config::Element.new('buffer', '', buf_params, [])
  end

  @includes_record_filter = self.class.ancestors.include?(Fluent::Compat::RecordFilterMixin)

  # instance_methods(false): only methods defined directly on the plugin class
  methods_of_plugin = self.class.instance_methods(false)
  @overrides_emit = methods_of_plugin.include?(:emit)
  # RecordFilter mixin uses its own #format_stream method implementation
  @overrides_format_stream = methods_of_plugin.include?(:format_stream) || @includes_record_filter

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if config_style == :v1
    unless @buffer_config.chunk_keys.empty?
      raise Fluent::ConfigError, "this plugin '#{self.class}' cannot handle arguments for <buffer ...> section"
    end
  end

  self.extend BufferedChunkMixin

  if @overrides_emit
    self.singleton_class.module_eval do
      attr_accessor :last_emit_via_buffer
    end
    output_plugin = self
    m = Module.new do
      define_method(:emit) do |key, data, chain|
        # receivers of this method are buffer instances
        output_plugin.last_emit_via_buffer = [key, data]
      end
    end
    @buffer.extend m
  end
end
#emit(tag, es, chain, key = "") ⇒ Object
original implementation of v0.12 BufferedOutput
309 310 311 312 313 314 315 316 |
# File 'lib/fluent/compat/output.rb', line 309

# Original implementation of v0.12 BufferedOutput#emit.
#
# This method will not be used except for the case that a plugin calls super.
# It bumps @emit_count, formats the event stream with #format_stream, hands
# the result to the buffer and calls #submit_flush when the buffer's #emit
# returns a truthy value.
#
# @param tag [String] event tag
# @param es [Object] event stream passed through to #format_stream
# @param chain [Object] output chain passed through to the buffer
# @param key [String] buffer key passed through to the buffer
def emit(tag, es, chain, key = "")
  @emit_count += 1
  data = format_stream(tag, es)
  submit_flush if @buffer.emit(key, data, chain)
end
#extract_placeholders(str, metadata) ⇒ Object
378 379 380 |
# File 'lib/fluent/compat/output.rb', line 378

# Placeholder extraction is not available to compat (v0.12-style) plugins;
# this method raises unconditionally.
#
# @param str [Object] unused
# @param metadata [Object] unused
# @raise [RuntimeError] always
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
#format_stream(tag, es) ⇒ Object
322 323 324 325 326 327 328 329 |
# File 'lib/fluent/compat/output.rb', line 322

# Formats every event of the stream with #format and concatenates the results.
#
# This method will not be used except for the case that a plugin calls super.
#
# @param tag [String] event tag passed to #format
# @param es [#each] event stream yielding (time, record) pairs
# @return [String] concatenation of the formatted events ('' for an empty stream)
def format_stream(tag, es)
  out = ''
  es.each do |time, record|
    out << format(tag, time, record)
  end
  out
end
#handle_stream_simple(tag, es, enqueue: false) ⇒ Object
This method overrides Fluent::Plugin::Output#handle_stream_simple because a v0.12 BufferedOutput plugin may override #format_stream, and the original #handle_stream_simple method does not take that into account.
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 |
# File 'lib/fluent/compat/output.rb', line 336

# Writes an event stream into the buffer, choosing the code path that matches
# what the v0.12-style plugin overrides (flags are set in #configure):
#
# * plugin overrides #emit: call the plugin's #emit, pick up the [key, data]
#   pair recorded in #last_emit_via_buffer, and write that data as-is.
# * plugin overrides #format_stream (or uses RecordFilterMixin): write the
#   bulk string returned by #format_stream as-is.
# * otherwise: format each event with #format and write the resulting array.
#
# In every path @emit_records is increased by the stream size.
#
# NOTE(review): the `meta` local, the `@buffer.metadata` call and the contents
# of the returned arrays were missing from the extracted listing and have been
# restored here -- confirm the exact identifiers against
# lib/fluent/compat/output.rb.
#
# @param tag [String] event tag
# @param es [Object] event stream (must respond to #size; #map in the default path)
# @param enqueue [Boolean] passed through to @buffer.write
# @return [Array] single-element array holding the metadata written to
def handle_stream_simple(tag, es, enqueue: false)
  if @overrides_emit
    current_emit_count = @emit_count
    size = es.size
    key = data = nil
    begin
      emit(tag, es, NULL_OUTPUT_CHAIN)
      key, data = self.last_emit_via_buffer
    ensure
      # undo the @emit_count change made by the plugin's #emit and clear the recorder slot
      @emit_count = current_emit_count
      self.last_emit_via_buffer = nil
    end
    # on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
    meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
    write_guard do
      # data is already formatted: identity formatter, precomputed size
      @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
    end
    @counters_monitor.synchronize{ @emit_records += size }
    return [meta]
  end

  if @overrides_format_stream
    meta = metadata(nil, nil, nil)
    size = es.size
    bulk = format_stream(tag, es)
    write_guard do
      # bulk is already formatted: identity formatter, precomputed size
      @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
    end
    @counters_monitor.synchronize{ @emit_records += size }
    return [meta]
  end

  meta = metadata(nil, nil, nil)
  size = es.size
  data = es.map{|time,record| format(tag, time, record) }
  write_guard do
    @buffer.write({meta => data}, enqueue: enqueue)
  end
  @counters_monitor.synchronize{ @emit_records += size }
  [meta]
end
#start ⇒ Object
389 390 391 392 393 394 395 396 397 398 399 400 |
# File 'lib/fluent/compat/output.rb', line 389

# Starts the plugin and, when an @formatter exists and @inject_config is set,
# wires the formatter to this plugin.
#
# The wiring is skipped when the formatter's class already has
# Fluent::Compat::HandleTagAndTimeMixin among its ancestors, when the
# formatter has no #owner accessor, or when it already has an owner.
# Otherwise this plugin becomes the owner and FormatterUtils::InjectMixin is
# prepended to that single formatter instance (via its singleton class).
def start
  super

  if instance_variable_defined?(:@formatter) && @inject_config
    unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
      if @formatter.respond_to?(:owner) && !@formatter.owner
        @formatter.owner = self
        @formatter.singleton_class.prepend FormatterUtils::InjectMixin
      end
    end
  end
end
#submit_flush ⇒ Object
318 319 320 |
# File 'lib/fluent/compat/output.rb', line 318

# Intentionally empty: exists only so that #emit implementations of 3rd party
# plugins can keep calling it.
#
# @return [nil]
def submit_flush
  # nothing todo: blank method to be called from #emit of 3rd party plugins
end
#support_in_v12_style?(feature) ⇒ Boolean
208 209 210 211 212 213 214 215 |
# File 'lib/fluent/compat/output.rb', line 208

# Tells whether this compat class supports +feature+ in the v0.12 style.
#
# Only :buffered and :custom_format are supported. Unknown features answer
# +false+ explicitly so that the predicate always returns a real Boolean
# instead of falling through to +nil+.
#
# @param feature [Symbol] one of :synchronous, :buffered, :delayed_commit,
#   :custom_format
# @return [Boolean]
def support_in_v12_style?(feature)
  case feature
  when :synchronous    then false
  when :buffered       then true
  when :delayed_commit then false
  when :custom_format  then true
  else false
  end
end