Method: Fluent::Plugin::Buffer#write_step_by_step
Defined in: lib/fluent/plugin/buffer.rb
#write_step_by_step(metadata, data, format, splits_count, &block) ⇒ Object
- split the event stream into many (10 -> 100 -> 1000 -> ...) chunks (see the slicing sketch below)
- append splits into the staged chunk as much as possible
- create an unstaged chunk and append the remaining splits; repeat this for all splits
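
The slicing step can be illustrated in isolation. Below is a minimal, hypothetical extraction of the split computation; the real method slices a Fluent::EventStream, while this sketch uses a plain Array for clarity:

  # Hypothetical standalone version of the slicing step.
  # Note: when data.size is not divisible by splits_count, the method
  # deliberately uses (splits_count - 1) to round the slice size up,
  # so it may produce fewer splits than requested.
  def slice_into_splits(data, splits_count)
    splits_count = data.size if splits_count > data.size
    slice_size = if data.size % splits_count == 0
                   data.size / splits_count
                 else
                   data.size / (splits_count - 1)
                 end
    splits = []
    slice_origin = 0
    while slice_origin < data.size
      splits << data.slice(slice_origin, slice_size)
      slice_origin += slice_size
    end
    splits
  end

  slice_into_splits((1..10).to_a, 5)
  # => [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
  slice_into_splits((1..10).to_a, 4)
  # => [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]

The escalation in the doc comment (10 -> 100 -> 1000 -> ...) comes from the full method below: when a split still overflows a chunk, splits_count is multiplied by 10 and the whole write is retried via ShouldRetry.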
# File 'lib/fluent/plugin/buffer.rb', line 729

def write_step_by_step(metadata, data, format, splits_count, &block)
  splits = []
  if splits_count > data.size
    splits_count = data.size
  end

  slice_size = if data.size % splits_count == 0
                 data.size / splits_count
               else
                 data.size / (splits_count - 1)
               end
  slice_origin = 0
  while slice_origin < data.size
    splits << data.slice(slice_origin, slice_size)
    slice_origin += slice_size
  end

  # This method appends events into the staged chunk first.
  # Then it generates unstaged (not queued) chunks to append the remaining data.
  staged_chunk_used = false
  modified_chunks = []
  modified_metadata = metadata
  get_next_chunk = ->(){
    if staged_chunk_used
      # Staging a new chunk here is a bad idea:
      # recovering the whole state, including newly staged chunks, is much harder than the current implementation.
      modified_metadata = modified_metadata.dup_next
      generate_chunk(modified_metadata)
    else
      synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
    end
  }

  writing_splits_index = 0
  enqueue_chunk_before_retry = false

  while writing_splits_index < splits.size
    chunk = get_next_chunk.call
    errors = []
    modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors}
    chunk.synchronize do
      raise ShouldRetry unless chunk.writable?
      staged_chunk_used = true if chunk.staged?

      original_bytesize = committed_bytesize = chunk.bytesize
      begin
        while writing_splits_index < splits.size
          split = splits[writing_splits_index]
          formatted_split = format ? format.call(split) : nil

          if split.size == 1 # Check BufferChunkOverflowError
            determined_bytesize = nil
            if @compress != :text
              determined_bytesize = nil
            elsif formatted_split
              determined_bytesize = formatted_split.bytesize
            elsif split.first.respond_to?(:bytesize)
              determined_bytesize = split.first.bytesize
            end

            if determined_bytesize && determined_bytesize > @chunk_limit_size
              # This is an obvious case where BufferChunkOverflowError should be raised,
              # but raising here would completely lose the already processed splits
              # and the splits that follow. So, as a last resort, delay raising the exception.
              errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
              writing_splits_index += 1
              next
            end

            if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
              # This split will (or might) cause a size-over, so commit the already
              # processed content here (allowing a small performance regression).
              chunk.commit
              committed_bytesize = chunk.bytesize
            end
          end

          if format
            chunk.concat(formatted_split, split.size)
          else
            chunk.append(split, compress: @compress)
          end
          adding_bytes = chunk.bytesize - committed_bytesize

          if chunk_size_over?(chunk) # split size is larger than the difference between size_full? and size_over?
            chunk.rollback
            committed_bytesize = chunk.bytesize

            if split.size == 1 # Check BufferChunkOverflowError again
              if adding_bytes > @chunk_limit_size
                errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
                writing_splits_index += 1
                next
              else
                # As the already processed content is kept after rollback, the unstaged chunk should be queued.
                # After that, re-process the current split again.
                # A new chunk must be allocated; to do that, modify @stage and so on.
                synchronize { @stage.delete(modified_metadata) }
                staged_chunk_used = false
                chunk.unstaged!
                break
              end
            end

            if chunk_size_full?(chunk) || split.size == 1
              enqueue_chunk_before_retry = true
            else
              splits_count *= 10
            end
            raise ShouldRetry
          end

          writing_splits_index += 1

          if chunk_size_full?(chunk)
            break
          end
        end
      rescue
        chunk.purge if chunk.unstaged? # the unstaged chunk would leak unless we purge it
        raise
      end

      modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
    end
  end
  modified_chunks.each do |data|
    block.call(data[:chunk], data[:adding_bytesize], data[:errors])
  end
rescue ShouldRetry
  modified_chunks.each do |data|
    chunk = data[:chunk]
    chunk.rollback rescue nil
    if chunk.unstaged?
      chunk.purge rescue nil
    end
  end
  enqueue_chunk(metadata) if enqueue_chunk_before_retry
  retry
end
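
For context, fluentd reaches this method as a fallback when a whole event stream cannot be stored into the staged chunk at once, starting with a splits_count of 10. The following is a hedged usage sketch, not fluentd's actual call site: buffer, metadata, es (an event stream), and logger are assumed to exist, and the formatter is illustrative.

  # Hedged usage sketch. write_step_by_step is private, hence `send`.
  # `to_msgpack_stream` is a real Fluent::EventStream serializer; the
  # surrounding names (buffer, metadata, es, logger) are assumptions.
  format_proc = ->(split) { split.to_msgpack_stream }

  buffer.send(:write_step_by_step, metadata, es, format_proc, 10) do |chunk, adding_bytesize, errors|
    # The block runs once per modified chunk, after all splits have been written.
    logger.debug("wrote #{adding_bytesize} bytes into chunk #{chunk.unique_id}")
    # Oversized single-record splits are collected into `errors` instead of
    # raising mid-write; the caller decides how to surface them (fluentd
    # itself reports them as BufferChunkOverflowError after the write).
    errors.each { |msg| logger.warn(msg) }
  end

Note the design choice visible in the error handling: raising BufferChunkOverflowError as soon as an oversized record is found would discard the splits already written and those still pending, so the method records a message per oversized record and keeps going.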