Class: Purplelight::WriterCSV
- Inherits:
-
Object
- Object
- Purplelight::WriterCSV
- Defined in:
- lib/purplelight/writer_csv.rb
Overview
WriterCSV writes documents to CSV files with optional compression.
Defined Under Namespace
Classes: CountingIO
Constant Summary collapse
- DEFAULT_ROTATE_BYTES =
256 * 1024 * 1024
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil, manifest: nil, single_file: false, columns: nil, headers: true) ⇒ WriterCSV
constructor
A new instance of WriterCSV.
- #rotate_if_needed ⇒ Object
- #write_many(array_of_docs) ⇒ Object
Constructor Details
#initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil, manifest: nil, single_file: false, columns: nil, headers: true) ⇒ WriterCSV
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/purplelight/writer_csv.rb', line 23
#
# Builds a CSV writer and records its configuration. This method itself opens
# nothing: the IO and CSV handles start out nil and the counters start at zero.
#
# @param directory [Object] stored as-is (presumably the output directory — confirm against the open path)
# @param prefix [Object] stored as-is (presumably the output file-name prefix — confirm)
# @param compression [Symbol, String] requested compression; the one actually
#   used is whatever determine_effective_compression returns for it
# @param rotate_bytes [Integer, nil] size threshold compared by #rotate_if_needed;
#   nil disables that check
# @param logger [#warn, nil] optional logger; only receives the fallback warning here
# @param manifest [Object, nil] optional manifest that #write_many reports progress to
# @param single_file [Boolean] when truthy, #rotate_if_needed never rotates
# @param columns [Array, nil] explicit column names (each converted with to_s);
#   nil means #write_many infers them on first use
# @param headers [Boolean] whether #write_many emits a header row after inferring columns
def initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES,
               logger: nil, manifest: nil, single_file: false, columns: nil, headers: true)
  @directory = directory
  @prefix = prefix
  @compression = compression
  @rotate_bytes = rotate_bytes
  @logger = logger
  @manifest = manifest

  # PL_ZSTD_LEVEL only counts when it parses to a positive integer; an unset
  # or non-numeric value parses to 0 and leaves the level nil.
  zstd_level = ENV['PL_ZSTD_LEVEL'].to_i
  @compression_level = zstd_level.positive? ? zstd_level : nil

  @single_file = single_file
  @columns = columns&.map(&:to_s)
  @headers = headers

  # Per-part state, filled in once a part is opened.
  @part_index = nil
  @io = nil
  @csv = nil
  @bytes_written = 0
  @rows_written = 0
  @file_seq = 0
  @closed = false

  @effective_compression = determine_effective_compression(@compression)
  return if @effective_compression.to_s == @compression.to_s

  # The requested codec was swapped for another one; tell the operator.
  @logger&.warn("requested compression '#{@compression}' not available; using '#{@effective_compression}'")
end
Instance Method Details
#close ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/purplelight/writer_csv.rb', line 85
#
# Flushes pending CSV output, finalizes the current part and closes the
# underlying IO. Calling it again after a completed close is a no-op.
#
# If finalizing the part raises, the IO handle is still closed and the writer
# is still marked closed (so the handle is not leaked and a later #close does
# not touch a closed stream); the original error propagates to the caller.
#
# @return [true, nil] true once the writer has been closed, nil when it
#   already was
def close
  return if @closed

  @csv&.flush
  if @io
    t = Thread.current[:pl_telemetry]&.start(:rotate_time)
    begin
      finalize_current_part!
    ensure
      # Runs on success and on failure alike: release the handle, record the
      # closed state, then stop the telemetry timer started above.
      @io.close
      @closed = true
      Thread.current[:pl_telemetry]&.finish(:rotate_time, t)
    end
  end
  @closed = true
end
#rotate_if_needed ⇒ Object
75 76 77 78 79 80 81 82 83 |
# File 'lib/purplelight/writer_csv.rb', line 75
#
# Rotates to a new part once the current one has reached the configured size.
# Does nothing in single-file mode or when no rotation threshold is set.
#
# The size is taken from the IO's position when the IO exposes #pos, and from
# the writer's own byte counter otherwise.
#
# @return [Object, nil] whatever rotate! returns, or nil when no rotation happened
def rotate_if_needed
  return if @single_file || @rotate_bytes.nil?

  current_size = @io.respond_to?(:pos) ? @io.pos : @bytes_written
  rotate! unless current_size < @rotate_bytes
end
#write_many(array_of_docs) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/purplelight/writer_csv.rb', line 52
#
# Appends a batch of documents to the current CSV part, one row per document.
#
# On the first call without configured columns, the column list is inferred
# from the non-String entries of the batch (only when the batch is an Array)
# and, if headers are enabled, written as the header row.
#
# String entries are skipped: they produce no row and are not counted. The
# manifest is credited with the number of rows actually written, so its row
# count stays in step with the file contents and with @rows_written.
#
# @param array_of_docs [#each] documents to write; each non-String entry is
#   passed to extract_value once per column
# @return [Object, nil] the result of #rotate_if_needed
def write_many(array_of_docs)
  ensure_open!

  if @columns.nil?
    # Inference needs a concrete Array to sample from; anything else yields
    # an empty sample.
    sample_docs = array_of_docs.is_a?(Array) ? array_of_docs.reject { |d| d.is_a?(String) } : []
    @columns = infer_columns(sample_docs)
    @csv << @columns if @headers
  end

  written = 0
  array_of_docs.each do |doc|
    next if doc.is_a?(String)

    @csv << @columns.map { |k| extract_value(doc, k) }
    written += 1
  end
  @rows_written += written

  # bytes_delta is always reported as 0 from here.
  @manifest&.add_progress_to_part!(index: @part_index, rows_delta: written, bytes_delta: 0)

  rotate_if_needed
end