Class: Purplelight::WriterCSV

Inherits:
Object
  • Object
show all
Defined in:
lib/purplelight/writer_csv.rb

Overview

WriterCSV writes documents to CSV files with optional compression.

Defined Under Namespace

Classes: CountingIO

Constant Summary collapse

DEFAULT_ROTATE_BYTES =
256 * 1024 * 1024

Instance Method Summary collapse

Constructor Details

#initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil, manifest: nil, single_file: false, columns: nil, headers: true) ⇒ WriterCSV



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/purplelight/writer_csv.rb', line 23

# Builds a CSV writer; no output is opened here, only state is initialised.
#
# @param directory [Object] stored as-is in @directory
# @param prefix [Object] stored as-is in @prefix
# @param compression [Symbol, String] requested compression; the value actually
#   used is whatever determine_effective_compression returns for it
# @param rotate_bytes [Integer, nil] rotation threshold stored in @rotate_bytes
# @param logger [#warn, nil] optional logger; warned when the requested
#   compression differs from the effective one
# @param manifest [Object, nil] optional manifest stored in @manifest
# @param single_file [Boolean] stored in @single_file
# @param columns [Array, nil] explicit column names (stringified), or nil
# @param headers [Boolean] stored in @headers
def initialize(directory:, prefix:, compression: :zstd, rotate_bytes: DEFAULT_ROTATE_BYTES, logger: nil,
               manifest: nil, single_file: false, columns: nil, headers: true)
  # Caller-supplied configuration.
  @directory = directory
  @prefix = prefix
  @compression = compression
  @rotate_bytes = rotate_bytes
  @single_file = single_file
  @logger = logger
  @manifest = manifest
  @headers = headers
  @columns = columns&.map(&:to_s)

  # PL_ZSTD_LEVEL only counts when it parses to a positive integer;
  # an unset variable yields nil.to_i == 0 and is therefore ignored.
  zstd_level = ENV['PL_ZSTD_LEVEL'].to_i
  @compression_level = zstd_level.positive? ? zstd_level : nil

  # Mutable writer state; nothing is open yet.
  @io = nil
  @csv = nil
  @part_index = nil
  @file_seq = 0
  @bytes_written = 0
  @rows_written = 0
  @closed = false

  @effective_compression = determine_effective_compression(@compression)
  if @effective_compression.to_s != @compression.to_s
    @logger&.warn("requested compression '#{@compression}' not available; using '#{@effective_compression}'")
  end
end

Instance Method Details

#close ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/purplelight/writer_csv.rb', line 85

# Flushes the CSV writer, finalizes the current part and closes the
# underlying IO. Calling it again after the writer is closed is a no-op.
#
# The IO handle is released and the :rotate_time telemetry span is finished
# even when finalize_current_part! raises; the exception still propagates.
# In that case the writer is also marked closed, so later calls do not
# operate on an already-closed handle.
#
# @return [true, nil] true once closed, nil when it was already closed
def close
  return if @closed

  @csv&.flush
  if @io
    telemetry = Thread.current[:pl_telemetry]
    timer = telemetry&.start(:rotate_time)
    begin
      finalize_current_part!
    ensure
      # Runs on both the success and the failure path, so the handle is
      # never leaked and the telemetry span is always finished.
      @closed = true
      @io.close
      telemetry&.finish(:rotate_time, timer)
    end
  end
  @closed = true
end

#rotate_if_needed ⇒ Object



75
76
77
78
79
80
81
82
83
# File 'lib/purplelight/writer_csv.rb', line 75

# Calls rotate! once the current output has reached @rotate_bytes.
#
# Nothing happens in single-file mode or when no threshold is configured.
# The size is taken from @io.pos when the IO exposes a position, otherwise
# from the @bytes_written counter.
#
# @return [Object, nil] whatever rotate! returns, or nil when no rotation occurs
def rotate_if_needed
  return if @single_file || @rotate_bytes.nil?

  current_size = @io.respond_to?(:pos) ? @io.pos : @bytes_written
  rotate! unless current_size < @rotate_bytes
end

#write_many(array_of_docs) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/purplelight/writer_csv.rb', line 52

# Writes a batch of documents as CSV rows and rotates the output if needed.
#
# String entries in the batch are skipped; every other entry becomes one row
# built by calling extract_value for each column. When no columns were given
# up front they are inferred from the first batch that contains at least one
# non-String document, and the header row is written at that point (if
# headers are enabled). An empty or all-String batch therefore no longer
# locks the writer into an empty column set.
#
# The manifest is told about the number of rows actually written, which keeps
# it in step with @rows_written when entries are skipped.
#
# @param array_of_docs [Array] batch of documents; String entries are ignored
# @return [Object, nil] whatever rotate_if_needed returns
def write_many(array_of_docs)
  ensure_open!

  docs = array_of_docs.reject { |doc| doc.is_a?(String) }

  # Defer inference until there is something to infer from.
  if @columns.nil? && !docs.empty?
    @columns = infer_columns(docs)
    @csv << @columns if @headers
  end

  docs.each do |doc|
    @csv << @columns.map { |key| extract_value(doc, key) }
    @rows_written += 1
  end
  @manifest&.add_progress_to_part!(index: @part_index, rows_delta: docs.size, bytes_delta: 0)

  rotate_if_needed
end