Class: BucketFileCache

Inherits:
Object show all
Defined in:
lib/openc3/utilities/bucket_file_cache.rb

Constant Summary collapse

MAX_DISK_USAGE =

Default 20 GB

(ENV['OPENC3_BUCKET_FILE_CACHE_SIZE'] || 20_000_000_000).to_i
CHECK_TIME_SECONDS =
3600
@@instance =
nil
@@mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BucketFileCache

Returns a new instance of BucketFileCache.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 148

# Sets up the local cache directory and starts the background worker thread.
#
# The worker loops forever and does one of two things per pass:
# * if files are queued and disk usage is below MAX_DISK_USAGE, it takes the
#   next queued file and calls +retrieve+ on it, adding its size to the
#   running disk usage when +retrieve+ reports success;
# * otherwise it idles, and once every CHECK_TIME_SECONDS it runs +age_check+
#   on every known file and forgets the ones that report being deleted.
#
# The temporary directory is removed by an at_exit hook.
def initialize
  # Create local file cache location
  @cache_dir = Dir.mktmpdir
  FileUtils.mkdir_p(@cache_dir)
  at_exit do
    FileUtils.remove_dir(@cache_dir, true)
  end

  @current_disk_usage = 0
  @queued_bucket_files = []
  @bucket_file_hash = {}

  @thread = Thread.new do
    client = OpenC3::Bucket.getClient()
    # Deadline for the next aged-out file sweep. It must be set once, outside
    # the loop: resetting it on every pass pushes the deadline forward forever
    # and the sweep below would never run.
    check_time = Time.now + CHECK_TIME_SECONDS
    while true
      if @queued_bucket_files.length > 0 and @current_disk_usage < MAX_DISK_USAGE
        bucket_file = nil
        @@mutex.synchronize do
          bucket_file = @queued_bucket_files.shift
        end
        begin
          retrieved = bucket_file.retrieve(client)
          @@mutex.synchronize do
            @current_disk_usage += bucket_file.size if retrieved
          end
        rescue
          # Might have been deleted (this also covers the queue having been
          # emptied by another thread between the length check and the shift)
        end
        sleep(0.01) # Small throttle
      else
        # Nothing to do or disk full
        if Time.now > check_time
          check_time = Time.now + CHECK_TIME_SECONDS
          # Delete any files that aren't reserved and are old.
          # Iterate over a snapshot: other threads add keys to the hash under
          # @@mutex, and Ruby raises if a key is added during iteration.
          snapshot = @@mutex.synchronize { @bucket_file_hash.to_a }
          removed = snapshot.select { |_bucket_path, cached_file| cached_file.age_check }
          unless removed.empty?
            @@mutex.synchronize do
              removed.each do |bucket_path, cached_file|
                @current_disk_usage -= cached_file.size
                @bucket_file_hash.delete(bucket_path)
              end
            end
          end
        end
        sleep(1)
      end
    end
  rescue => err
    OpenC3::Logger.error "BucketFileCache thread unexpectedly died\n#{err.formatted}"
  end
end

Instance Attribute Details

#cache_dir ⇒ Object (readonly)

Returns the value of attribute cache_dir.



135
136
137
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 135

# Reader for the local file cache location.
#
# @return [Object] the current value of @cache_dir
def cache_dir
  return @cache_dir
end

Class Method Details

.hint(bucket_paths) ⇒ Object



203
204
205
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 203

# Class-level convenience: forwards to #hint on the shared instance.
#
# @param bucket_paths [Object] passed through unchanged to the instance method
# @return [Object] whatever the instance method returns
def self.hint(bucket_paths)
  cache = instance
  cache.hint(bucket_paths)
end

.instance ⇒ Object



140
141
142
143
144
145
146
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 140

# Returns the shared BucketFileCache, creating it on first use.
#
# An already-built instance is returned without taking the lock; creation
# itself happens inside @@mutex and uses ||= so a second caller that was
# waiting on the lock does not build another one.
#
# @return [BucketFileCache] the shared instance
def self.instance
  unless @@instance
    @@mutex.synchronize { @@instance ||= BucketFileCache.new }
  end
  @@instance
end

.reserve(bucket_path) ⇒ Object



207
208
209
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 207

# Class-level convenience: forwards to #reserve on the shared instance.
#
# @param bucket_path [Object] passed through unchanged to the instance method
# @return [Object] whatever the instance method returns
def self.reserve(bucket_path)
  cache = instance
  cache.reserve(bucket_path)
end

.unreserve(bucket_path) ⇒ Object



211
212
213
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 211

# Class-level convenience: forwards to #unreserve on the shared instance.
#
# @param bucket_path [Object] passed through unchanged to the instance method
# @return [Object] whatever the instance method returns
def self.unreserve(bucket_path)
  cache = instance
  cache.unreserve(bucket_path)
end

Instance Method Details

#create_bucket_file(bucket_path) ⇒ Object

Private



250
251
252
253
254
255
256
257
258
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 250

# Looks up the BucketFile tracked for +bucket_path+, creating one if needed.
#
# A newly created BucketFile is appended to the retrieval queue and recorded
# in the path => file hash. An existing entry is returned untouched and is
# not re-queued. This method takes no lock itself; the callers visible in
# this class invoke it while holding @@mutex.
#
# @param bucket_path [Object] key used in @bucket_file_hash
# @return [BucketFile] the existing or newly created entry
def create_bucket_file(bucket_path)
  known = @bucket_file_hash[bucket_path]
  return known if known

  fresh = BucketFile.new(bucket_path)
  @queued_bucket_files << fresh
  @bucket_file_hash[bucket_path] = fresh
  return fresh
end

#hint(bucket_paths) ⇒ Object



215
216
217
218
219
220
221
222
223
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 215

# Registers the given paths and orders the retrieval queue by priority.
#
# Each path gets (or reuses) a BucketFile whose priority is set to the path's
# position in +bucket_paths+; the queue is then sorted ascending by priority.
# Everything runs under @@mutex.
#
# @param bucket_paths [Enumerable] paths in the order they should be fetched
# @return [Array] the sorted queue (the value of the sort! call)
def hint(bucket_paths)
  @@mutex.synchronize do
    bucket_paths.each_with_index do |path, position|
      create_bucket_file(path).priority = position
    end
    @queued_bucket_files.sort! { |left, right| left.priority <=> right.priority }
  end
end

#reserve(bucket_path) ⇒ Object



225
226
227
228
229
230
231
232
233
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 225

# Reserves the file for +bucket_path+ and returns its BucketFile.
#
# Under @@mutex: finds or creates the BucketFile, calls +reserve+ on it, adds
# its size to the disk usage total when that call returns a truthy value, and
# removes it from the retrieval queue.
#
# @param bucket_path [Object] key identifying the file
# @return [BucketFile] the reserved entry
def reserve(bucket_path)
  @@mutex.synchronize do
    reserved_file = create_bucket_file(bucket_path)
    @current_disk_usage += reserved_file.size if reserved_file.reserve
    @queued_bucket_files.delete(reserved_file)
    reserved_file
  end
end

#unreserve(bucket_path) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 235

# Releases one reservation on the file for +bucket_path+.
#
# Under @@mutex: if the path is known, calls +unreserve+ on its BucketFile.
# When the reservation count has dropped to zero (or below) and the file is
# not waiting in the retrieval queue, its size is subtracted from the disk
# usage total and the entry is forgotten. Unknown paths are ignored.
#
# @param bucket_path [Object] key identifying the file
# @return [nil]
def unreserve(bucket_path)
  @@mutex.synchronize do
    bucket_file = @bucket_file_hash[bucket_path]
    if bucket_file
      bucket_file.unreserve
      if bucket_file.reservation_count <= 0 and !@queued_bucket_files.include?(bucket_file)
        @current_disk_usage -= bucket_file.size
        # The hash is keyed by bucket path, so the entry must be removed by
        # path; deleting by the BucketFile object never matches a key and
        # would leave the entry behind.
        @bucket_file_hash.delete(bucket_path)
      end
    end
    nil
  end
end