Class: BucketFile
Constant Summary collapse
- MAX_AGE_SECONDS = 3600 * 4 (4 hours, expressed in seconds)
Instance Attribute Summary collapse
-
#bucket_path ⇒ Object
readonly
Returns the value of attribute bucket_path.
-
#error ⇒ Object
readonly
Returns the value of attribute error.
-
#init_time ⇒ Object
readonly
Returns the value of attribute init_time.
-
#local_path ⇒ Object
readonly
Returns the value of attribute local_path.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#reservation_count ⇒ Object
readonly
Returns the value of attribute reservation_count.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
-
#topic_prefix ⇒ Object
readonly
Returns the value of attribute topic_prefix.
Instance Method Summary collapse
- #age_check ⇒ Object
-
#delete ⇒ Object
private.
-
#initialize(bucket_path) ⇒ BucketFile
constructor
A new instance of BucketFile.
- #reserve ⇒ Object
- #retrieve(client = @bucket, uncompress = true) ⇒ Object
- #unreserve ⇒ Object
Constructor Details
#initialize(bucket_path) ⇒ BucketFile
Returns a new instance of BucketFile.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 41
#
# Builds the bookkeeping state for one bucket object and derives the topic
# prefix from the components of its path.
#
# The path is read as "<scope>/<stream dir>/<cmd|tlm>/<target>/..."; every
# component is upcased and missing components are treated as empty strings.
#
# @param bucket_path [String] slash-separated key of the file in the bucket
def initialize(bucket_path)
  @bucket = OpenC3::Bucket.getClient()
  @bucket_path = bucket_path
  @local_path = nil
  @reservation_count = 0
  @size = 0
  @priority = 0
  @error = nil
  @init_time = Time.now
  @mutex = Mutex.new

  scope, stream_dir, cmd_or_tlm, target_name = @bucket_path.split("/")

  # The stream mode is the first underscore-delimited word of the second
  # path component; REDUCED modes also carry the second word.
  mode_words = stream_dir.to_s.split("_")
  stream_mode = mode_words[0].to_s.upcase
  stream_mode = "#{stream_mode}_#{mode_words[1].to_s.upcase}" if stream_mode == 'REDUCED'

  is_cmd = cmd_or_tlm.to_s.upcase == 'CMD'
  type =
    case stream_mode
    when 'RAW' then is_cmd ? 'COMMAND' : 'TELEMETRY'
    when 'DECOM' then is_cmd ? 'DECOMCMD' : 'DECOM'
    else stream_mode # REDUCED_MINUTE, REDUCED_HOUR, or REDUCED_DAY
    end

  @topic_prefix = "#{scope.to_s.upcase}__#{type}__{#{target_name.to_s.upcase}}"
end
Instance Attribute Details
#bucket_path ⇒ Object (readonly)
Returns the value of attribute bucket_path.
32 33 34 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 32
#
# Reader for the bucket path handed to the constructor.
#
# @return [Object] the current value of @bucket_path
def bucket_path
  @bucket_path
end
#error ⇒ Object (readonly)
Returns the value of attribute error.
36 37 38 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 36
#
# Reader for the error recorded by a failed retrieve; nil until one occurs.
#
# @return [Object, nil] the current value of @error
def error
  @error
end
#init_time ⇒ Object (readonly)
Returns the value of attribute init_time.
38 39 40 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 38
#
# Reader for the creation timestamp that age_check compares against.
#
# @return [Object] the current value of @init_time
def init_time
  @init_time
end
#local_path ⇒ Object (readonly)
Returns the value of attribute local_path.
33 34 35 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 33
#
# Reader for the location of the cached copy on local disk; nil when no
# copy has been retrieved or after it has been deleted.
#
# @return [Object, nil] the current value of @local_path
def local_path
  @local_path
end
#priority ⇒ Object
Returns the value of attribute priority.
39 40 41 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 39
#
# Reader for the priority value (initialised to 0 by the constructor).
#
# @return [Object] the current value of @priority
def priority
  @priority
end
#reservation_count ⇒ Object (readonly)
Returns the value of attribute reservation_count.
34 35 36 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 34
#
# Reader for the number of outstanding reservations on this file.
#
# @return [Object] the current value of @reservation_count
def reservation_count
  @reservation_count
end
#size ⇒ Object (readonly)
Returns the value of attribute size.
35 36 37 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 35
#
# Reader for the size of the cached local file as recorded by retrieve;
# 0 until a retrieve succeeds.
#
# @return [Object] the current value of @size
def size
  @size
end
#topic_prefix ⇒ Object (readonly)
Returns the value of attribute topic_prefix.
37 38 39 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 37
#
# Reader for the "<SCOPE>__<TYPE>__{<TARGET>}" prefix computed by the
# constructor from the bucket path.
#
# @return [Object] the current value of @topic_prefix
def topic_prefix
  @topic_prefix
end
Instance Method Details
#age_check ⇒ Object
110 111 112 113 114 115 116 117 118 119 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 110
#
# Deletes the cached local file once it has outlived MAX_AGE_SECONDS and
# nobody holds a reservation on it.
#
# @return [Boolean] true when the file was considered expired and delete
#   was invoked, false otherwise
def age_check
  @mutex.synchronize do
    expired = (Time.now - @init_time) > MAX_AGE_SECONDS
    # Keep the file while it is still young or while anyone has it reserved.
    return false unless expired && @reservation_count <= 0

    delete()
    return true
  end
end
#delete ⇒ Object
private
123 124 125 126 127 128 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 123
#
# Removes the cached local copy from disk and forgets its path. Does
# nothing (and leaves @local_path untouched) when no path is recorded or
# the file is no longer on disk.
#
# @return [nil]
def delete
  return unless @local_path && File.exist?(@local_path)

  File.delete(@local_path)
  @local_path = nil
end
#reserve ⇒ Object
97 98 99 100 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 97
#
# Registers one more reservation on this file and then makes sure a local
# copy is available.
#
# @return [Object] whatever retrieve returns
def reserve
  # The lock is released before calling retrieve, which takes the same
  # mutex itself.
  @mutex.synchronize do
    @reservation_count += 1
  end
  retrieve()
end
#retrieve(client = @bucket, uncompress = true) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 71
#
# Downloads the bucket object into the cache directory (optionally
# uncompressing a ".gz" download) and records its local path and size.
#
# @param client [Object] object responding to
#   get_object(bucket:, key:, path:); defaults to the client obtained in
#   the constructor
# @param uncompress [Boolean] when true, a downloaded file whose extension
#   is ".gz" is uncompressed and the compressed download is removed
# @return [Boolean] true when the file was downloaded by this call, false
#   when a local copy already existed or the download produced no file
# @raise [StandardError] any error raised while retrieving; it is stored
#   in @error and logged before being re-raised
def retrieve(client = @bucket, uncompress = true)
  @mutex.synchronize do
    # A previous call may have uncompressed the download, in which case the
    # file on disk no longer carries the bucket basename. Checking the
    # recorded path first avoids downloading and rewriting a file that is
    # already cached (and possibly being read).
    return false if @local_path && File.exist?(@local_path)

    local_path = "#{BucketFileCache.instance.cache_dir}/#{File.basename(@bucket_path)}"
    return false if File.exist?(local_path)

    OpenC3::Logger.debug "Retrieving #{@bucket_path} from logs bucket"
    client.get_object(bucket: ENV['OPENC3_LOGS_BUCKET'], key: @bucket_path, path: local_path)
    return false unless File.exist?(local_path)

    if uncompress && File.extname(File.basename(local_path)) == ".gz"
      uncompressed = OpenC3::BucketUtilities.uncompress_file(local_path)
      File.delete(local_path)
      local_path = uncompressed
    end
    @size = File.size(local_path)
    @local_path = local_path
    return true
  end
rescue => err
  @error = err
  # NOTE(review): `formatted` is not a stdlib Exception method; presumably
  # provided by the project's core extensions - confirm.
  OpenC3::Logger.error "Failed to retrieve #{@bucket_path}\n#{err.formatted}"
  raise err
end
#unreserve ⇒ Object
102 103 104 105 106 107 108 |
# File 'lib/openc3/utilities/bucket_file_cache.rb', line 102
#
# Releases one reservation and deletes the cached local file once no
# reservations remain.
#
# @return [Integer] the number of reservations still outstanding (never
#   negative)
def unreserve
  @mutex.synchronize do
    # Clamp at zero: an unbalanced unreserve must not push the count
    # negative, otherwise a later reserve would leave the count at or below
    # zero while a reader holds the file, letting it be deleted underneath.
    @reservation_count = [@reservation_count - 1, 0].max
    delete() if @reservation_count <= 0
    return @reservation_count
  end
end