Class: Fourtrack::Recorder
- Inherits: Object
  - Object
  - Fourtrack::Recorder
- Defined in: lib/fourtrack/recorder.rb
Overview
A destination for JSON-formattable payloads that are buffered in memory and periodically written out in batches. It can be used for stats logs, SQL replay logs and the like. It is thread-safe and gzip-compresses each batch. Each batch is written with a single binary append to the output file, which on UNIX, with a small-ish record size, should guarantee an atomic append.
Constant Summary
- NULL_LOGGER = Logger.new(nil)
Instance Method Summary
- #<<(payload) ⇒ Object
- #flush! ⇒ Object
- #initialize(output_path:, flush_after:, logger: NULL_LOGGER) ⇒ Recorder (constructor): A new instance of Recorder.
- #pending? ⇒ Boolean
Constructor Details
#initialize(output_path:, flush_after:, logger: NULL_LOGGER) ⇒ Recorder
Returns a new instance of Recorder.
# File 'lib/fourtrack/recorder.rb', line 8
def initialize(output_path:, flush_after:, logger: NULL_LOGGER)
  # The method name after "File." is missing from this listing; expand_path is assumed.
  @output_path = File.expand_path(output_path)
  @pid_at_create = Process.pid
  @logger = logger
  @buf = []
  @mux = Mutex.new
  @flush_every = flush_after

  # Attempt to open the file for writing,
  # which will raise an exception outright if we do not have access
  File.open(@output_path, 'a') {}

  # and once we know we were able to open it, install an at_exit block for ourselves
  install_at_exit_hook!
end
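The constructor's empty `File.open(@output_path, 'a') {}` call acts as an early writability probe. The following is a minimal sketch of that idea in isolation; `appendable?` is a hypothetical helper, not part of Fourtrack, and unlike the constructor it swallows the error and returns false instead of letting the exception propagate.

```ruby
require 'tmpdir'

# Hypothetical helper (not part of Fourtrack): true when +path+ can be opened
# for appending, false when the open fails with a system-call error.
def appendable?(path)
  File.open(path, 'a') {} # creates the file if missing, never truncates it
  true
rescue SystemCallError
  false
end

probe_results = Dir.mktmpdir do |dir|
  good = File.join(dir, 'stats.log.gz')                # parent directory exists
  bad  = File.join(dir, 'no-such-dir', 'stats.log.gz') # parent directory is missing
  [appendable?(good), appendable?(bad)]
end

p probe_results
```

Opening in append mode is a convenient probe because it leaves any existing file contents untouched.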
Instance Method Details
#<<(payload) ⇒ Object
# File 'lib/fourtrack/recorder.rb', line 27
def <<(payload)
  # Get the current PID.
  mypid = Process.pid
  len_so_far = @mux.synchronize {
    # If the current PID doesn't match the one
    # that was set at instantiation, it means the process was
    # forked and we now possibly also hold records for the parent
    # process, which we have to discard (it is the responsibility
    # of the parent to flush its records, not ours!).
    if mypid != @pid_at_create
      @pid_at_create = mypid
      @buf.clear
    end
    @buf << payload
    @buf.length
  }
  flush! if len_so_far > @flush_every
  self
end
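The PID comparison in `#<<` can be illustrated on its own. This is a sketch under stated assumptions: `ForkAwareBuffer` is a hypothetical stand-in for the buffering half of the recorder, and the `current_pid:` callable is injected (an assumption, the real class calls `Process.pid` directly) so the behaviour can be exercised without actually forking.

```ruby
# Hypothetical stand-in for the buffering half of Recorder.
class ForkAwareBuffer
  def initialize(current_pid: -> { Process.pid })
    @current_pid = current_pid
    @owner_pid = current_pid.call
    @records = []
    @lock = Mutex.new
  end

  def <<(record)
    pid = @current_pid.call
    @lock.synchronize do
      # A different PID means we are in a forked child: the records copied
      # from the parent are not ours to flush, so drop them and take ownership.
      if pid != @owner_pid
        @owner_pid = pid
        @records.clear
      end
      @records << record
    end
    self
  end

  def to_a
    @lock.synchronize { @records.dup }
  end
end

fake_pid = 100
buffer = ForkAwareBuffer.new(current_pid: -> { fake_pid })
buffer << 'parent-1' << 'parent-2'
before_fork = buffer.to_a

fake_pid = 101 # pretend we are now running in a forked child
buffer << 'child-1'
after_fork = buffer.to_a

p before_fork
p after_fork
```

After the simulated fork only the child's own record remains, which mirrors the rule in the source comment that the parent is responsible for flushing its records.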
#flush! ⇒ Object
# File 'lib/fourtrack/recorder.rb', line 47
def flush!
  # Refuse to flush and empty the buffer if flush! is called
  # within a child and the object still has records pending
  mypid = Process.pid
  if mypid != @pid_at_create
    @logger.debug { "%s: Flush requested in child PID %d, will inhibit flush and empty the record log first" % [self, mypid] }
    # Do not flush since we are in the child now
    @mux.synchronize { @buf.clear }
    return
  end

  io_buf = StringIO.new
  @mux.synchronize do
    @logger.debug { "%s: Compressing %d records from PID %d" % [self, @buf.length, Process.pid] }
    z = Zlib::GzipWriter.new(io_buf)
    @buf.each {|record| z.puts(record) }
    z.finish
    @buf.clear
  end

  @logger.debug { "%s: Flushing to %s, size before flush %d" % [self, @output_path, File.size(@output_path)] }
  File.open(@output_path, 'ab') { |f| f << io_buf.string }
  @logger.debug { "%s: After flush to %s size %d" % [self, @output_path, File.size(@output_path)] }

  io_buf.truncate(0)
end
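Because every `flush!` call appends one complete gzip stream, the output file ends up holding several gzip members back to back. The sketch below reproduces that layout with the standard library only and reads it back with `Zlib::GzipReader.zcat` (available in Ruby 2.7 and later). The `gzip_batch` helper, the temporary file and the sample payloads are assumptions made for illustration, not part of Fourtrack.

```ruby
require 'zlib'
require 'stringio'
require 'tempfile'

# Hypothetical helper: compress one batch of lines into a standalone gzip member.
def gzip_batch(lines)
  io = StringIO.new
  gz = Zlib::GzipWriter.new(io)
  lines.each { |line| gz.puts(line) }
  gz.finish # writes the gzip trailer but leaves the StringIO open
  io.string
end

log = Tempfile.new(['fourtrack-sketch', '.gz'])
log.close

# Two separate "flushes", each appended in binary mode as its own gzip member.
batches = [['{"event":"a"}', '{"event":"b"}'], ['{"event":"c"}']]
batches.each do |batch|
  File.open(log.path, 'ab') { |f| f << gzip_batch(batch) }
end

# zcat keeps decompressing members until it reaches the end of the file.
recovered = File.open(log.path, 'rb') { |f| Zlib::GzipReader.zcat(f) }.lines.map(&:chomp)
log.unlink

p recovered
```

A plain `Zlib::GzipReader.new(io).read` stops after the first member, so a reader of such a file needs to handle concatenated members in some way.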
#pending? ⇒ Boolean
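# File 'lib/fourtrack/recorder.rb', line 22
def pending?
  len = @mux.synchronize { @buf.length }
  # Integer#nonzero? returns the count when it is non-zero and nil otherwise,
  # so the result is truthy/falsy rather than strictly true/false.
  len.nonzero?
end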
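Since `#pending?` ends with `len.nonzero?`, its return value follows the semantics of `Integer#nonzero?`. A short sketch of those semantics, using plain arrays as stand-ins for the internal buffer (the variable names are illustrative only):

```ruby
# Stand-ins for an empty buffer and a buffer holding two records.
pending_when_empty = [].length.nonzero?
pending_with_two   = %w[a b].length.nonzero?

# Callers that need a strict true/false can convert explicitly.
as_booleans = [pending_when_empty, pending_with_two].map { |value| !value.nil? }

p pending_when_empty
p pending_with_two
p as_booleans
```

Both results work as expected in an `if` condition; the explicit conversion only matters when the value is serialized or compared against `true`.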