Method: Fluent::Plugin::Buffer#write

Defined in:
lib/fluent/plugin/buffer.rb

#write(metadata_and_data, format: nil, size: nil, enqueue: false) ⇒ Object

metadata MUST have consistent object_id for each variation
data MUST be Array of serialized events, or EventStream
metadata_and_data MUST be a hash of { metadata => data }
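
A hedged usage sketch of how a caller might feed this method (the names buffer, meta, es, tag, and the format helper are illustrative assumptions, not taken from this file): an output plugin builds one Metadata object per variation, reuses it for every write of that variation, and passes a hash of { metadata => data } together with an optional formatter proc.

# illustrative only: `buffer`, `es`, `tag` and `format` are assumed to exist in the caller
meta = buffer.metadata(tag: tag)  # the same Metadata object (same object_id) must key every write for this variation
format_proc = ->(es) { es.map { |time, record| format(tag, time, record) } }  # serializes an EventStream into an Array

# `es` may be an EventStream, or an Array of already-serialized events when no format proc is given
buffer.write({ meta => es }, format: format_proc, enqueue: false)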



# File 'lib/fluent/plugin/buffer.rb', line 328

def write(metadata_and_data, format: nil, size: nil, enqueue: false)
  return if metadata_and_data.size < 1
  raise BufferOverflowError, "buffer space has too many data" unless storable?

  log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size }

  operated_chunks = []
  unstaged_chunks = {} # metadata => [chunk, chunk, ...]
  chunks_to_enqueue = []
  staged_bytesizes_by_chunk = {}
  # track internal BufferChunkOverflowError in write_step_by_step
  buffer_chunk_overflow_errors = []

  begin
    # sort metadata to get lock of chunks in same order with other threads
    metadata_and_data.keys.sort.each do |metadata|
      data = metadata_and_data[metadata]
      write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize, error|
        chunk.mon_enter # lock the chunk to prevent it from being committed/rolled back by other threads
        operated_chunks << chunk
        if chunk.staged?
          #
          # https://github.com/fluent/fluentd/issues/2712
          # write_once is supposed to write to a chunk only once
          # but this block **may** run multiple times from write_step_by_step, and a previous write may be rolled back,
          # so we should count the stage_size only for the last successful write
          #
          staged_bytesizes_by_chunk[chunk] = adding_bytesize
        elsif chunk.unstaged?
          unstaged_chunks[metadata] ||= []
          unstaged_chunks[metadata] << chunk
        end
        if error && !error.empty?
          buffer_chunk_overflow_errors << error
        end
      end
    end

    return if operated_chunks.empty?

    # At this point this thread holds locks on many chunks; acquiring the buffer-global lock now could cause a deadlock.
    # Any operation that needs the buffer-global lock (including enqueueing) must be done after these locks are released.

    first_chunk = operated_chunks.shift
    # If the first commit finishes without any exceptions, the following commits for the other chunks
    # can be expected to finish successfully as well.
    # In most cases #commit requires only very little disk space, so the major failure causes are
    # permission errors, disk failures and other permanent (fatal) errors.
    begin
      first_chunk.commit
      if enqueue || first_chunk.unstaged? || chunk_size_full?(first_chunk)
        chunks_to_enqueue << first_chunk
      end
      first_chunk.mon_exit
    rescue
      operated_chunks.unshift(first_chunk)
      raise
    end

    errors = []
    # The buffer plugin assumes there is no serious error cause: it tries to commit all chunks either way
    operated_chunks.each do |chunk|
      begin
        chunk.commit
        if enqueue || chunk.unstaged? || chunk_size_full?(chunk)
          chunks_to_enqueue << chunk
        end
        chunk.mon_exit
      rescue => e
        chunk.rollback
        chunk.mon_exit
        errors << e
      end
    end

    # All chunk locks have been released.

    #
    # Now update the stage, stage_size with proper locking
    # FIX FOR stage_size miscomputation - https://github.com/fluent/fluentd/issues/2712
    #
    staged_bytesizes_by_chunk.each do |chunk, bytesize|
      chunk.synchronize do
        synchronize { @stage_size_metrics.add(bytesize) }
        log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
      end
    end

    chunks_to_enqueue.each do |c|
      if c.staged? && (enqueue || chunk_size_full?(c))
        m = c.metadata
        enqueue_chunk(m)
        if unstaged_chunks[m] && !unstaged_chunks[m].empty?
          u = unstaged_chunks[m].pop
          u.synchronize do
            if u.unstaged? && !chunk_size_full?(u)
              # `u.metadata.seq` and `m.seq` can be different, but Buffer#enqueue_chunk expects them to be the same value
              u.metadata.seq = 0
              synchronize {
                @stage[m] = u.staged!
                @stage_size_metrics.add(u.bytesize)
              }
            end
          end
        end
      elsif c.unstaged?
        enqueue_unstaged_chunk(c)
      else
        # the previously staged chunk has already been enqueued, closed or purged: no problem.
      end
    end

    operated_chunks.clear if errors.empty?

    if errors.size > 0
      log.warn "error occurs in committing chunks: only first one raised", errors: errors.map(&:class)
      raise errors.first
    end
  ensure
    operated_chunks.each do |chunk|
      chunk.rollback rescue nil # nothing we can do if #rollback fails
      if chunk.unstaged?
        chunk.purge rescue nil # to prevent leakage of unstaged chunks
      end
      chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
    end
    unless buffer_chunk_overflow_errors.empty?
      # Notify delayed BufferChunkOverflowError here
      raise BufferChunkOverflowError, buffer_chunk_overflow_errors.join(", ")
    end
  end
end
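
Callers should be prepared for the two error classes this method raises: BufferOverflowError when the buffer as a whole is not storable, and a delayed BufferChunkOverflowError (raised in the ensure block after the rest of the data has been committed) when individual events exceeded the chunk size limit. A minimal, hedged sketch of caller-side handling, assuming a `buffer` instance and a fluentd-style `log` are in scope and that the retry policy below is purely illustrative:

begin
  buffer.write({ meta => es }, enqueue: false)
rescue Fluent::Plugin::Buffer::BufferOverflowError
  # whole buffer is full or not storable: back off and retry later (the retry policy is up to the caller)
  sleep 1
  retry
rescue Fluent::Plugin::Buffer::BufferChunkOverflowError => e
  # raised after the remaining data was committed: only the oversized events were skipped
  log.error "dropped events larger than the chunk size limit", error: e
end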