Class: BucketFileCache

Inherits:
Object show all
Defined in:
lib/openc3/utilities/bucket_file_cache.rb

Constant Summary collapse

MAX_DISK_USAGE =

Default 20 GB

(ENV['OPENC3_BUCKET_FILE_CACHE_SIZE'] || 20_000_000_000).to_i
CHECK_TIME_SECONDS =
3600
@@instance =
nil
@@mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeBucketFileCache

Returns a new instance of BucketFileCache.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 148

def initialize
  # Create local file cache location
  @cache_dir = Dir.mktmpdir
  FileUtils.mkdir_p(@cache_dir)
  at_exit do
    FileUtils.remove_dir(@cache_dir, true)
  end

  @current_disk_usage = 0
  @queued_bucket_files = []
  @bucket_file_hash = {}

  @thread = Thread.new do
    client = OpenC3::Bucket.getClient()
    while true
      check_time = Time.now + CHECK_TIME_SECONDS # Check for aged out files periodically
      if @queued_bucket_files.length > 0 and @current_disk_usage < MAX_DISK_USAGE
        @@mutex.synchronize do
          bucket_file = @queued_bucket_files.shift
        end
        begin
          retrieved = bucket_file.retrieve(client)
          @@mutex.synchronize do
            @current_disk_usage += bucket_file.size if retrieved
          end
        rescue
          # Might have been deleted
        end
        sleep(0.01) # Small throttle
      else
        # Nothing to do or disk full
        if Time.now > check_time
          check_time = Time.now + CHECK_TIME_SECONDS
          # Delete any files that aren't reserved and are old
          removed_files = []
          @bucket_file_hash.each do |bucket_path, bucket_file|
            deleted = bucket_file.age_check
            if deleted
              removed_files << bucket_path
              @current_disk_usage -= bucket_file.size
            end
          end
          removed_files.each do |bucket_path|
            @bucket_file_hash.delete(bucket_path)
          end
        end
        sleep(1)
      end
    end
  rescue => err
    OpenC3::Logger.error "BucketFileCache thread unexpectedly died\n#{err.formatted}"
  end
end

Instance Attribute Details

#cache_dirObject (readonly)

Returns the value of attribute cache_dir.



135
136
137
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 135

def cache_dir
  @cache_dir
end

Class Method Details

.hint(bucket_paths) ⇒ Object



202
203
204
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 202

def self.hint(bucket_paths)
  return instance().hint(bucket_paths)
end

.instanceObject



140
141
142
143
144
145
146
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 140

def self.instance
  return @@instance if @@instance
  @@mutex.synchronize do
    @@instance ||= BucketFileCache.new
  end
  @@instance
end

.reserve(bucket_path) ⇒ Object



206
207
208
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 206

def self.reserve(bucket_path)
  return instance().reserve(bucket_path)
end

.unreserve(bucket_path) ⇒ Object



210
211
212
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 210

def self.unreserve(bucket_path)
  return instance().unreserve(bucket_path)
end

Instance Method Details

#create_bucket_file(bucket_path) ⇒ Object

Private



249
250
251
252
253
254
255
256
257
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 249

def create_bucket_file(bucket_path)
  bucket_file = @bucket_file_hash[bucket_path]
  unless bucket_file
    bucket_file = BucketFile.new(bucket_path)
    @queued_bucket_files << bucket_file
    @bucket_file_hash[bucket_path] = bucket_file
  end
  return bucket_file
end

#hint(bucket_paths) ⇒ Object



214
215
216
217
218
219
220
221
222
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 214

def hint(bucket_paths)
  @@mutex.synchronize do
    bucket_paths.each_with_index do |bucket_path, index|
      bucket_file = create_bucket_file(bucket_path)
      bucket_file.priority = index
    end
    @queued_bucket_files.sort! {|file1, file2| file1.priority <=> file2.priority}
  end
end

#reserve(bucket_path) ⇒ Object



224
225
226
227
228
229
230
231
232
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 224

def reserve(bucket_path)
  @@mutex.synchronize do
    bucket_file = create_bucket_file(bucket_path)
    retrieved = bucket_file.reserve
    @current_disk_usage += bucket_file.size if retrieved
    @queued_bucket_files.delete(bucket_file)
    return bucket_file
  end
end

#unreserve(bucket_path) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 234

def unreserve(bucket_path)
  @@mutex.synchronize do
    bucket_file = @bucket_file_hash[bucket_path]
    if bucket_file
      bucket_file.unreserve
      if bucket_file.reservation_count <= 0 and !@queued_bucket_files.include?(bucket_file)
        @current_disk_usage -= bucket_file.size
        @bucket_file_hash.delete(bucket_file)
      end
    end
  end
end