Method: Fluent::Plugin::Buffer#write_once

Defined in:
lib/fluent/plugin/buffer.rb

#write_once(metadata, data, format: nil, size: nil, &block) ⇒ Object

Write the given data into a single chunk in one pass (a minimal caller sketch follows the steps below):

  1. append the whole data into the existing staged chunk

  2. commit it & return unless chunk_size_over?

  3. if the chunk was not empty, enqueue it & retry this whole method

  4. otherwise, fall back to step-by-step writing via write_step_by_step
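
For orientation, a minimal caller sketch, not taken from fluentd itself: `buffer`, `metadata`, and `es` (an event stream) are assumptions set up elsewhere, and the lambdas mirror the format/size parameters described above.

# Hypothetical usage sketch; `buffer`, `metadata`, and `es` (an event stream)
# are assumptions, not part of this page.
format = ->(es) { es.to_msgpack_stream }  # serialize the whole stream in one call
size   = -> { es.size }                   # record count of the serialized payload

buffer.write_once(metadata, es, format: format, size: size) do |chunk, adding_bytesize|
  # reached only when the data fit into a single staged chunk;
  # `adding_bytesize` is the number of bytes this call appended to `chunk`
end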



# File 'lib/fluent/plugin/buffer.rb', line 653

def write_once(metadata, data, format: nil, size: nil, &block)
  return if data.empty?

  stored = false
  adding_bytesize = nil

  # fetch (or lazily create) the staged chunk for this metadata under the buffer-wide lock
  chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata).staged! }
  enqueue_chunk_before_retry = false
  chunk.synchronize do
    # retry this method if the chunk was enqueued by another thread between
    # fetching it and entering this critical section
    raise ShouldRetry unless chunk.staged?

    empty_chunk = chunk.empty?

    original_bytesize = chunk.bytesize
    begin
      if format
        serialized = format.call(data)
        chunk.concat(serialized, size ? size.call : data.size)
      else
        chunk.append(data, compress: @compress)
      end
      adding_bytesize = chunk.bytesize - original_bytesize

      # the append may have pushed the chunk over its byte and/or record limits
      if chunk_size_over?(chunk)
        if format && empty_chunk
          if chunk.bytesize > @chunk_limit_size
            log.warn "chunk bytes limit exceeds for an emitted event stream: #{adding_bytesize}bytes"
          else
            log.warn "chunk size limit exceeds for an emitted event stream: #{chunk.size}records"
          end
        end
        chunk.rollback

        if format && !empty_chunk
          # Event streams should be appended into a chunk at once as much as
          # possible, to keep formatting fast. The stream may be a
          # MessagePackEventStream, and we don't want to split it into two or
          # more chunks (except when the stream itself is larger than the chunk limit).
          enqueue_chunk_before_retry = true
          raise ShouldRetry
        end
      else
        stored = true
      end
    rescue
      chunk.rollback
      raise
    end

    if stored
      block.call(chunk, adding_bytesize)
    end
  end

  unless stored
    # Try step-by-step (non-bulk) appending when the data can't be stored into
    # an existing chunk in one shot.
    #
    # 1/10 of the original event stream (splits_count == 10) seems small enough
    # to try emitting events into the existing chunk.
    # Splitting the event stream into very small pieces is not a problem, because
    # chunks have little overhead even when data is written many times (even for
    # file buffer chunks).
    write_step_by_step(metadata, data, format, 10, &block)
  end
rescue ShouldRetry
  # enqueue the too-full staged chunk first when the event stream must not be
  # split; in either case, retry the whole method against a fresh staged chunk
  enqueue_chunk(metadata) if enqueue_chunk_before_retry
  retry
end
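
Everything above pivots on chunk_size_over?, which is defined elsewhere in lib/fluent/plugin/buffer.rb. As a simplified sketch (assuming the configured limits live in @chunk_limit_size and @chunk_limit_records, as the warning branch above suggests), it behaves roughly like:

# simplified sketch, not the verbatim implementation: a chunk is "over"
# once it exceeds the byte limit or the optional record-count limit
def chunk_size_over?(chunk)
  chunk.bytesize > @chunk_limit_size ||
    (@chunk_limit_records && chunk.size > @chunk_limit_records)
end

This two-part check is why the warning branch above distinguishes a byte overflow (chunk.bytesize > @chunk_limit_size) from a record-count overflow.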