Class: Purplelight::Manifest
- Inherits:
- Object
- Inheritance chain: Object → Purplelight::Manifest
- Defined in:
- lib/purplelight/manifest.rb
Overview
Manifest persists snapshot run metadata and progress to a JSON file.
It records configuration, partition checkpoints, and per-part byte/row counts so interrupted runs can resume safely and completed runs are reproducible. Methods are thread-safe where mutation occurs.
Constant Summary collapse
- DEFAULT_VERSION =
1
Instance Attribute Summary collapse
-
#data ⇒ Object
readonly
Returns the value of attribute data.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
Class Method Summary collapse
- .load(path) ⇒ Object
- .query_digest(query, projection) ⇒ Object
Instance Method Summary collapse
- #add_progress_to_part!(index:, rows_delta:, bytes_delta:) ⇒ Object
- #compatible_with?(collection:, format:, compression:, query_digest:) ⇒ Boolean
- #complete_part!(index:, checksum: nil) ⇒ Object
- #configure!(collection:, format:, compression:, query_digest:, options: {}) ⇒ Object
- #ensure_partitions!(count) ⇒ Object
-
#initialize(path:, data: nil) ⇒ Manifest
constructor
A new instance of Manifest.
- #mark_partition_complete!(index) ⇒ Object
- #open_part!(path) ⇒ Object
- #partitions ⇒ Object
- #parts ⇒ Object
- #save! ⇒ Object
- #update_partition_checkpoint!(index, last_id_exclusive) ⇒ Object
Constructor Details
#initialize(path:, data: nil) ⇒ Manifest
Returns a new instance of Manifest.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/purplelight/manifest.rb', line 25
# Builds a manifest bound to +path+.
#
# When +data+ is nil (or false) a fresh document is created: default version,
# a random run id, a UTC ISO-8601 creation timestamp, nil configuration
# fields, an empty options hash and empty 'parts' / 'partitions' lists.
# Otherwise the supplied object is stored as-is (no copy is made).
#
# Nothing is read from or written to disk here.
#
# @param path [Object] where the manifest lives (presumably a String path — confirm)
# @param data [Hash, nil] previously persisted manifest contents, if any
def initialize(path:, data: nil)
  @mutex = Mutex.new
  @path = path
  @data = data
  # Only build the default document (and draw a new run id) when none was given.
  @data ||= {
    'version' => DEFAULT_VERSION,
    'run_id' => SecureRandom.uuid,
    'created_at' => Time.now.utc.iso8601,
    'collection' => nil,
    'format' => nil,
    'compression' => nil,
    'query_digest' => nil,
    'options' => {},
    'parts' => [],
    'partitions' => []
  }
  @last_save_at = Time.now
end
Instance Attribute Details
#data ⇒ Object (readonly)
Returns the value of attribute data.
18 19 20 |
# File 'lib/purplelight/manifest.rb', line 18
# Read-only accessor for the manifest contents.
#
# @return [Object] whatever is held in @data (the live object, not a copy)
def data
  @data
end
#path ⇒ Object (readonly)
Returns the value of attribute path.
18 19 20 |
# File 'lib/purplelight/manifest.rb', line 18
# Read-only accessor for the manifest location.
#
# @return [Object] whatever is held in @path
def path
  @path
end
Class Method Details
.load(path) ⇒ Object
43 44 45 46 |
# File 'lib/purplelight/manifest.rb', line 43
# Reads the JSON document stored at +path+ and wraps it in a new instance.
#
# @param path [String] file to read
# @return [Object] the result of `new(path:, data:)` with the parsed contents as data
# @raise [JSON::ParserError] if the file does not contain valid JSON
# @raise [SystemCallError] if the file cannot be read (e.g. Errno::ENOENT)
def self.load(path)
  contents = File.read(path)
  new(path: path, data: JSON.parse(contents))
end
.query_digest(query, projection) ⇒ Object
20 21 22 23 |
# File 'lib/purplelight/manifest.rb', line 20
# Computes a SHA-256 hex digest over the JSON serialization of the query and
# projection, wrapped together as `{ query:, projection: }`.
#
# The digest is taken over the generated JSON text, so it depends on the
# key order of the hashes passed in.
#
# @param query [Object] JSON-serializable query description
# @param projection [Object] JSON-serializable projection (may be nil)
# @return [String] 64-character lowercase hexadecimal digest
def self.query_digest(query, projection)
  serialized = JSON.generate({ query: query, projection: projection })
  Digest::SHA256.hexdigest(serialized)
end
Instance Method Details
#add_progress_to_part!(index:, rows_delta:, bytes_delta:) ⇒ Object
109 110 111 112 113 114 115 116 |
# File 'lib/purplelight/manifest.rb', line 109
# Adds row and byte deltas to the counters of the part at +index+.
#
# Runs while holding @mutex. Persistence is delegated to `save_maybe!`,
# which is not shown here (presumably a throttled save — confirm).
#
# @param index [Integer] position of the part in @data['parts']
# @param rows_delta [Numeric] amount added to the part's 'rows'
# @param bytes_delta [Numeric] amount added to the part's 'bytes'
# @return [Object] whatever `save_maybe!` returns
def add_progress_to_part!(index:, rows_delta:, bytes_delta:)
  @mutex.synchronize do
    entry = @data['parts'][index]
    entry['rows'] = entry['rows'] + rows_delta
    entry['bytes'] = entry['bytes'] + bytes_delta
    save_maybe!
  end
end
#compatible_with?(collection:, format:, compression:, query_digest:) ⇒ Boolean
65 66 67 68 69 70 |
# File 'lib/purplelight/manifest.rb', line 65
# Tells whether the stored configuration matches the given settings.
#
# 'format' and 'compression' are compared against the string form of the
# arguments (`to_s`); 'collection' and 'query_digest' are compared directly.
# Checks stop at the first mismatch.
#
# @param collection [Object] expected value of @data['collection']
# @param format [#to_s] expected format
# @param compression [#to_s] expected compression
# @param query_digest [Object] expected value of @data['query_digest']
# @return [Boolean] true only when all four fields match
def compatible_with?(collection:, format:, compression:, query_digest:)
  return false unless @data['collection'] == collection
  return false unless @data['format'] == format.to_s
  return false unless @data['compression'] == compression.to_s

  @data['query_digest'] == query_digest
end
#complete_part!(index:, checksum: nil) ⇒ Object
118 119 120 121 122 123 124 125 |
# File 'lib/purplelight/manifest.rb', line 118
# Flags the part at +index+ as complete, records its checksum and saves the
# manifest immediately. Runs while holding @mutex.
#
# @param index [Integer] position of the part in @data['parts']
# @param checksum [Object, nil] checksum to store (nil overwrites any previous value)
# @return [Object] whatever `save!` returns
def complete_part!(index:, checksum: nil)
  @mutex.synchronize do
    entry = @data['parts'][index]
    entry.merge!('complete' => true, 'checksum' => checksum)
    save!
  end
end
#configure!(collection:, format:, compression:, query_digest:, options: {}) ⇒ Object
56 57 58 59 60 61 62 63 |
# File 'lib/purplelight/manifest.rb', line 56
# Records the run configuration in the manifest and saves it.
#
# 'format' and 'compression' are stored in string form (`to_s`); the other
# values are stored as given.
#
# NOTE(review): unlike the other mutating methods shown for this class, this
# one does not hold @mutex — presumably it is only called before concurrent
# work starts; confirm.
#
# @param collection [Object] value stored under 'collection'
# @param format [#to_s] output format
# @param compression [#to_s] compression scheme
# @param query_digest [Object] value stored under 'query_digest'
# @param options [Hash] extra options stored under 'options'
# @return [Object] whatever `save!` returns
def configure!(collection:, format:, compression:, query_digest:, options: {})
  @data['collection'] = collection
  @data['format'] = format.to_s
  @data['compression'] = compression.to_s
  @data['query_digest'] = query_digest
  # Store the caller's options before saving, so they reach the file.
  @data['options'] = options
  save!
end
#ensure_partitions!(count) ⇒ Object
72 73 74 75 76 77 78 79 80 81 |
# File 'lib/purplelight/manifest.rb', line 72
# Seeds the partition list with +count+ fresh entries, but only when the
# list is currently empty; an existing list is left untouched and nothing is
# saved. Runs while holding @mutex.
#
# Each new entry has its 'index', a nil 'last_id_exclusive' and
# 'completed' set to false.
#
# @param count [Integer] number of partition entries to create
# @return [Object, nil] the result of `save!` when entries were created, else nil
def ensure_partitions!(count)
  @mutex.synchronize do
    next unless @data['partitions'].empty?

    @data['partitions'] = Array.new(count) do |slot|
      { 'index' => slot, 'last_id_exclusive' => nil, 'completed' => false }
    end
    save!
  end
end
#mark_partition_complete!(index) ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/purplelight/manifest.rb', line 91
# Sets 'completed' to true on the partition at +index+ and saves the
# manifest immediately. Runs while holding @mutex.
#
# @param index [Integer] position of the partition in @data['partitions']
# @return [Object] whatever `save!` returns
def mark_partition_complete!(index)
  @mutex.synchronize do
    @data['partitions'][index]['completed'] = true
    save!
  end
end
#open_part!(path) ⇒ Object
99 100 101 102 103 104 105 106 107 |
# File 'lib/purplelight/manifest.rb', line 99
# Appends a new, empty part entry for +path+ and saves the manifest
# immediately. Runs while holding @mutex.
#
# The entry's index is the number of parts recorded before the call; its
# byte and row counters start at zero, 'complete' is false and 'checksum'
# is nil.
#
# @param path [Object] stored under the entry's 'path' key
# @return [Integer] index assigned to the new part
def open_part!(path)
  @mutex.synchronize do
    entries = @data['parts']
    next_index = entries.length
    entry = {
      'index' => next_index,
      'path' => path,
      'bytes' => 0,
      'rows' => 0,
      'complete' => false,
      'checksum' => nil
    }
    entries.push(entry)
    save!
    next_index
  end
end
#partitions ⇒ Object
131 132 133 |
# File 'lib/purplelight/manifest.rb', line 131
# Convenience reader for the partition list.
#
# @return [Object] the value stored under 'partitions' (the live object, not a copy)
def partitions
  @data['partitions']
end
#parts ⇒ Object
127 128 129 |
# File 'lib/purplelight/manifest.rb', line 127
# Convenience reader for the part list.
#
# @return [Object] the value stored under 'parts' (the live object, not a copy)
def parts
  @data['parts']
end
#save! ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/purplelight/manifest.rb', line 48
# Writes the manifest to disk as pretty-printed JSON.
#
# The parent directory is created if needed. The document is first written
# to "<path>.tmp" and then moved over the final path, presumably so readers
# do not see a half-written file.
#
# This method does not take @mutex itself.
#
# @return [Object] whatever `FileUtils.mv` returns
def save!
  FileUtils.mkdir_p(File.dirname(path))
  staging = "#{path}.tmp"
  File.write(staging, JSON.pretty_generate(@data))
  FileUtils.mv(staging, path)
end
#update_partition_checkpoint!(index, last_id_exclusive) ⇒ Object
83 84 85 86 87 88 89 |
# File 'lib/purplelight/manifest.rb', line 83
# Stores a new 'last_id_exclusive' checkpoint on the partition at +index+
# and saves the manifest immediately. Runs while holding @mutex.
#
# @param index [Integer] position of the partition in @data['partitions']
# @param last_id_exclusive [Object] checkpoint value to record
# @return [Object] whatever `save!` returns
def update_partition_checkpoint!(index, last_id_exclusive)
  @mutex.synchronize do
    @data['partitions'][index]['last_id_exclusive'] = last_id_exclusive
    save!
  end
end