Class: Purplelight::WriterJSONL

Inherits:
Object
  • Object
show all
Defined in:
lib/purplelight/writer_jsonl.rb

Overview

WriterJSONL writes newline-delimited JSON with optional compression.

Constant Summary collapse

DEFAULT_ROTATE_BYTES =
256 * 1024 * 1024

Instance Method Summary collapse

Constructor Details

#initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil, manifest: nil, compression_level: nil) ⇒ WriterJSONL

Returns a new instance of WriterJSONL.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/purplelight/writer_jsonl.rb', line 22

# Sets up writer configuration and zeroed counters; no output handle is
# opened here (@io starts as nil).
#
# @param directory [Object] output location, stored as given
# @param prefix [Object] part-name prefix, stored as given
# @param compression [Symbol, String] requested compression; the one actually
#   used is whatever determine_effective_compression returns for it
# @param rotate_bytes [Integer, nil] byte threshold consulted by rotate_if_needed
# @param logger [#info, #warn, nil] optional logger for the startup messages
# @param manifest [Object, nil] optional manifest, stored as given
# @param compression_level [Integer, nil] explicit level; when nil, a positive
#   integer in ENV['PL_ZSTD_LEVEL'] is used, otherwise the level stays nil
def initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil,
               manifest: nil, compression_level: nil)
  @directory = directory
  @prefix = prefix
  @compression = compression
  @rotate_bytes = rotate_bytes
  @logger = logger
  @manifest = manifest
  # nil.to_i and "abc".to_i are both 0, so an unset, non-numeric or
  # non-positive PL_ZSTD_LEVEL all collapse to "no override".
  env_level = ENV['PL_ZSTD_LEVEL'].to_i
  @compression_level = compression_level || (env_level if env_level.positive?)

  @part_index = nil
  @io = nil
  @bytes_written = 0
  @rows_written = 0
  @file_seq = 0
  @closed = false

  @effective_compression = determine_effective_compression(@compression)
  @json_state = JSON::Ext::Generator::State.new(ascii_only: false, max_nesting: false)
  return unless @logger

  # Report the level that was actually stored, not the raw environment value:
  # a rejected PL_ZSTD_LEVEL must show up as 'default', not as '0'.
  level_disp = @compression_level || 'default'
  @logger.info("WriterJSONL using compression='#{@effective_compression}' level='#{level_disp}'")
  return if @effective_compression.to_s == @compression.to_s

  @logger.warn("requested compression '#{@compression}' not available; using '#{@effective_compression}'")
end

Instance Method Details

#close ⇒ Object



118
119
120
121
122
123
124
125
126
# File 'lib/purplelight/writer_jsonl.rb', line 118

# Finalizes and closes the open output handle, if any, then marks the writer
# as closed. Calling it again after a successful close does nothing.
def close
  unless @closed
    if @io
      finalize_current_part!
      # @io is read again on purpose: the finalize step runs first and the
      # handle is looked up only afterwards, exactly as before.
      @io.close
    end
    @closed = true
  end
end

#rotate_if_needed ⇒ Object



111
112
113
114
115
116
# File 'lib/purplelight/writer_jsonl.rb', line 111

# Calls rotate! once the bytes written so far reach the configured limit.
# A nil limit disables rotation entirely.
def rotate_if_needed
  limit = @rotate_bytes
  return if limit.nil? || @bytes_written < limit

  rotate!
end

#write_many(batch) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/purplelight/writer_jsonl.rb', line 51

# Writes a batch of rows to the current part and records the progress.
#
# Accepted batch shapes:
# * a String  - a preassembled buffer, written as-is; rows are counted by "\n"
# * an Enumerable whose first element is a String - lines are appended
#   verbatim, one row per element
# * any other Enumerable - each element is JSON-encoded with @json_state and
#   terminated with "\n"
#
# Enumerable input is coalesced into chunks and flushed through write_buffer
# whenever a chunk reaches the threshold. The threshold is
# ENV['PL_WRITE_CHUNK_BYTES'] when that parses to a positive integer, else 8 MiB.
#
# @param batch [String, Enumerable] rows to write
# @return [Object, nil] result of the manifest progress call, or nil when no
#   manifest is set
def write_many(batch)
  ensure_open!

  # nil.to_i and "abc".to_i are 0; a zero or negative threshold would flush
  # after every single row, so only a positive override is honoured.
  env_threshold = ENV['PL_WRITE_CHUNK_BYTES'].to_i
  chunk_threshold = env_threshold.positive? ? env_threshold : 8 * 1024 * 1024
  total_bytes = 0
  rows = 0

  if batch.is_a?(String)
    # Fast path: the caller already assembled the buffer.
    rows = batch.count("\n")
    write_buffer(batch)
    total_bytes = batch.bytesize
  else
    # The first element decides how the whole batch is treated.
    preencoded = batch.first.is_a?(String)
    chunk = +''
    batch.each do |item|
      if preencoded
        chunk << item
      else
        chunk << @json_state.generate(item) << "\n"
      end
      rows += 1
      next if chunk.bytesize < chunk_threshold

      write_buffer(chunk)
      total_bytes += chunk.bytesize
      # Start a fresh string instead of clearing, so the buffer just handed
      # to write_buffer is never mutated afterwards.
      chunk = +''
    end
    unless chunk.empty?
      write_buffer(chunk)
      total_bytes += chunk.bytesize
    end
  end

  @rows_written += rows
  @manifest&.add_progress_to_part!(index: @part_index, rows_delta: rows, bytes_delta: total_bytes)
end