Class: OpenC3::LogMicroservice
- Inherits:
-
Microservice
- Object
- Microservice
- OpenC3::LogMicroservice
- Defined in:
- lib/openc3/microservices/log_microservice.rb
Constant Summary collapse
- DEFAULT_BUFFER_DEPTH =
1 minute at 1Hz
60
Instance Attribute Summary
Attributes inherited from Microservice
#count, #custom, #error, #logger, #microservice_status_thread, #name, #scope, #secrets, #state
Instance Method Summary collapse
-
#initialize(name) ⇒ LogMicroservice
constructor
A new instance of LogMicroservice.
- #log_data(topic, msg_id, msg_hash, redis) ⇒ Object
- #run ⇒ Object
- #setup_plws ⇒ Object
- #shutdown ⇒ Object
Methods inherited from Microservice
Constructor Details
#initialize(name) ⇒ LogMicroservice
Returns a new instance of LogMicroservice.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/openc3/microservices/log_microservice.rb', line 32

# Builds the log microservice from the option pairs found in @config['options'].
#
# Option names are matched case-insensitively. RAW_OR_DECOM and CMD_OR_TLM are
# stored as symbols; CYCLE_TIME, CYCLE_SIZE and BUFFER_DEPTH are stored as
# integers. Any other option is reported through the logger and skipped.
#
# NOTE(review): @config and @metric are presumably populated by
# Microservice#initialize - confirm against the base class.
#
# @param name [String] microservice name, forwarded to the superclass constructor
# @raise [RuntimeError] when RAW_OR_DECOM or CMD_OR_TLM was not supplied
def initialize(name)
  super(name)

  @config['options'].each do |option|
    keyword = option[0].upcase
    value = option[1]

    case keyword
    when 'RAW_OR_DECOM' then @raw_or_decom = value.intern
    when 'CMD_OR_TLM' then @cmd_or_tlm = value.intern
    when 'CYCLE_TIME' then @cycle_time = value.to_i # Maximum time between log files
    when 'CYCLE_SIZE' then @cycle_size = value.to_i # Maximum size of a log file
    when 'BUFFER_DEPTH' then @buffer_depth = value.to_i # Buffer depth to write in time order
    else
      @logger.error("Unknown option passed to microservice #{@name}: #{option}")
    end
  end

  # Both selectors are mandatory; nothing below can pick a sensible default for them
  raise "Microservice #{@name} not fully configured" unless @raw_or_decom && @cmd_or_tlm

  # Unless overridden above, a log file is limited to 10 minutes or 50MB of
  # data, whichever comes first
  @cycle_time ||= 600 # 10 minutes
  @cycle_size ||= 50_000_000 # ~50 MB
  @buffer_depth ||= DEFAULT_BUFFER_DEPTH

  # Publish both counters up front so they exist before the first packet is logged
  @error_count = 0
  @metric.set(name: 'log_total', value: @count, type: 'counter')
  @metric.set(name: 'log_error_total', value: @error_count, type: 'counter')
end
Instance Method Details
#log_data(topic, msg_id, msg_hash, redis) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/openc3/microservices/log_microservice.rb', line 95

# Hands one stream message to the buffered log writer for its target.
#
# Any StandardError raised along the way is captured rather than propagated:
# it is stored in @error, logged, and counted in the 'log_error_total' metric.
#
# @param topic [String] topic name; after the curly braces are stripped, the
#   third and fourth "__"-separated fields are used as target and packet name
# @param msg_id [String] message id; the part before the first '-' is read as
#   milliseconds since the epoch
# @param msg_hash [Hash] message fields; reads "stored", "time" and either
#   "buffer" (RAW) or "json_data" (otherwise)
# @param redis [Object] not used by this method
def log_data(topic, msg_id, msg_hash, redis)
  stream_seconds = msg_id.split('-')[0].to_i / 1000.0
  lag = Time.now.to_f - stream_seconds
  @metric.set(name: 'log_topic_delta_seconds', value: lag, type: 'gauge', unit: 'seconds', help: 'Delta time between data written to stream and log start')

  # Remove the redis hashtag curly braces, then pick out fields 2 and 3
  _, _, target_name, packet_name = topic.gsub(/{|}/, '').split("__")
  rt_or_stored = ConfigParser.handle_true_false(msg_hash["stored"]) ? :STORED : :RT

  packet_type, data_key =
    if @raw_or_decom == :RAW
      [:RAW_PACKET, "buffer"]
    else # :DECOM
      [:JSON_PACKET, "json_data"]
    end

  writer = @plws[target_name][rt_or_stored]
  writer.buffered_write(
    packet_type,
    @cmd_or_tlm,
    target_name,
    packet_name,
    msg_hash["time"].to_i,
    rt_or_stored == :STORED,
    msg_hash[data_key],
    nil,
    topic,
    msg_id
  )
rescue => e
  @error = e
  @logger.error("#{@name} error: #{e.formatted}")
  @error_count += 1
  @metric.set(name: 'log_error_total', value: @error_count, type: 'counter')
end
#run ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/openc3/microservices/log_microservice.rb', line 63

# Main loop: creates the log writers, then keeps reading @topics and logging
# every message received until @cancel_thread becomes truthy.
#
# @cancel_thread is checked both between reads and before each message, so a
# cancellation request also stops processing in the middle of a batch.
def run
  setup_plws

  until @cancel_thread
    Topic.read_topics(@topics) do |topic, msg_id, msg_hash, redis|
      break if @cancel_thread

      log_data(topic, msg_id, msg_hash, redis)
      # The total counts every message handed to log_data, including ones it
      # failed to write (log_data rescues its own errors)
      @count += 1
      @metric.set(name: 'log_total', value: @count, type: 'counter')
    end
  end
end
#setup_plws ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/openc3/microservices/log_microservice.rb', line 77

# Creates the buffered packet log writers, one :RT / :STORED pair per target
# named in @topics, and stores them in @plws keyed by target name.
#
# Several topics can name the same target (one per packet); only the first
# such topic creates writers, later ones are skipped.
#
# @return [Object] the value of @topics.each (the topic list itself for an Array)
def setup_plws
  @plws = {}

  # Neither value depends on the topic, so build them once
  type = @raw_or_decom.to_s.downcase
  direction = @cmd_or_tlm.to_s.downcase

  @topics.each do |topic|
    # Remove the redis hashtag curly braces; field 0 is the scope, field 2 the target
    scope, _, target_name = topic.gsub(/{|}/, '').split("__")
    next if @plws.key?(target_name)

    remote_log_directory = "#{scope}/#{type}_logs/#{direction}/#{target_name}"
    rt_label = "#{scope}__#{target_name}__ALL__rt__#{type}"
    stored_label = "#{scope}__#{target_name}__ALL__stored__#{type}"

    @plws[target_name] = {
      :RT => BufferedPacketLogWriter.new(remote_log_directory, rt_label, true, @cycle_time, @cycle_size, nil, nil, true, @buffer_depth),
      :STORED => BufferedPacketLogWriter.new(remote_log_directory, stored_label, true, @cycle_time, @cycle_size, nil, nil, true, @buffer_depth)
    }
  end
end
#shutdown ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/openc3/microservices/log_microservice.rb', line 121

# Shuts down every log writer, waits for the threads they hand back, and then
# runs the superclass shutdown.
#
# Safe to call before #run: @plws is only assigned by setup_plws, so it may
# still be nil here, in which case there are no writers to close.
def shutdown
  # Make sure all the existing logs are properly closed down
  threads = []
  (@plws || {}).each_value do |plw_hash|
    plw_hash.each_value do |plw|
      # Array() tolerates a writer that returns nil instead of a list of threads
      threads.concat(Array(plw.shutdown))
    end
  end

  # Wait for all the logging threads to move files to buckets
  threads.flatten.compact.each(&:join)

  super()
end