Class: OpenC3::LogWriter

Inherits:
Object
Defined in:
lib/openc3/logs/log_writer.rb

Overview

Creates a log. Can automatically cycle the log based on an elapsed time period or when the log file reaches a predefined size.
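For example, to cycle on elapsed time or file size (a minimal sketch; the directory and values are illustrative assumptions, not taken from the source):

# Cycle the log after 10 minutes of logging, or once the current file
# would exceed ~50 MB, whichever comes first.
writer = OpenC3::LogWriter.new(
  'DEFAULT/raw_logs', # remote_log_directory (illustrative)
  true,               # logging_enabled
  600,                # cycle_time in seconds
  50_000_000          # cycle_size in bytes
)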

Direct Known Subclasses

PacketLogWriter, TextLogWriter

Constant Summary

CYCLE_TIME_INTERVAL = 10
  The cycle time interval. Cycle times are only checked at this level of granularity.

CLEANUP_DELAY = 60
  Delay in seconds before trimming Redis streams.

@@mutex = Mutex.new
  Mutex protecting class variables.

@@instances = []
  Array of instances used to keep track of cycling logs.

@@cycle_thread = nil
  Thread used to cycle logs across all log writers.

@@cycle_sleeper = nil
  Sleeper used to delay the cycle thread.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(remote_log_directory, logging_enabled = true, cycle_time = nil, cycle_size = 1000000000, cycle_hour = nil, cycle_minute = nil, enforce_time_order = true) ⇒ LogWriter

Returns a new instance of LogWriter.

Parameters:

  • remote_log_directory (String)

    The path to store the log files

  • logging_enabled (Boolean) (defaults to: true)

    Whether to start with logging enabled

  • cycle_time (Integer) (defaults to: nil)

    The amount of time in seconds before creating a new log file. This can be combined with cycle_size but is better used independently.

  • cycle_size (Integer) (defaults to: 1000000000)

    The size in bytes before creating a new log file. This can be combined with cycle_time but is better used independently.

  • cycle_hour (Integer) (defaults to: nil)

    The time at which to cycle the log. Combined with cycle_minute to cycle the log daily at the specified time. If nil, the log will be cycled hourly at the specified cycle_minute.

  • cycle_minute (Integer) (defaults to: nil)

    The time at which to cycle the log. See cycle_hour for more information.

  • enforce_time_order (Boolean) (defaults to: true)

    Whether data is expected to be written in strictly increasing time order. When true, out-of-order times are logged as errors (see #prepare_write).



# File 'lib/openc3/logs/log_writer.rb', line 96

def initialize(
  remote_log_directory,
  logging_enabled = true,
  cycle_time = nil,
  cycle_size = 1000000000,
  cycle_hour = nil,
  cycle_minute = nil,
  enforce_time_order = true
)
  @remote_log_directory = remote_log_directory
  @logging_enabled = ConfigParser.handle_true_false(logging_enabled)
  @cycle_time = ConfigParser.handle_nil(cycle_time)
  if @cycle_time
    @cycle_time = Integer(@cycle_time)
    raise "cycle_time must be >= #{CYCLE_TIME_INTERVAL}" if @cycle_time < CYCLE_TIME_INTERVAL
  end
  @cycle_size = ConfigParser.handle_nil(cycle_size)
  @cycle_size = Integer(@cycle_size) if @cycle_size
  @cycle_hour = ConfigParser.handle_nil(cycle_hour)
  @cycle_hour = Integer(@cycle_hour) if @cycle_hour
  @cycle_minute = ConfigParser.handle_nil(cycle_minute)
  @cycle_minute = Integer(@cycle_minute) if @cycle_minute
  @enforce_time_order = ConfigParser.handle_true_false(enforce_time_order)
  @out_of_order = false
  @mutex = Mutex.new
  @file = nil
  @file_size = 0
  @filename = nil
  @start_time = Time.now.utc
  @first_time = nil
  @last_time = nil
  @cancel_threads = false
  @last_offsets = {}
  @cleanup_offsets = []
  @cleanup_times = []
  @previous_time_nsec_since_epoch = nil

  # This is an optimization to avoid creating a new entry object
  # each time we create an entry which we do a LOT!
  @entry = String.new

  # Always make sure there is a cycle thread - (because it does trimming)
  @@mutex.synchronize do
    @@instances << self

    unless @@cycle_thread
      @@cycle_thread = OpenC3.safe_thread("Log cycle") do
        cycle_thread_body()
      end
    end
  end
end
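
As a usage sketch (the directory and values are illustrative assumptions), a writer that cycles once per day at 00:30 UTC instead of by elapsed time:

writer = OpenC3::LogWriter.new(
  'DEFAULT/decom_logs', # remote_log_directory (illustrative)
  true,                 # logging_enabled
  nil,                  # cycle_time disabled
  1_000_000_000,        # cycle_size left at the 1 GB default
  0,                    # cycle_hour
  30                    # cycle_minute
)
# Setting cycle_minute while leaving cycle_hour nil would instead cycle
# hourly at that minute.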

Instance Attribute Details

#cleanup_offsets ⇒ Object

Redis offsets for each topic to cleanup



# File 'lib/openc3/logs/log_writer.rb', line 59

def cleanup_offsets
  @cleanup_offsets
end

#cleanup_times ⇒ Object

Time at which to cleanup



# File 'lib/openc3/logs/log_writer.rb', line 62

def cleanup_times
  @cleanup_times
end

#cycle_hour ⇒ Object (readonly)

Returns the value of attribute cycle_hour.



# File 'lib/openc3/logs/log_writer.rb', line 46

def cycle_hour
  @cycle_hour
end

#cycle_minute ⇒ Object (readonly)

Returns the value of attribute cycle_minute.



# File 'lib/openc3/logs/log_writer.rb', line 50

def cycle_minute
  @cycle_minute
end

#cycle_time ⇒ Object (readonly)

Returns the value of attribute cycle_time.



# File 'lib/openc3/logs/log_writer.rb', line 41

def cycle_time
  @cycle_time
end

#filename ⇒ String (readonly)

Returns The filename of the packet log.

Returns:

  • (String)

    The filename of the packet log



# File 'lib/openc3/logs/log_writer.rb', line 33

def filename
  @filename
end

#logging_enabled ⇒ true/false (readonly)

Returns Whether logging is enabled.

Returns:

  • (true/false)

    Whether logging is enabled



# File 'lib/openc3/logs/log_writer.rb', line 36

def logging_enabled
  @logging_enabled
end

#mutex ⇒ Mutex (readonly)

Returns Instance mutex protecting file.

Returns:

  • (Mutex)

    Instance mutex protecting file



# File 'lib/openc3/logs/log_writer.rb', line 56

def mutex
  @mutex
end

#start_time ⇒ Time (readonly)

Returns Time that the current log file started.

Returns:

  • (Time)

    Time that the current log file started



# File 'lib/openc3/logs/log_writer.rb', line 53

def start_time
  @start_time
end

Instance Method Details

#bucket_filename ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 339

def bucket_filename
  "#{first_timestamp}__#{last_timestamp}" + extension
end

#close_file(take_mutex = true) ⇒ Object

Closing a log file isn't critical, so errors are simply logged. NOTE: This also schedules a trim of the Redis stream (after CLEANUP_DELAY) so that only a full file's worth of data is kept in the stream, which is what prevents continuous stream growth. Returns the threads that move the log file to the bucket.



# File 'lib/openc3/logs/log_writer.rb', line 307

def close_file(take_mutex = true)
  threads = []
  @mutex.lock if take_mutex
  begin
    if @file
      begin
        @file.close unless @file.closed?
        Logger.debug "Log File Closed : #{@filename}"
        date = first_timestamp[0..7] # YYYYMMDD
        bucket_key = File.join(@remote_log_directory, date, bucket_filename())
        threads << BucketUtilities.move_log_file_to_bucket(@filename, bucket_key)
        # Now that the file is in storage, trim the Redis stream after a delay
        @cleanup_offsets << {}
        @last_offsets.each do |redis_topic, last_offset|
          @cleanup_offsets[-1][redis_topic] = last_offset
        end
        @cleanup_times << Time.now + CLEANUP_DELAY
        @last_offsets.clear
      rescue Exception => err
        Logger.instance.error "Error closing #{@filename} : #{err.formatted}"
      end

      @file = nil
      @file_size = 0
      @filename = nil
    end
  ensure
    @mutex.unlock if take_mutex
  end
  return threads
end

#create_unique_filename(ext = extension) ⇒ Object

implementation details



# File 'lib/openc3/logs/log_writer.rb', line 183

def create_unique_filename(ext = extension)
  # Create a filename that doesn't exist
  attempt = nil
  while true
    filename_parts = [attempt]
    filename_parts.unshift @label if @label
    filename = File.join(Dir.tmpdir, File.build_timestamped_filename([@label, attempt], ext))
    if File.exist?(filename)
      attempt ||= 0
      attempt += 1
    else
      return filename
    end
  end
end

#cycle_thread_body ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 199

def cycle_thread_body
  @@cycle_sleeper = Sleeper.new
  while true
    start_time = Time.now
    @@mutex.synchronize do
      @@instances.each do |instance|
        # The check against start_time needs to be mutex protected to prevent a packet coming in between the check
        # and closing the file
        instance.mutex.synchronize do
          utc_now = Time.now.utc
          # Logger.debug("start:#{@start_time.to_f} now:#{utc_now.to_f} cycle:#{@cycle_time} new:#{(utc_now - @start_time) > @cycle_time}")
          if instance.logging_enabled and instance.filename # Logging and file opened
            # Cycle based on total time logging
            if (instance.cycle_time and (utc_now - instance.start_time) > instance.cycle_time)
              Logger.debug("Log writer start new file due to cycle time")
              instance.close_file(false)
            # Cycle daily at a specific time
            elsif (instance.cycle_hour and instance.cycle_minute and utc_now.hour == instance.cycle_hour and utc_now.min == instance.cycle_minute and instance.start_time.yday != utc_now.yday)
              Logger.debug("Log writer start new file daily")
              instance.close_file(false)
            # Cycle hourly at a specific time
            elsif (instance.cycle_minute and not instance.cycle_hour and utc_now.min == instance.cycle_minute and instance.start_time.hour != utc_now.hour)
              Logger.debug("Log writer start new file hourly")
              instance.close_file(false)
            end
          end

          # Check for cleanup time
          indexes_to_clear = []
          instance.cleanup_times.each_with_index do |cleanup_time, index|
            if cleanup_time <= utc_now
              # Now that the file is in S3, trim the Redis stream up until the previous file.
              # This keeps one minute of data in Redis
              instance.cleanup_offsets[index].each do |redis_topic, cleanup_offset|
                Topic.trim_topic(redis_topic, cleanup_offset)
              end
              indexes_to_clear << index
            end
          end
          if indexes_to_clear.length > 0
            indexes_to_clear.each do |index|
              instance.cleanup_offsets[index] = nil
              instance.cleanup_times[index] = nil
            end
            instance.cleanup_offsets.compact!
            instance.cleanup_times.compact!
          end
        end
      end
    end

    # Only check whether to cycle at a set interval
    run_time = Time.now - start_time
    sleep_time = CYCLE_TIME_INTERVAL - run_time
    sleep_time = 0 if sleep_time < 0
    break if @@cycle_sleeper.sleep(sleep_time)
  end
end

#extension ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 343

def extension
  '.log'.freeze
end

#first_time ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 347

def first_time
  Time.from_nsec_from_epoch(@first_time)
end

#first_timestamp ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 355

def first_timestamp
  first_time().to_timestamp # "YYYYMMDDHHmmSSNNNNNNNNN"
end

#graceful_kill ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 177

def graceful_kill
  @cancel_threads = true
end

#last_time ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 351

def last_time
  Time.from_nsec_from_epoch(@last_time)
end

#last_timestamp ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 359

def last_timestamp
  last_time().to_timestamp # "YYYYMMDDHHmmSSNNNNNNNNN"
end

#prepare_write(time_nsec_since_epoch, data_length, redis_topic, redis_offset) ⇒ Object



# File 'lib/openc3/logs/log_writer.rb', line 281

def prepare_write(time_nsec_since_epoch, data_length, redis_topic, redis_offset)
  # This check includes logging_enabled again because it might have changed since we acquired the mutex
  # Ensures new files based on size, and ensures always increasing time order in files
  if @logging_enabled
    if !@file
      Logger.debug("Log writer start new file because no file opened")
      start_new_file()
    elsif @cycle_size and ((@file_size + data_length) > @cycle_size)
      Logger.debug("Log writer start new file due to cycle size #{@cycle_size}")
      start_new_file()
    elsif @enforce_time_order and @previous_time_nsec_since_epoch and (@previous_time_nsec_since_epoch > time_nsec_since_epoch)
      # Warning: Creating new files here can cause lots of files to be created if packets make it through out of order
      # Changed to just a error to prevent file thrashing
      unless @out_of_order
        Logger.error("Log writer out of order time detected (increase buffer depth?): #{Time.from_nsec_from_epoch(@previous_time_nsec_since_epoch)} #{Time.from_nsec_from_epoch(time_nsec_since_epoch)}")
        @out_of_order = true
      end
    end
  end
  @last_offsets[redis_topic] = redis_offset if redis_topic and redis_offset # This is needed for the redis offset marker entry at the end of the log file
  @previous_time_nsec_since_epoch = time_nsec_since_epoch
end
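
A minimal sketch (an assumed subclass write path, not code from this file) of how prepare_write is intended to be called under the instance mutex before appending an entry; write_entry is a hypothetical name:

def write_entry(time_nsec_since_epoch, data, redis_topic = nil, redis_offset = nil)
  @mutex.synchronize do
    # May open or cycle the file and records the Redis offset for later trimming
    prepare_write(time_nsec_since_epoch, data.length, redis_topic, redis_offset)
    if @file
      @file.write(data)
      @file_size += data.length
      @first_time ||= time_nsec_since_epoch
      @last_time = time_nsec_since_epoch
    end
  end
end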

#shutdown ⇒ Object

Stops all logging, closes the current log file, and kills the logging threads. Returns the threads that move the log file to the bucket.



# File 'lib/openc3/logs/log_writer.rb', line 164

def shutdown
  threads = stop()
  @@mutex.synchronize do
    @@instances.delete(self)
    if @@instances.length <= 0
      @@cycle_sleeper.cancel if @@cycle_sleeper
      OpenC3.kill_thread(self, @@cycle_thread) if @@cycle_thread
      @@cycle_thread = nil
    end
  end
  return threads
end
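
The threads returned by #stop and #shutdown perform the bucket uploads, so a caller that needs the files fully archived can join them (a minimal sketch, given a writer instance):

threads = writer.shutdown
threads.flatten.compact.each(&:join)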

#start ⇒ Object

Starts a new log file by closing the existing log file. New log files are not created until packets are written by #write, so this does not immediately create a log file on the filesystem.



# File 'lib/openc3/logs/log_writer.rb', line 152

def start
  @mutex.synchronize { close_file(false); @logging_enabled = true }
end

#start_new_file ⇒ Object

Starting a new log file is a critical operation, so the entire method is wrapped with a rescue and handled with handle_critical_exception. Assumes the mutex has already been taken.



# File 'lib/openc3/logs/log_writer.rb', line 261

def start_new_file
  close_file(false)

  # Start log file
  @filename = create_unique_filename()
  @file = File.new(@filename, 'wb')
  @file_size = 0

  @start_time = Time.now.utc
  @out_of_order = false
  @first_time = nil
  @last_time = nil
  @previous_time_nsec_since_epoch = nil
  Logger.debug "Log File Opened : #{@filename}"
rescue => err
  Logger.error "Error starting new log file: #{err.formatted}"
  @logging_enabled = false
  OpenC3.handle_critical_exception(err)
end

#stop ⇒ Object

Stops all logging and closes the current log file. Returns the threads that move the log file to the bucket.



# File 'lib/openc3/logs/log_writer.rb', line 157

def stop
  threads = nil
  @mutex.synchronize { @logging_enabled = false; threads = close_file(false) }
  return threads
end