Class: Fluent::Compat::BufferedOutput
- Inherits:
-
Plugin::Output
- Object
- Plugin::Base
- Plugin::Output
- Fluent::Compat::BufferedOutput
- Includes:
- PropagateDefault
- Defined in:
- lib/fluent/compat/output.rb
Constant Summary collapse
- BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS
Constants inherited from Plugin::Output
Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT
Constants included from Fluent::Configurable
Fluent::Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Plugin::Output
#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #emit_count, #emit_records, #in_tests, #num_errors, #output_enqueue_thread_waiting, #retry, #rollback_count, #secondary, #write_count
Attributes included from PluginLoggerMixin
Class Method Summary collapse
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#emit(tag, es, chain, key = "") ⇒ Object
original implementation of v0.12 BufferedOutput.
- #extract_placeholders(str, metadata) ⇒ Object
- #format_stream(tag, es) ⇒ Object
-
#handle_stream_simple(tag, es, enqueue: false) ⇒ Object
This method overrides Fluent::Plugin::Output#handle_stream_simple because a v0.12 BufferedOutput plugin may override #format_stream, which the original #handle_stream_simple does not take into account.
-
#initialize ⇒ BufferedOutput
constructor
A new instance of BufferedOutput.
- #start ⇒ Object
- #submit_flush ⇒ Object
- #support_in_v12_style?(feature) ⇒ Boolean
Methods included from PropagateDefault
Methods inherited from Plugin::Output
#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #next_flush_time, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #write, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from PluginHelper::Mixin
Methods included from PluginLoggerMixin
Methods included from PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?
Methods inherited from Plugin::Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #has_router?, #inspect, #shutdown, #shutdown?, #started?, #stop, #stopped?, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Fluent::Configurable
#config, included, lookup_type, register_type
Constructor Details
#initialize ⇒ BufferedOutput
Returns a new instance of BufferedOutput.
379 380 381 382 383 384 |
# File 'lib/fluent/compat/output.rb', line 379
#
# Builds the plugin instance and makes sure the concrete plugin class has
# Fluent::Compat::CallSuperMixin prepended.
#
# The mixin is prepended at most once per class: the ancestors check skips
# the prepend when it is already present.
def initialize
  super
  unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
    self.class.prepend Fluent::Compat::CallSuperMixin
  end
end
Class Method Details
.propagate_default_params ⇒ Object
243 244 245 |
# File 'lib/fluent/compat/output.rb', line 243
#
# Returns the BUFFER_PARAMS constant unchanged.
#
# NOTE(review): presumably consumed by the PropagateDefault mixin to map
# old-style parameter defaults onto the buffer section -- confirm there.
#
# @return [Object] the BUFFER_PARAMS constant itself (not a copy)
def self.propagate_default_params
  BUFFER_PARAMS
end
Instance Method Details
#configure(conf) ⇒ Object
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/fluent/compat/output.rb', line 248
#
# Configures the plugin from either an old-style flat configuration or a
# configuration that already carries a buffer section.
#
# When CompatOutputUtils.buffer_section(conf) returns nothing, the
# configuration is treated as old style (:v0): every old parameter named in
# BUFFER_PARAMS that is present in +conf+ is copied under its new name into
# a synthesized 'buffer' element appended to conf.elements. Otherwise the
# configuration is treated as new style (:v1), and chunk keys on the buffer
# section are rejected after the parent configure has run.
#
# The method also records whether the concrete plugin class overrides #emit
# or #format_stream, and, when #emit is overridden, extends @buffer with an
# #emit that only captures its arguments.
#
# @param conf [Object] plugin configuration; must respond to has_key?, []
#   and elements (presumably a Fluent::Config::Element -- confirm)
# @raise [Fluent::ConfigError] if a new-style buffer section specifies
#   chunk keys
def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    buf_params = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      # entries without a new-style name have nothing to be copied to
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          # the old value 'exception' is spelled 'throw_exception' in the new style
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    conf.elements << Fluent::Config::Element.new('buffer', '', buf_params, [])
  end

  @includes_record_filter = self.class.ancestors.include?(Fluent::Compat::RecordFilterMixin)

  # only methods defined directly on the concrete plugin class count as overrides
  methods_of_plugin = self.class.instance_methods(false)
  @overrides_emit = methods_of_plugin.include?(:emit)
  # RecordFilter mixin uses its own #format_stream method implementation
  @overrides_format_stream = methods_of_plugin.include?(:format_stream) || @includes_record_filter

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if config_style == :v1
    unless @buffer_config.chunk_keys.empty?
      raise Fluent::ConfigError, "this plugin '#{self.class}' cannot handle arguments for <buffer ...> section"
    end
  end

  self.extend BufferedChunkMixin

  if @overrides_emit
    self.singleton_class.module_eval do
      attr_accessor :last_emit_via_buffer
    end
    output_plugin = self
    m = Module.new do
      define_method(:emit) do |key, data, chain|
        # receivers of this method are buffer instances
        output_plugin.last_emit_via_buffer = [key, data]
      end
    end
    @buffer.extend m
  end
end
#emit(tag, es, chain, key = "") ⇒ Object
original implementation of v0.12 BufferedOutput
306 307 308 309 310 311 312 313 |
# File 'lib/fluent/compat/output.rb', line 306
#
# original implementation of v0.12 BufferedOutput
#
# Increments @emit_count, formats the event stream with #format_stream and
# hands the result to @buffer.emit. #submit_flush is called only when
# @buffer.emit returns a truthy value.
#
# @param tag [Object] tag passed through to #format_stream
# @param es [Object] event stream passed through to #format_stream
# @param chain [Object] passed through to @buffer.emit
# @param key [String] passed through to @buffer.emit (defaults to "")
# @return [Object, nil] the result of #submit_flush, or nil when
#   @buffer.emit returned a falsy value
def emit(tag, es, chain, key = "")
  # this method will not be used except for the case that plugin calls super
  @emit_count += 1
  data = format_stream(tag, es)
  submit_flush if @buffer.emit(key, data, chain)
end
#extract_placeholders(str, metadata) ⇒ Object
375 376 377 |
# File 'lib/fluent/compat/output.rb', line 375
#
# Always raises: this compat class does not support placeholder extraction.
#
# NOTE(review): the second parameter name was missing from the flattened
# listing; it is restored as +metadata+ to match the documented signature
# #extract_placeholders(str, metadata).
#
# @param str [Object] ignored
# @param metadata [Object] ignored
# @raise [RuntimeError] unconditionally
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
#format_stream(tag, es) ⇒ Object
319 320 321 322 323 324 325 326 |
# File 'lib/fluent/compat/output.rb', line 319
#
# Formats every event of +es+ with #format and concatenates the results
# into a single string, in iteration order.
#
# @param tag [Object] tag passed to #format for every event
# @param es [#each] event stream yielding (time, record) pairs
# @return [String] concatenation of all formatted events; empty when +es+
#   yields nothing
def format_stream(tag, es)
  # this method will not be used except for the case that plugin calls super
  # dup so the accumulator can be appended to even if string literals are frozen
  out = ''.dup
  es.each do |time, record|
    out << format(tag, time, record)
  end
  out
end
#handle_stream_simple(tag, es, enqueue: false) ⇒ Object
This method overrides Fluent::Plugin::Output#handle_stream_simple because a v0.12 BufferedOutput plugin may override #format_stream, which the original #handle_stream_simple does not take into account
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# File 'lib/fluent/compat/output.rb', line 333
#
# Writes one event stream into @buffer, choosing the formatting path from
# what the concrete plugin overrides:
#
# 1. @overrides_emit: the plugin's #emit is called; the (key, data) pair it
#    passes to the buffer is read back from last_emit_via_buffer and written
#    as pre-formatted data. @emit_count is restored afterwards so the call
#    is not counted by this path.
# 2. @overrides_format_stream: #format_stream produces one pre-formatted
#    bulk payload.
# 3. otherwise: every event is formatted individually with #format.
#
# In every path the write runs inside write_guard and @emit_records is
# increased by es.size under @counters_monitor.
#
# NOTE(review): the metadata local and the `metadata` method names were
# missing from the flattened listing (` = @buffer.(variables: ...)`,
# `{ => data}`, `return []`). They are restored here as `meta`,
# `@buffer.metadata(...)` and `metadata(nil, nil, nil)` -- confirm against
# lib/fluent/compat/output.rb.
#
# @param tag [Object] tag of the event stream
# @param es [Object] event stream; must respond to size, and to map in
#   path 3
# @param enqueue [Boolean] forwarded to @buffer.write
# @return [Array] one-element array holding the metadata used for the write
def handle_stream_simple(tag, es, enqueue: false)
  if @overrides_emit
    current_emit_count = @emit_count
    size = es.size
    key = data = nil
    begin
      emit(tag, es, NULL_OUTPUT_CHAIN)
      key, data = self.last_emit_via_buffer
    ensure
      # undo the counter bump and clear the captured pair even if emit raised
      @emit_count = current_emit_count
      self.last_emit_via_buffer = nil
    end
    # on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
    meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
    write_guard do
      # data is already formatted, so the format callback is the identity
      @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
    end
    @counters_monitor.synchronize{ @emit_records += size }
    return [meta]
  end

  if @overrides_format_stream
    meta = metadata(nil, nil, nil)
    size = es.size
    bulk = format_stream(tag, es)
    write_guard do
      # bulk is already formatted, so the format callback is the identity
      @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
    end
    @counters_monitor.synchronize{ @emit_records += size }
    return [meta]
  end

  meta = metadata(nil, nil, nil)
  size = es.size
  data = es.map{|time,record| format(tag, time, record) }
  write_guard do
    @buffer.write({meta => data}, enqueue: enqueue)
  end
  @counters_monitor.synchronize{ @emit_records += size }
  [meta]
end
#start ⇒ Object
386 387 388 389 390 391 392 393 394 395 396 397 |
# File 'lib/fluent/compat/output.rb', line 386
#
# Starts the plugin and, when applicable, wires the formatter to this plugin.
#
# After the parent #start has run, FormatterUtils::InjectMixin is prepended
# to the formatter's singleton class and the formatter's owner is set to
# this plugin, but only when all of the following hold:
# - @formatter is defined and @inject_config is truthy,
# - the formatter's class does not include
#   Fluent::Compat::HandleTagAndTimeMixin,
# - the formatter responds to #owner and currently has no owner.
def start
  super

  if instance_variable_defined?(:@formatter) && @inject_config
    unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
      if @formatter.respond_to?(:owner) && !@formatter.owner
        @formatter.owner = self
        @formatter.singleton_class.prepend FormatterUtils::InjectMixin
      end
    end
  end
end
#submit_flush ⇒ Object
315 316 317 |
# File 'lib/fluent/compat/output.rb', line 315
#
# Intentionally empty method.
#
# @return [nil]
def submit_flush
  # nothing todo: blank method to be called from #emit of 3rd party plugins
end
#support_in_v12_style?(feature) ⇒ Boolean
205 206 207 208 209 210 211 212 |
# File 'lib/fluent/compat/output.rb', line 205
#
# Reports which output features this class supports.
#
# @param feature [Symbol] one of :synchronous, :buffered, :delayed_commit,
#   :custom_format
# @return [Boolean, nil] true for :buffered and :custom_format, false for
#   :synchronous and :delayed_commit, nil for any other value (the case
#   expression has no else branch)
def support_in_v12_style?(feature)
  case feature
  when :synchronous    then false
  when :buffered       then true
  when :delayed_commit then false
  when :custom_format  then true
  end
end