Class: Fourtrack::Recorder

Inherits:
Object
  • Object
show all
Defined in:
lib/fourtrack/recorder.rb

Overview

A class used as a destination for periodically writing out batch-written JSON formattable payloads. Can be used for stats logs, SQL replay logs and the like. Is thread safe and uses gzip compression. Writes are performed using a binary UNIX append, which with a small-ish record size should guarantee atomic append.

Constant Summary collapse

NULL_LOGGER =
Logger.new(nil)

Instance Method Summary collapse

Constructor Details

#initialize(output_path:, flush_after:, logger: NULL_LOGGER) ⇒ Recorder

Returns a new instance of Recorder.



8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/fourtrack/recorder.rb', line 8

def initialize(output_path:, flush_after:, logger: NULL_LOGGER)
  @output_path = File.expand_path(output_path)
  @pid_at_create = Process.pid
  @logger = logger
  @buf = []
  @mux = Mutex.new
  @flush_every = flush_after
  # Attempt to open the file for writing,
  # which will raise an exception outright if we do not have access
  File.open(@output_path, 'a') {}
  # and once we know we were able to open it, install an at_exit block for ourselves
  install_at_exit_hook!
end

Instance Method Details

#<<(payload) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/fourtrack/recorder.rb', line 27

def <<(payload)
  # Get the current PID.
  mypid = Process.pid
  len_so_far  = @mux.synchronize {
    # If the current PID doesn't match the one
    # that was set at instantiation, it means the process was
    # forked and we now possible also hold records for the parent
    # process, which we have to discard (it is the responsibility
    # of the parent to flush it's records, not ours!).
    if mypid != @pid_at_create
      @pid_at_create = mypid
      @buf.clear
    end
    @buf << payload
    @buf.length
  }
  flush! if len_so_far > @flush_every
  self
end

#flush!Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/fourtrack/recorder.rb', line 47

def flush!
  # Refuse to flush and empty the buffer if flush! is called
  # within a child and the object still has records pending
  mypid = Process.pid
  if mypid != @pid_at_create
    @logger.debug { "%s: Flush requested child PID %d, will inhibit flush and empty the record log first" % mypid }
    # Do not flush since we are in the child now
    @mux.synchronize { @buf.clear }
    return
  end

  io_buf = StringIO.new

  @mux.synchronize do
    @logger.debug { "%s: Compressing %d records from PID %d" % [self, @buf.length, Process.pid] }
    z = Zlib::GzipWriter.new(io_buf)
    @buf.each {|record| z.puts(record) }
    z.finish
    @buf.clear
  end

  @logger.debug { "%s: Flushing to %s, size before flush %d" % [self, @output_path, File.size(@output_path)] }
  File.open(@output_path, 'ab') { |f| f << io_buf.string }
  @logger.debug { "%s: After flush to %s size %d" % [self, @output_path, File.size(@output_path)] }

  io_buf.truncate(0)
end

#pending?Boolean

Returns:

  • (Boolean)


22
23
24
25
# File 'lib/fourtrack/recorder.rb', line 22

def pending?
  len = @mux.synchronize { @buf.length }
  len.nonzero?
end