Class: Purplelight::WriterJSONL
- Inherits:
-
Object
- Object
- Purplelight::WriterJSONL
- Defined in:
- lib/purplelight/writer_jsonl.rb
Overview
WriterJSONL writes newline-delimited JSON with optional compression.
Constant Summary
- DEFAULT_ROTATE_BYTES =
256 * 1024 * 1024
Instance Method Summary
- #close ⇒ Object
-
#initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil, manifest: nil, compression_level: nil) ⇒ WriterJSONL
constructor
A new instance of WriterJSONL.
- #rotate_if_needed ⇒ Object
- #write_many(batch) ⇒ Object
Constructor Details
#initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil, manifest: nil, compression_level: nil) ⇒ WriterJSONL
Returns a new instance of WriterJSONL.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/purplelight/writer_jsonl.rb', line 22
#
# Stores the writer configuration, resets all counters, resolves the
# compression that will actually be used and logs that choice.
#
# @param directory [Object] stored in @directory
# @param prefix [Object] stored in @prefix
# @param compression [Symbol, String] requested compression (defaults to :zstd)
# @param rotate_bytes [Integer, nil] stored in @rotate_bytes
# @param logger [#info, #warn, nil] optional logger
# @param manifest [Object, nil] stored in @manifest
# @param compression_level [Integer, nil] explicit level; when not given, a
#   positive integer parsed from ENV['PL_ZSTD_LEVEL'] is used instead
def initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil,
               manifest: nil, compression_level: nil)
  # Caller-supplied configuration.
  @directory = directory
  @prefix = prefix
  @compression = compression
  @rotate_bytes = rotate_bytes
  @logger = logger
  @manifest = manifest

  # An explicit level wins; otherwise only a positive env value is accepted.
  zstd_env_level = ENV['PL_ZSTD_LEVEL']&.to_i
  env_fallback = zstd_env_level if zstd_env_level&.positive?
  @compression_level = compression_level || env_fallback

  # Writer state: nothing is open and nothing has been written yet.
  @part_index = nil
  @io = nil
  @bytes_written = 0
  @rows_written = 0
  @file_seq = 0
  @closed = false

  @effective_compression = determine_effective_compression(@compression)
  @json_state = JSON::Ext::Generator::State.new(ascii_only: false, max_nesting: false)

  if @logger
    shown_level = @compression_level
    shown_level ||= ENV['PL_ZSTD_LEVEL']&.to_i if @effective_compression.to_s == 'zstd'
    @logger.info("WriterJSONL using compression='#{@effective_compression}' level='#{shown_level || 'default'}'")
  end

  # Warn when the requested compression had to be substituted.
  if @effective_compression.to_s != @compression.to_s
    @logger&.warn("requested compression '#{@compression}' not available; using '#{@effective_compression}'")
  end
end
Instance Method Details
#close ⇒ Object
118 119 120 121 122 123 124 125 126 |
# File 'lib/purplelight/writer_jsonl.rb', line 118
#
# Closes the writer. When an IO is currently held, the current part is
# finalized first and the IO is then closed. Calling this again after the
# writer has been marked closed does nothing.
#
# @return [true, nil] true when this call marked the writer closed, nil when
#   it was already closed
def close
  unless @closed
    if @io
      finalize_current_part!
      @io.close
    end
    @closed = true
  end
end
#rotate_if_needed ⇒ Object
111 112 113 114 115 116 |
# File 'lib/purplelight/writer_jsonl.rb', line 111
#
# Calls rotate! once @bytes_written has reached @rotate_bytes. Does nothing
# when @rotate_bytes is nil.
#
# @return [Object, nil] whatever rotate! returns, or nil when no rotation ran
def rotate_if_needed
  limit = @rotate_bytes
  return if limit.nil?

  rotate! unless @bytes_written < limit
end
#write_many(batch) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/purplelight/writer_jsonl.rb', line 51
#
# Writes a batch of rows through write_buffer and records the progress.
#
# Three batch shapes are accepted:
# * a String: treated as a preassembled buffer and written in one call; rows
#   are counted as the number of "\n" characters it contains;
# * a collection whose first element is a String: every element is appended
#   as-is (no newline is added);
# * any other collection: every element is encoded with @json_state and
#   followed by "\n".
#
# Collections are accumulated into a buffer that is flushed whenever it
# reaches the chunk threshold, to avoid large intermediate allocations. The
# threshold is ENV['PL_WRITE_CHUNK_BYTES'] when that parses to a positive
# integer, otherwise 8 MiB.
#
# @param batch [String, #each] rows to write
# @return [Object, nil] the result of the manifest progress call, or nil when
#   no manifest is set
def write_many(batch)
  ensure_open!

  # String#to_i never returns nil, so a plain `|| default` would never fall
  # back for "0", negative or non-numeric values and every single line would
  # be flushed on its own. Only a positive value overrides the default.
  env_threshold = ENV['PL_WRITE_CHUNK_BYTES']&.to_i
  chunk_threshold = env_threshold&.positive? ? env_threshold : 8 * 1024 * 1024

  rows = 0
  total_bytes = 0

  if batch.is_a?(String)
    # Fast path: the caller already assembled the buffer.
    rows = batch.count("\n")
    write_buffer(batch)
    total_bytes = batch.bytesize
  else
    # The first element decides whether the batch holds ready-made lines or
    # documents that still need to be encoded.
    preencoded = batch.first.is_a?(String)
    chunk = +''
    chunk_bytes = 0

    batch.each do |item|
      if preencoded
        chunk << item
        chunk_bytes += item.bytesize
      else
        json = @json_state.generate(item)
        chunk << json << "\n"
        chunk_bytes += json.bytesize + 1
      end
      rows += 1
      next if chunk_bytes < chunk_threshold

      write_buffer(chunk)
      total_bytes += chunk_bytes
      chunk = +''
      chunk_bytes = 0
    end

    # Flush whatever is left below the threshold.
    unless chunk.empty?
      write_buffer(chunk)
      total_bytes += chunk_bytes
    end
  end

  @rows_written += rows
  @manifest&.add_progress_to_part!(index: @part_index, rows_delta: rows, bytes_delta: total_bytes)
end