Class: OpenC3::BucketUtilities
- Defined in:
- lib/openc3/utilities/bucket_utilities.rb
Constant Summary
- FILE_TIMESTAMP_FORMAT = "%Y%m%d%H%M%S%N" (year, month, day, hour, minute, second, nanoseconds)
- DIRECTORY_TIMESTAMP_FORMAT = "%Y%m%d" (year, month, day)
Class Method Summary
- .compress_file(filename, chunk_size = 50_000_000) ⇒ Object
- .directory_in_time_range(directory, start_time, end_time) ⇒ Object
- .file_in_time_range(bucket_path, start_time, end_time, overlap:) ⇒ Object
- .files_between_time(bucket, prefix, start_time, end_time, file_suffix: nil, overlap: false, max_request: 1000, max_total: 100_000) ⇒ Object
- .filter_directories_to_time_range(directories, start_time, end_time) ⇒ Object — Private methods.
- .filter_files_to_time_range(files, start_time, end_time, file_suffix: nil, overlap: false) ⇒ Object
- .get_cache_control(filename) ⇒ Object
- .get_file_times(bucket_path) ⇒ Object
- .move_log_file_to_bucket(filename, bucket_key, metadata: {}) ⇒ Object
- .uncompress_file(filename, chunk_size = 50_000_000) ⇒ Object
Class Method Details
.compress_file(filename, chunk_size = 50_000_000) ⇒ Object
Source lines 86-100:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 86
# Gzip-compress +filename+ into a sibling file named "#{filename}.gz".
#
# The source is streamed in +chunk_size+-byte pieces so it is never read into
# memory all at once. The source file itself is left in place.
#
# @param filename [String] path of the file to compress
# @param chunk_size [Integer] number of bytes read per iteration
# @return [String] path of the newly written .gz file
# @raise [Errno::ENOENT] if +filename+ does not exist (no .gz is created)
def self.compress_file(filename, chunk_size = 50_000_000)
  # Read the mtime before opening the writer so a missing source fails fast
  # instead of leaving an empty .gz behind.
  mtime = File.mtime(filename)
  zipped = "#{filename}.gz"
  Zlib::GzipWriter.open(zipped) do |gz|
    gz.mtime = mtime
    # RFC 1952 defines FNAME as the original name with directory components
    # removed, so don't embed the full local path in the archive header.
    gz.orig_name = File.basename(filename)
    File.open(filename, 'rb') do |file|
      while (chunk = file.read(chunk_size))
        gz.write(chunk)
      end
    end
  end
  zipped
end
.directory_in_time_range(directory, start_time, end_time) ⇒ Object
Source lines 126-135:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 126
# Report whether a dated directory overlaps the requested time window.
#
# The directory's basename is parsed with DIRECTORY_TIMESTAMP_FORMAT via
# DateTime.strptime (so the date is read as UTC). The directory is treated as
# spanning Time::SEC_PER_DAY seconds from that instant. A nil +start_time+ or
# +end_time+ leaves that side of the window open.
#
# @param directory [String] path whose last component is a date stamp
# @param start_time [Time, nil] window start, or nil for no lower bound
# @param end_time [Time, nil] window end, or nil for no upper bound
# @return [Boolean] true if the window and the directory's day intersect
def self.directory_in_time_range(directory, start_time, end_time)
  day_begins = DateTime.strptime(File.basename(directory), DIRECTORY_TIMESTAMP_FORMAT).to_time
  day_ends = day_begins + Time::SEC_PER_DAY
  # The window must start before the day is over and end no earlier than it begins.
  (!start_time || start_time < day_ends) && (!end_time || end_time >= day_begins)
end
.file_in_time_range(bucket_path, start_time, end_time, overlap:) ⇒ Object
Source lines 149-161:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 149
# Report whether a log file's time span satisfies the requested window.
#
# The file's start and end times come from get_file_times(bucket_path).
# With +overlap+ set, any intersection with the window is accepted. Otherwise
# the file's whole span must lie inside the window. Nil bounds are open.
#
# @param bucket_path [String] key or path of the log file
# @param start_time [Time, nil] window start, or nil for no lower bound
# @param end_time [Time, nil] window end, or nil for no upper bound
# @param overlap [Boolean] accept partial overlap instead of full containment
# @return [Boolean]
def self.file_in_time_range(bucket_path, start_time, end_time, overlap:)
  file_start_time, file_end_time = get_file_times(bucket_path)
  # start_time is checked against start_limit and end_time against end_limit.
  # Swapping the file's endpoints turns "contained in" into "overlaps".
  start_limit, end_limit = overlap ? [file_end_time, file_start_time] : [file_start_time, file_end_time]
  (!start_time || start_time <= start_limit) && (!end_time || end_time >= end_limit)
end
.files_between_time(bucket, prefix, start_time, end_time, file_suffix: nil, overlap: false, max_request: 1000, max_total: 100_000) ⇒ Object
Source lines 39-57:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 39
# List the keys of files under +prefix+ in +bucket+ that fall in a time window.
#
# Dated sub-directories of +prefix+ are first narrowed with
# filter_directories_to_time_range. Each remaining directory's objects are
# then filtered with filter_files_to_time_range, and the results are
# concatenated in directory order.
#
# NOTE(review): max_request/max_total are forwarded to every per-directory
# list_objects call, so the combined result can exceed max_total. Confirm this
# is intended.
#
# @param bucket [String] bucket name
# @param prefix [String] key prefix whose children are date directories
# @param start_time [Time, nil] window start, or nil for no lower bound
# @param end_time [Time, nil] window end, or nil for no upper bound
# @param file_suffix [String, nil] keep only keys ending with this suffix
# @param overlap [Boolean] accept files that only partially overlap the window
# @param max_request [Integer] passed through to list_objects
# @param max_total [Integer] passed through to list_objects
# @return [Array<String>] matching object keys (empty if the bucket is absent)
def self.files_between_time(bucket, prefix, start_time, end_time, file_suffix: nil, overlap: false, max_request: 1000, max_total: 100_000)
  client = Bucket.getClient
  # The bucket won't exist yet at the very beginning, so there is nothing to list.
  return [] unless client.exist?(bucket)

  day_directories = filter_directories_to_time_range(
    client.list_directories(bucket: bucket, path: prefix), start_time, end_time
  )
  day_directories.flat_map do |directory|
    objects = client.list_objects(bucket: bucket, prefix: "#{prefix}/#{directory}", max_request: max_request, max_total: max_total)
    filter_files_to_time_range(objects, start_time, end_time, file_suffix: file_suffix, overlap: overlap)
  end
end
.filter_directories_to_time_range(directories, start_time, end_time) ⇒ Object
Private methods
Source lines 118-124:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 118
# Private methods
#
# Keep only the directories whose date overlaps the time window, as decided
# by directory_in_time_range. Input order is preserved.
#
# @param directories [Array<String>] directory names/paths to test
# @param start_time [Time, nil] window start, or nil for no lower bound
# @param end_time [Time, nil] window end, or nil for no upper bound
# @return [Array<String>] the directories that intersect the window
def self.filter_directories_to_time_range(directories, start_time, end_time)
  directories.select { |directory| directory_in_time_range(directory, start_time, end_time) }
end
.filter_files_to_time_range(files, start_time, end_time, file_suffix: nil, overlap: false) ⇒ Object
Source lines 137-147:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 137
# Convert listed bucket objects to key strings and keep those in the window.
#
# Each object's +key+ is stringified. Keys not ending in +file_suffix+ (when
# one is given) are skipped. The rest are tested with file_in_time_range.
#
# @param files [Enumerable] objects responding to #key
# @param start_time [Time, nil] window start, or nil for no lower bound
# @param end_time [Time, nil] window end, or nil for no upper bound
# @param file_suffix [String, nil] required key suffix, or nil for any
# @param overlap [Boolean] accept files that only partially overlap the window
# @return [Array<String>] keys of the matching files, in input order
def self.filter_files_to_time_range(files, start_time, end_time, file_suffix: nil, overlap: false)
  files.each_with_object([]) do |file, matches|
    key = file.key.to_s
    next if file_suffix && !key.end_with?(file_suffix)

    matches << key if file_in_time_range(key, start_time, end_time, overlap: overlap)
  end
end
.get_cache_control(filename) ⇒ Object
Source lines 78-84:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 78
# Choose a Cache-Control value for an uploaded file based on its name.
#
# Names that embed a three-part version (e.g. "-1.2.3.") or a 20-hex-digit
# content hash (e.g. ".0123456789abcdef0123.") are treated as safely cacheable.
# All other names get 'no-store'.
#
# @param filename [String] file name or key to inspect
# @return [String, nil] 'no-store', or nil when the name looks versioned
def self.get_cache_control(filename)
  versioned = /(-|_|\.)\d+(-|_|\.)\d+(-|_|\.)\d+\./.match?(filename) ||
              /\.[a-f0-9]{20}\./.match?(filename)
  versioned ? nil : 'no-store'
end
.get_file_times(bucket_path) ⇒ Object
Source lines 163-169:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 163
# Extract the start and end timestamps encoded in a log file's name.
#
# The basename must begin with two FILE_TIMESTAMP_FORMAT stamps separated by
# "__", e.g. "20240101120000000000000__20240101130000000000000__...".
# Anything after the second "__" is ignored.
#
# @param bucket_path [String] bucket key or local path of the log file
# @return [Array(Time, Time)] the file's start time and end time
# @raise [ArgumentError] if the basename does not carry two parseable stamps
#   (DateTime's parse error is also an ArgumentError subclass)
def self.get_file_times(bucket_path)
  basename = File.basename(bucket_path)
  start_stamp, end_stamp = basename.split('__')
  # Fail with a clear message instead of a TypeError from strptime(nil).
  unless start_stamp && end_stamp
    raise ArgumentError, "Log file name lacks start__end timestamps: #{basename}"
  end

  file_start_time = DateTime.strptime(start_stamp, FILE_TIMESTAMP_FORMAT).to_time
  file_end_time = DateTime.strptime(end_stamp, FILE_TIMESTAMP_FORMAT).to_time
  return file_start_time, file_end_time
end
.move_log_file_to_bucket(filename, bucket_key, metadata: {}) ⇒ Object
Source lines 59-76:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 59
# Compress a local log file and upload it to the logs bucket on a background thread.
#
# The thread:
# 1. gzips +filename+ with compress_file;
# 2. uploads the result to ENV['OPENC3_LOGS_BUCKET'] under "#{bucket_key}.gz",
#    passing +metadata+ through to put_object;
# 3. records the key with ReducerModel.add_file;
# 4. deletes both the .gz and the original file.
#
# Errors are logged rather than raised. The original file is only deleted
# after a successful upload, and a leftover .gz is removed on failure.
#
# @param filename [String] path of the local log file
# @param bucket_key [String] object key to store under (".gz" is appended)
# @param metadata [Hash] object metadata forwarded to put_object
# @return [Thread] the background thread doing the work
def self.move_log_file_to_bucket(filename, bucket_key, metadata: {})
  Thread.new do
    zipped = nil
    begin
      client = Bucket.getClient()
      zipped = compress_file(filename)
      zipped_key = bucket_key + '.gz'
      logs_bucket = ENV['OPENC3_LOGS_BUCKET']
      File.open(zipped, 'rb') do |read_file|
        client.put_object(bucket: logs_bucket, key: zipped_key, body: read_file, metadata: metadata)
      end
      Logger.debug "wrote #{logs_bucket}/#{zipped_key}"
      ReducerModel.add_file(zipped_key) # Record the new file for data reduction
      File.delete(zipped)
      File.delete(filename)
    rescue => err
      Logger.error("Error saving log file to bucket: #{filename}\n#{err.formatted}")
    ensure
      # Don't leave an orphaned .gz behind when a step failed; the original
      # file is kept in that case, so no data is lost.
      File.delete(zipped) if zipped && File.exist?(zipped)
    end
  end
end
.uncompress_file(filename, chunk_size = 50_000_000) ⇒ Object
Source lines 102-114:
# File 'lib/openc3/utilities/bucket_utilities.rb', line 102
# Gunzip +filename+ into a sibling file with its ".gz" extension removed.
#
# Data is streamed in +chunk_size+-byte pieces. The compressed file is left in place.
#
# @param filename [String] path of a gzip file; must end in ".gz" (any case)
# @param chunk_size [Integer] number of bytes read per iteration
# @return [String] path of the uncompressed file
# @raise [ArgumentError] if +filename+ does not end in ".gz"
def self.uncompress_file(filename, chunk_size = 50_000_000)
  # Strip only a real ".gz" suffix. Blindly dropping the last three characters
  # would write to a mangled path, possibly clobbering an unrelated file.
  unzipped = filename.sub(/\.gz\z/i, '')
  raise ArgumentError, "Expected a .gz file name: #{filename}" if unzipped == filename

  Zlib::GzipReader.open(filename) do |gz|
    File.open(unzipped, 'wb') do |file|
      while (chunk = gz.read(chunk_size))
        file.write(chunk)
      end
    end
  end
  unzipped
end