Class: Purplelight::Manifest

Inherits:
Object
  • Object
show all
Defined in:
lib/purplelight/manifest.rb

Overview

Manifest persists snapshot run metadata and progress to a JSON file.

It records configuration, partition checkpoints, and per-part byte/row counts so interrupted runs can resume safely and completed runs are reproducible. Methods are thread-safe where mutation occurs.

Constant Summary

DEFAULT_VERSION =
1

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(path:, data: nil) ⇒ Manifest

Returns a new instance of Manifest.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/purplelight/manifest.rb', line 25

# Builds a manifest bound to +path+. Nothing is read from or written to disk here.
#
# @param path [String] where the manifest JSON will be saved
# @param data [Hash, nil] existing manifest contents (e.g. parsed by .load),
#   kept by reference; when nil/false a fresh skeleton is built with a new
#   random run_id and a UTC ISO-8601 created_at timestamp
def initialize(path:, data: nil)
  @path = path
  @data =
    if data
      data
    else
      {
        'version' => DEFAULT_VERSION,
        'run_id' => SecureRandom.uuid,
        'created_at' => Time.now.utc.iso8601,
        'collection' => nil,
        'format' => nil,
        'compression' => nil,
        'query_digest' => nil,
        'options' => {},
        'parts' => [],
        'partitions' => []
      }
    end
  # Guards every mutation of @data.
  @mutex = Mutex.new
  # Wall-clock time of the last save; presumably consulted by the throttled
  # save path (save_maybe!, not shown here) — TODO confirm.
  @last_save_at = Time.now
end

Instance Attribute Details

#data ⇒ Object (readonly)

Returns the value of attribute data.



18
19
20
# File 'lib/purplelight/manifest.rb', line 18

# @return [Hash] the live manifest hash, returned by reference; mutating it
#   directly bypasses the mutex-guarded mutators
def data; @data; end

#path ⇒ Object (readonly)

Returns the value of attribute path.



18
19
20
# File 'lib/purplelight/manifest.rb', line 18

# @return the manifest file location exactly as given to the constructor
def path; @path; end

Class Method Details

.load(path) ⇒ Object



43
44
45
46
# File 'lib/purplelight/manifest.rb', line 43

# Rebuilds a Manifest from a previously saved JSON file.
#
# The parsed hash (string keys) is used as-is; no keys are validated or
# backfilled.
#
# @param path [String] manifest file to read; also becomes the save target
# @return [Manifest]
# @raise [Errno::ENOENT] if the file does not exist
# @raise [JSON::ParserError] if the file is not valid JSON
def self.load(path)
  raw = File.read(path)
  new(path: path, data: JSON.parse(raw))
end

.query_digest(query, projection) ⇒ Object



20
21
22
23
# File 'lib/purplelight/manifest.rb', line 20

# Fingerprints a query/projection pair so a resumed run can check that it is
# exporting the same result set.
#
# The digest is computed over JSON.generate output, which follows Hash
# insertion order: equal hashes built in a different key order produce
# different digests.
#
# @param query [Hash, nil]
# @param projection [Hash, nil]
# @return [String] lowercase hex SHA-256
def self.query_digest(query, projection)
  serialized = JSON.generate({ query: query, projection: projection })
  Digest::SHA256.hexdigest(serialized)
end

Instance Method Details

#add_progress_to_part!(index:, rows_delta:, bytes_delta:) ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/purplelight/manifest.rb', line 109

# Adds incremental row/byte counts to the part at +index+ under the lock,
# then hands off to save_maybe! (defined elsewhere) instead of saving
# unconditionally.
#
# @param index [Integer] position in the parts list (as returned by open_part!)
# @param rows_delta [Integer] rows written since the last call
# @param bytes_delta [Integer] bytes written since the last call
def add_progress_to_part!(index:, rows_delta:, bytes_delta:)
  @mutex.synchronize do
    counters = @data['parts'][index]
    { 'rows' => rows_delta, 'bytes' => bytes_delta }.each do |field, delta|
      counters[field] += delta
    end
    save_maybe!
  end
end

#compatible_with?(collection:, format:, compression:, query_digest:) ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
68
69
70
# File 'lib/purplelight/manifest.rb', line 65

# Reports whether this manifest describes the same export as the given
# settings, i.e. whether a run may resume from it.
#
# format and compression are compared via #to_s, matching how configure!
# stores them, so :jsonl and "jsonl" are treated as equal.
#
# @return [Boolean]
def compatible_with?(collection:, format:, compression:, query_digest:)
  return false unless @data['collection'] == collection
  return false unless @data['format'] == format.to_s
  return false unless @data['compression'] == compression.to_s

  @data['query_digest'] == query_digest
end

#complete_part!(index:, checksum: nil) ⇒ Object



118
119
120
121
122
123
124
125
# File 'lib/purplelight/manifest.rb', line 118

# Marks the part at +index+ as finished, records its checksum (nil when none
# was computed), and saves immediately.
#
# @param index [Integer] position in the parts list
# @param checksum [String, nil]
def complete_part!(index:, checksum: nil)
  @mutex.synchronize do
    @data['parts'][index].merge!('complete' => true, 'checksum' => checksum)
    save!
  end
end

#configure!(collection:, format:, compression:, query_digest:, options: {}) ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/purplelight/manifest.rb', line 56

# Records the export settings for this run and persists them immediately.
#
# format and compression are stored as strings (via #to_s), so symbols and
# strings compare equal in compatible_with?, including after a JSON reload.
#
# @param collection [String] source collection name
# @param format [#to_s] output format
# @param compression [#to_s] compression codec; nil is stored as ""
# @param query_digest [String] fingerprint from Manifest.query_digest
# @param options [Hash] extra run options, stored by reference
def configure!(collection:, format:, compression:, query_digest:, options: {})
  # Hold the lock like every other mutator. Otherwise another thread could
  # serialize @data while these keys are only partially updated.
  @mutex.synchronize do
    @data['collection'] = collection
    @data['format'] = format.to_s
    @data['compression'] = compression.to_s
    @data['query_digest'] = query_digest
    @data['options'] = options
    save!
  end
end

#ensure_partitions!(count) ⇒ Object



72
73
74
75
76
77
78
79
80
81
# File 'lib/purplelight/manifest.rb', line 72

# Seeds +count+ partition checkpoint entries, but only when the manifest has
# none yet. An existing list (e.g. from a loaded manifest) is left untouched,
# even if its length differs from +count+, and nothing is saved in that case.
#
# @param count [Integer] number of partitions to create
# @return the result of save! when entries were created, otherwise nil
def ensure_partitions!(count)
  @mutex.synchronize do
    next unless @data['partitions'].empty?

    @data['partitions'] = Array.new(count) do |slot|
      { 'index' => slot, 'last_id_exclusive' => nil, 'completed' => false }
    end
    save!
  end
end

#mark_partition_complete!(index) ⇒ Object



91
92
93
94
95
96
97
# File 'lib/purplelight/manifest.rb', line 91

# Flags the partition at +index+ as fully exported and saves immediately.
#
# @param index [Integer] position in the partitions list
def mark_partition_complete!(index)
  @mutex.synchronize do
    @data['partitions'][index].store('completed', true)
    save!
  end
end

#open_part!(path) ⇒ Object



99
100
101
102
103
104
105
106
107
# File 'lib/purplelight/manifest.rb', line 99

# Registers a new, empty output part file and saves immediately.
#
# The index is the list length at the time of the call; it is assigned under
# the lock, so concurrent callers get distinct indexes.
#
# @param path [String] location of the part file
# @return [Integer] index of the new part, for add_progress_to_part! and complete_part!
def open_part!(path)
  @mutex.synchronize do
    registry = @data['parts']
    new_index = registry.length
    registry.push({
      'index' => new_index,
      'path' => path,
      'bytes' => 0,
      'rows' => 0,
      'complete' => false,
      'checksum' => nil
    })
    save!
    new_index
  end
end

#partitions ⇒ Object



131
132
133
# File 'lib/purplelight/manifest.rb', line 131

# @return [Array<Hash>] the live partition checkpoint list (by reference, unlocked)
def partitions; @data['partitions']; end

#parts ⇒ Object



127
128
129
# File 'lib/purplelight/manifest.rb', line 127

# @return [Array<Hash>] the live part list (by reference, unlocked)
def parts; @data['parts']; end

#save! ⇒ Object



48
49
50
51
52
53
54
# File 'lib/purplelight/manifest.rb', line 48

# Writes the whole manifest to +path+ as pretty-printed JSON. It creates the
# parent directory if needed, writes to "<path>.tmp", then moves that file over
# +path+. Because both files are in the same directory, the move is a rename, so
# the manifest is replaced in one step rather than rewritten in place.
#
# The mutators call this while already holding @mutex, and Ruby's Mutex is
# not reentrant. The lock is therefore taken here only when the caller does
# not own it. This serializes external/unsynchronized callers against the
# mutators: @data is never serialized mid-update, and two threads never
# write the shared "<path>.tmp" at the same time.
def save!
  return @mutex.synchronize { save! } unless @mutex.owned?

  FileUtils.mkdir_p(File.dirname(path))
  tmp = "#{path}.tmp"
  File.write(tmp, JSON.pretty_generate(@data))
  FileUtils.mv(tmp, path)
end

#update_partition_checkpoint!(index, last_id_exclusive) ⇒ Object



83
84
85
86
87
88
89
# File 'lib/purplelight/manifest.rb', line 83

# Records a new resume checkpoint for the partition at +index+ and saves
# immediately.
#
# @param index [Integer] position in the partitions list
# @param last_id_exclusive [Object] checkpoint value stored verbatim; by its
#   name, presumably the id bound a resumed scan continues from (exclusive) —
#   TODO confirm against the reader. It must be JSON-serializable for save!.
def update_partition_checkpoint!(index, last_id_exclusive)
  @mutex.synchronize do
    @data['partitions'][index].store('last_id_exclusive', last_id_exclusive)
    save!
  end
end