Method: Fluent::Plugin::Buffer#write
- Defined in:
- lib/fluent/plugin/buffer.rb
#write(metadata_and_data, format: nil, size: nil, enqueue: false) ⇒ Object
metadata MUST have a consistent object_id for each variation. data MUST be an Array of serialized events, or an EventStream. metadata_and_data MUST be a hash of { metadata => data }.
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 |
# File 'lib/fluent/plugin/buffer.rb', line 328

# Writes events into buffer chunks, one `write_once` call per metadata key,
# then commits every touched chunk and enqueues the ones that need it.
#
# metadata MUST have consistent object_id for each variation.
# data MUST be Array of serialized events, or EventStream.
# metadata_and_data MUST be a hash of { metadata => data }.
#
# @param metadata_and_data [Hash] mapping of metadata => data to write
# @param format [Object, nil] forwarded unchanged to `write_once`
# @param size [Object, nil] forwarded unchanged to `write_once`
# @param enqueue [Boolean] when true, every committed chunk is enqueued
# @return [Object] nil when there is nothing to write or no chunk was touched
# @raise [BufferOverflowError] when the buffer is not `storable?`
# @raise [BufferChunkOverflowError] raised at the very end if `write_once`
#   reported overflow errors through its block
# @raise [StandardError] the first error raised while committing chunks
def write(metadata_and_data, format: nil, size: nil, enqueue: false)
  return if metadata_and_data.size < 1
  raise BufferOverflowError, "buffer space has too many data" unless storable?

  log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size }

  operated_chunks = []
  unstaged_chunks = {} # metadata => [chunk, chunk, ...]
  chunks_to_enqueue = []
  staged_bytesizes_by_chunk = {}
  # track internal BufferChunkOverflowError in write_step_by_step
  buffer_chunk_overflow_errors = []

  begin
    # sort metadata to get lock of chunks in same order with other threads
    metadata_and_data.keys.sort.each do |metadata|
      data = metadata_and_data[metadata]
      write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize, error|
        chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
        operated_chunks << chunk
        if chunk.staged?
          #
          # https://github.com/fluent/fluentd/issues/2712
          # write_once is supposed to write to a chunk only once
          # but this block **may** run multiple times from write_step_by_step and previous write may be rollbacked
          # So we should be counting the stage_size only for the last successful write
          #
          staged_bytesizes_by_chunk[chunk] = adding_bytesize
        elsif chunk.unstaged?
          unstaged_chunks[metadata] ||= []
          unstaged_chunks[metadata] << chunk
        end
        if error && !error.empty?
          buffer_chunk_overflow_errors << error
        end
      end
    end

    return if operated_chunks.empty?

    # Now, this thread acquires many locks of chunks... getting buffer-global lock causes dead lock.
    # Any operations needs buffer-global lock (including enqueueing) should be done after releasing locks.

    first_chunk = operated_chunks.shift
    # Following commits for other chunks also can finish successfully if the first commit operation
    # finishes without any exceptions.
    # In most cases, #commit just requires very small disk spaces, so major failure reason are
    # permission errors, disk failures and other permanent(fatal) errors.
    begin
      first_chunk.commit
      if enqueue || first_chunk.unstaged? || chunk_size_full?(first_chunk)
        chunks_to_enqueue << first_chunk
      end
      first_chunk.mon_exit
    rescue
      # put it back so the ensure clause rolls it back and releases its lock
      operated_chunks.unshift(first_chunk)
      raise
    end

    errors = []
    # Buffer plugin estimates there's no serious error cause: will commit for all chunks either way
    operated_chunks.each do |chunk|
      begin
        chunk.commit
        if enqueue || chunk.unstaged? || chunk_size_full?(chunk)
          chunks_to_enqueue << chunk
        end
        chunk.mon_exit
      rescue => e
        chunk.rollback
        chunk.mon_exit
        errors << e
      end
    end

    # All locks about chunks are released.

    #
    # Now update the stage, stage_size with proper locking
    # FIX FOR stage_size miscomputation - https://github.com/fluent/fluentd/issues/2712
    #
    staged_bytesizes_by_chunk.each do |chunk, bytesize|
      chunk.synchronize do
        synchronize { @stage_size_metrics.add(bytesize) }
        log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
      end
    end

    chunks_to_enqueue.each do |c|
      if c.staged? && (enqueue || chunk_size_full?(c))
        m = c.metadata
        enqueue_chunk(m)
        if unstaged_chunks[m] && !unstaged_chunks[m].empty?
          u = unstaged_chunks[m].pop
          u.synchronize do
            if u.unstaged? && !chunk_size_full?(u)
              # `u.metadata.seq` and `m.seq` can be different but Buffer#enqueue_chunk expect them to be the same value
              u.metadata.seq = 0
              synchronize {
                @stage[m] = u.staged!
                @stage_size_metrics.add(u.bytesize)
              }
            end
          end
        end
      elsif c.unstaged?
        enqueue_unstaged_chunk(c)
      else
        # previously staged chunk is already enqueued, closed or purged.
        # no problem.
      end
    end

    # nothing left for the ensure clause to roll back when every commit succeeded
    operated_chunks.clear if errors.empty?

    unless errors.empty?
      log.warn "error occurs in committing chunks: only first one raised", errors: errors.map(&:class)
      raise errors.first
    end
  ensure
    operated_chunks.each do |chunk|
      chunk.rollback rescue nil # nothing possible to do for #rollback failure
      if chunk.unstaged?
        chunk.purge rescue nil # to prevent leakage of unstaged chunks
      end
      chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
    end
    unless buffer_chunk_overflow_errors.empty?
      # Notify delayed BufferChunkOverflowError here
      raise BufferChunkOverflowError, buffer_chunk_overflow_errors.join(", ")
    end
  end
end