Class: OpenC3::LogMicroservice
- Inherits:
-
Microservice
- Object
- Microservice
- OpenC3::LogMicroservice
- Defined in:
- lib/openc3/microservices/log_microservice.rb
Constant Summary collapse
- DEFAULT_BUFFER_DEPTH =
1 minutes at 1Hz
60
Instance Attribute Summary
Attributes inherited from Microservice
#count, #custom, #error, #logger, #microservice_status_thread, #name, #scope, #secrets, #state
Instance Method Summary collapse
-
#initialize(name) ⇒ LogMicroservice
constructor
A new instance of LogMicroservice.
- #log_data(topic, msg_id, msg_hash, redis) ⇒ Object
- #run ⇒ Object
- #setup_plws ⇒ Object
- #shutdown ⇒ Object
Methods inherited from Microservice
Constructor Details
#initialize(name) ⇒ LogMicroservice
Returns a new instance of LogMicroservice.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/openc3/microservices/log_microservice.rb', line 32 def initialize(name) super(name) @config['options'].each do |option| case option[0].upcase when 'RAW_OR_DECOM' @raw_or_decom = option[1].intern when 'CMD_OR_TLM' @cmd_or_tlm = option[1].intern when 'CYCLE_TIME' # Maximum time between log files @cycle_time = option[1].to_i when 'CYCLE_SIZE' # Maximum size of a log file @cycle_size = option[1].to_i when 'BUFFER_DEPTH' # Buffer depth to write in time order @buffer_depth = option[1].to_i else @logger.error("Unknown option passed to microservice #{@name}: #{option}") end end raise "Microservice #{@name} not fully configured" unless @raw_or_decom and @cmd_or_tlm # These settings limit the log file to 10 minutes or 50MB of data, whichever comes first @cycle_time = 600 unless @cycle_time # 10 minutes @cycle_size = 50_000_000 unless @cycle_size # ~50 MB @buffer_depth = DEFAULT_BUFFER_DEPTH unless @buffer_depth end |
Instance Method Details
#log_data(topic, msg_id, msg_hash, redis) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/openc3/microservices/log_microservice.rb', line 90 def log_data(topic, msg_id, msg_hash, redis) start = Process.clock_gettime(Process::CLOCK_MONOTONIC) topic_split = topic.gsub(/{|}/, '').split("__") # Remove the redis hashtag curly braces target_name = topic_split[2] packet_name = topic_split[3] rt_or_stored = ConfigParser.handle_true_false(msg_hash["stored"]) ? :STORED : :RT packet_type = nil data_key = nil if @raw_or_decom == :RAW packet_type = :RAW_PACKET data_key = "buffer" else # :DECOM packet_type = :JSON_PACKET data_key = "json_data" end @plws[target_name][rt_or_stored].buffered_write(packet_type, @cmd_or_tlm, target_name, packet_name, msg_hash["time"].to_i, rt_or_stored == :STORED, msg_hash[data_key], nil, topic, msg_id) @count += 1 diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float metric_labels = { "packet" => packet_name, "target" => target_name, "raw_or_decom" => @raw_or_decom.to_s, "cmd_or_tlm" => @cmd_or_tlm.to_s } @metric.add_sample(name: "log_duration_seconds", value: diff, labels: metric_labels) rescue => err @error = err @logger.error("#{@name} error: #{err.formatted}") end |
#run ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/openc3/microservices/log_microservice.rb', line 60 def run setup_plws() while true break if @cancel_thread Topic.read_topics(@topics) do |topic, msg_id, msg_hash, redis| break if @cancel_thread log_data(topic, msg_id, msg_hash, redis) end end end |
#setup_plws ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/openc3/microservices/log_microservice.rb', line 72 def setup_plws @plws = {} @topics.each do |topic| topic_split = topic.gsub(/{|}/, '').split("__") # Remove the redis hashtag curly braces scope = topic_split[0] target_name = topic_split[2] packet_name = topic_split[3] type = @raw_or_decom.to_s.downcase remote_log_directory = "#{scope}/#{type}_logs/#{@cmd_or_tlm.to_s.downcase}/#{target_name}" rt_label = "#{scope}__#{target_name}__ALL__rt__#{type}" stored_label = "#{scope}__#{target_name}__ALL__stored__#{type}" @plws[target_name] ||= { :RT => BufferedPacketLogWriter.new(remote_log_directory, rt_label, true, @cycle_time, @cycle_size, nil, nil, true, @buffer_depth), :STORED => BufferedPacketLogWriter.new(remote_log_directory, stored_label, true, @cycle_time, @cycle_size, nil, nil, true, @buffer_depth) } end end |
#shutdown ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/openc3/microservices/log_microservice.rb', line 115 def shutdown # Make sure all the existing logs are properly closed down threads = [] @plws.each do |target_name, plw_hash| plw_hash.each do |type, plw| threads.concat(plw.shutdown) end end # Wait for all the logging threads to move files to buckets threads.flatten.compact.each do |thread| thread.join end super() end |