Class: OpenC3::BucketUtilities

Inherits:
Object
  • Object
show all
Defined in:
lib/openc3/utilities/bucket_utilities.rb

Constant Summary collapse

FILE_TIMESTAMP_FORMAT =
"%Y%m%d%H%M%S%N"
DIRECTORY_TIMESTAMP_FORMAT =
"%Y%m%d"

Class Method Summary collapse

Class Method Details

.compress_file(filename, chunk_size = 50_000_000) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/openc3/utilities/bucket_utilities.rb', line 86

# Gzip a file on disk, writing the result alongside it as "<filename>.gz".
#
# The source is streamed through the compressor in pieces of at most
# +chunk_size+ bytes, so the whole file is never read in a single call.
# The source file itself is not modified or removed.
#
# @param filename [String] Path of the file to compress
# @param chunk_size [Integer] Maximum number of bytes to read per iteration
# @return [String] Path of the gzip file that was written
def self.compress_file(filename, chunk_size = 50_000_000)
  gzip_path = "#{filename}.gz"

  Zlib::GzipWriter.open(gzip_path) do |writer|
    # Record the source's modification time and name in the gzip header
    writer.mtime = File.mtime(filename)
    writer.orig_name = filename
    File.open(filename, 'rb') do |source|
      loop do
        data = source.read(chunk_size)
        break unless data # read returned nil: nothing left in the source

        writer.write(data)
      end
    end
  end

  gzip_path
end

.directory_in_time_range(directory, start_time, end_time) ⇒ Object



126
127
128
129
130
131
132
133
134
135
# File 'lib/openc3/utilities/bucket_utilities.rb', line 126

# Decide whether a date-named directory intersects a time range.
#
# The last path component of +directory+ is parsed with
# DIRECTORY_TIMESTAMP_FORMAT to obtain the start of the directory's window;
# the window ends Time::SEC_PER_DAY seconds later (presumably one day).
#
# @param directory [String] Directory path whose basename is a date stamp
# @param start_time [Time, nil] Start of the range; a falsy value means no lower bound
# @param end_time [Time, nil] End of the range; a falsy value means no upper bound
# @return [Boolean] true when the range and the directory's window intersect
def self.directory_in_time_range(directory, start_time, end_time)
  window_open = DateTime.strptime(File.basename(directory), DIRECTORY_TIMESTAMP_FORMAT).to_time
  window_close = window_open + Time::SEC_PER_DAY

  # The range has to begin strictly before the directory's window closes ...
  return false if start_time && !(start_time < window_close)
  # ... and has to end at or after the moment the window opens
  return false if end_time && !(end_time >= window_open)

  true
end

.file_in_time_range(bucket_path, start_time, end_time, overlap:) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/openc3/utilities/bucket_utilities.rb', line 149

# Decide whether a file, whose start and end times come from get_file_times,
# belongs to a time range.
#
# With +overlap+ truthy the file qualifies when it intersects the range at
# all; otherwise it qualifies only when it lies entirely within the range.
#
# @param bucket_path [String] Path of the file, passed to get_file_times
# @param start_time [Time, nil] Start of the range; a falsy value means no lower bound
# @param end_time [Time, nil] End of the range; a falsy value means no upper bound
# @param overlap [Boolean] Whether a partial intersection is enough
# @return [Boolean] true when the file qualifies
def self.file_in_time_range(bucket_path, start_time, end_time, overlap:)
  first_stamp, last_stamp = get_file_times(bucket_path)

  # Overlap mode checks the range start against the file's end and the range
  # end against the file's start; containment mode uses the opposite pairing.
  start_limit, end_limit = overlap ? [last_stamp, first_stamp] : [first_stamp, last_stamp]

  return false if start_time && !(start_time <= start_limit)
  return false if end_time && !(end_time >= end_limit)

  true
end

.files_between_time(bucket, prefix, start_time, end_time, file_suffix: nil, overlap: false, max_request: 1000, max_total: 100_000) ⇒ Object

Parameters:

  • bucket (String)

    Name of the bucket to list

  • prefix (String)

    Prefix to filter all files by

  • start_time (Time|nil)

    Ruby time after which to find files. nil means no start bound (begin with the first file).

  • end_time (Time|nil)

    Ruby time before which to find files. nil means no end bound (continue through the last file).

  • overlap (Boolean) (defaults to: false)

    Whether to include files that only partially overlap the start/end time range. When false, a file must fall entirely within the range.

  • max_request (Integer) (defaults to: 1000)

    How many files to request in each API call

  • max_total (Integer) (defaults to: 100_000)

    Total number of files before stopping API requests



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/openc3/utilities/bucket_utilities.rb', line 39

# List the files under a bucket prefix whose times fall in a time range.
#
# Directories under +prefix+ are first narrowed with
# filter_directories_to_time_range; each remaining directory is then listed
# and its objects narrowed with filter_files_to_time_range.
#
# @param bucket [String] Name of the bucket to list
# @param prefix [String] Prefix to filter all files by
# @param start_time [Time, nil] Start of the range; nil means no start
# @param end_time [Time, nil] End of the range; nil means no end
# @param file_suffix [String, nil] When given, passed on so only matching files are kept
# @param overlap [Boolean] Passed on to the file filter
# @param max_request [Integer] Passed to each list_objects call
# @param max_total [Integer] Passed to each list_objects call (one call per directory)
# @return [Array] Entries kept by the file filter, in directory listing order;
#   empty when the bucket does not exist
def self.files_between_time(bucket, prefix, start_time, end_time, file_suffix: nil,
                            overlap: false, max_request: 1000, max_total: 100_000)
  client = Bucket.getClient

  # Return nothing if bucket doesn't exist (it won't at the very beginning)
  return [] unless client.exist?(bucket)

  candidate_directories = filter_directories_to_time_range(
    client.list_directories(bucket: bucket, path: prefix), start_time, end_time
  )

  candidate_directories.each_with_object([]) do |directory, matches|
    listing = client.list_objects(bucket: bucket, prefix: "#{prefix}/#{directory}",
                                  max_request: max_request, max_total: max_total)
    matches.concat(
      filter_files_to_time_range(listing, start_time, end_time, file_suffix: file_suffix, overlap: overlap)
    )
  end
end

.filter_directories_to_time_range(directories, start_time, end_time) ⇒ Object

Private methods



118
119
120
121
122
123
124
# File 'lib/openc3/utilities/bucket_utilities.rb', line 118

# Keep only the directories that directory_in_time_range accepts for the
# given range, preserving their original order.
#
# @param directories [Array<String>] Directory names or paths to examine
# @param start_time [Time, nil] Start of the range, passed through unchanged
# @param end_time [Time, nil] End of the range, passed through unchanged
# @return [Array<String>] The accepted directories
def self.filter_directories_to_time_range(directories, start_time, end_time)
  directories.select do |candidate|
    directory_in_time_range(candidate, start_time, end_time)
  end
end

.filter_files_to_time_range(files, start_time, end_time, file_suffix: nil, overlap: false) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
# File 'lib/openc3/utilities/bucket_utilities.rb', line 137

# Reduce a list of bucket objects to the keys of those in a time range.
#
# Each object's +key+ is converted to a String. Keys that do not end with
# +file_suffix+ (when one is given) are skipped without a time check; the
# rest are kept when file_in_time_range accepts them.
#
# @param files [Array<#key>] Objects responding to +key+
# @param start_time [Time, nil] Start of the range, passed through unchanged
# @param end_time [Time, nil] End of the range, passed through unchanged
# @param file_suffix [String, nil] Required key ending; a falsy value disables the check
# @param overlap [Boolean] Passed through to file_in_time_range
# @return [Array<String>] Accepted keys, in input order
def self.filter_files_to_time_range(files, start_time, end_time, file_suffix: nil, overlap: false)
  files.filter_map do |entry|
    key = entry.key.to_s
    next if file_suffix && !key.end_with?(file_suffix)

    key if file_in_time_range(key, start_time, end_time, overlap: overlap)
  end
end

.get_cache_control(filename) ⇒ Object



78
79
80
81
82
83
84
# File 'lib/openc3/utilities/bucket_utilities.rb', line 78

# Choose the cache-control value for a file based on its name.
#
# Files whose names carry their own version marker may be cached, so nil is
# returned for them; everything else gets 'no-store'.
#
# @param filename [String, nil] Name of the file being stored
# @return [String, nil] nil when the name looks versioned, otherwise 'no-store'
def self.get_cache_control(filename)
  # Allow caching for files that have a filename versioning strategy
  versioning_patterns = [
    /(-|_|\.)\d+(-|_|\.)\d+(-|_|\.)\d+\./, # three separator-prefixed numbers followed by a dot
    /\.[a-f0-9]{20}\./                     # exactly 20 lowercase hex characters between dots
  ]
  versioned = versioning_patterns.any? { |pattern| pattern.match?(filename) }
  versioned ? nil : 'no-store'
end

.get_file_times(bucket_path) ⇒ Object



163
164
165
166
167
168
169
# File 'lib/openc3/utilities/bucket_utilities.rb', line 163

# Extract the start and end times encoded in a file's name.
#
# The basename is split on "__"; its first two fields are each parsed with
# FILE_TIMESTAMP_FORMAT. Any further fields are ignored.
#
# @param bucket_path [String] Path whose basename begins "<start>__<end>__"
# @return [Array<Time>] Two-element array: start time, then end time
def self.get_file_times(bucket_path)
  start_stamp, end_stamp = File.basename(bucket_path).split("__")

  [start_stamp, end_stamp].map do |stamp|
    DateTime.strptime(stamp, FILE_TIMESTAMP_FORMAT).to_time
  end
end

.move_log_file_to_bucket(filename, bucket_key, metadata: {}) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/openc3/utilities/bucket_utilities.rb', line 59

# Compress a log file and upload it to the logs bucket on a new thread.
#
# Inside the thread the file is gzipped with compress_file, uploaded to the
# bucket named by ENV['OPENC3_LOGS_BUCKET'] under +bucket_key+ with ".gz"
# appended, registered with ReducerModel, and then both the gzip file and
# the original are deleted. Any StandardError raised along the way is
# logged, and the steps after the failure (including the deletions) are skipped.
#
# @param filename [String] Path of the local log file
# @param bucket_key [String] Destination key, without the ".gz" ending
# @param metadata [Hash] Metadata passed through to put_object
# @return [Thread] The thread performing the work
def self.move_log_file_to_bucket(filename, bucket_key, metadata: {})
  Thread.new do
    begin
      storage = Bucket.getClient

      gzip_path = compress_file(filename)
      gzip_key = bucket_key + '.gz'
      logs_bucket = ENV['OPENC3_LOGS_BUCKET']
      File.open(gzip_path, 'rb') do |gzip_io|
        storage.put_object(bucket: logs_bucket, key: gzip_key, body: gzip_io, metadata: metadata)
      end
      Logger.debug "wrote #{logs_bucket}/#{gzip_key}"
      ReducerModel.add_file(gzip_key) # Record the new file for data reduction

      # Local copies are removed only once the upload and bookkeeping succeeded
      File.delete(gzip_path)
      File.delete(filename)
    rescue => error
      Logger.error("Error saving log file to bucket: #{filename}\n#{error.formatted}")
    end
  end
end

.uncompress_file(filename, chunk_size = 50_000_000) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/openc3/utilities/bucket_utilities.rb', line 102

# Gunzip a file on disk, writing the result to the same path with its last
# three characters (expected to be ".gz") removed.
#
# The data is decompressed in pieces of at most +chunk_size+ bytes. The
# gzip file itself is not removed.
#
# @param filename [String] Path of the gzip file
# @param chunk_size [Integer] Maximum number of decompressed bytes read per iteration
# @return [String] Path of the decompressed file that was written
def self.uncompress_file(filename, chunk_size = 50_000_000)
  output_path = filename[0...-3] # Drop .gz

  Zlib::GzipReader.open(filename) do |reader|
    File.open(output_path, 'wb') do |sink|
      # read returns nil once the compressed stream is exhausted
      until (data = reader.read(chunk_size)).nil?
        sink.write(data)
      end
    end
  end

  output_path
end